Google Cloud Dataflow · Arazzo Workflow

Google Cloud Dataflow List Jobs and Snapshot

Version 1.0.0

List jobs in a region, inspect the first job, then snapshot it.

1 workflow · 1 source API · 1 provider
Tags: Apache Beam, Batch Processing, Big Data, Data Processing, ETL, Stream Processing, Arazzo, Workflows

Provider

google-cloud-dataflow

Workflows

list-jobs-and-snapshot
List jobs in a region, read the first one, and snapshot it.
Lists jobs in a region, captures the first job id, reads that job for its state, then creates a snapshot of it.
3 steps
Inputs: accessToken, filter, location, projectId, ttl
Outputs: currentState, jobId, snapshotId
1. listJobs (operation: listLocationJobs)
   List the Dataflow jobs in the requested region and capture the first job id returned in the page.
2. inspectJob (operation: getLocationJob)
   Read the first job from the list to inspect its current state before taking a snapshot.
3. snapshotJob (operation: snapshotLocationJob)
   Create a snapshot of the inspected job, capturing its streaming state with the supplied retention window.
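
The way each step hands values to the next can be sketched in Python. The workflow's outputs are written as runtime expressions such as `$response.body#/jobs/0/id`, where the part after `#` is a JSON Pointer (RFC 6901) into the response body. This is a minimal illustration under stated assumptions, not an Arazzo runner: `resolve_pointer` is a hypothetical helper, and the three response bodies are invented sample data rather than real API output.

```python
from typing import Any


def resolve_pointer(document: Any, pointer: str) -> Any:
    """Resolve an RFC 6901 JSON Pointer such as '/jobs/0/id' against parsed JSON."""
    if pointer == "":
        return document
    if not pointer.startswith("/"):
        raise ValueError(f"invalid JSON Pointer: {pointer!r}")
    current = document
    for raw_token in pointer[1:].split("/"):
        # RFC 6901 unescaping order: '~1' becomes '/', then '~0' becomes '~'.
        token = raw_token.replace("~1", "/").replace("~0", "~")
        if isinstance(current, list):
            if not token.isdigit():
                raise KeyError(token)
            current = current[int(token)]  # IndexError if the list is too short
        else:
            current = current[token]  # KeyError if the member is absent
    return current


# Hypothetical response bodies, invented for illustration only.
list_response = {"jobs": [{"id": "job-a"}, {"id": "job-b"}], "nextPageToken": "t1"}
get_response = {"id": "job-a", "currentState": "JOB_STATE_RUNNING"}
snapshot_response = {"id": "snap-1", "state": "PENDING"}

# Step 1 (listJobs): firstJobId comes from $response.body#/jobs/0/id
first_job_id = resolve_pointer(list_response, "/jobs/0/id")

# Step 2 (inspectJob): called with first_job_id, yields jobId and currentState
job_id = resolve_pointer(get_response, "/id")
current_state = resolve_pointer(get_response, "/currentState")

# Step 3 (snapshotJob): called with job_id, yields snapshotId
snapshot_id = resolve_pointer(snapshot_response, "/id")

# The workflow-level outputs listed on the card.
workflow_outputs = {
    "jobId": job_id,
    "currentState": current_state,
    "snapshotId": snapshot_id,
}
```

In this sketch an empty `jobs` list makes the first lookup raise a `LookupError`. The workflow's own success criterion for `listJobs` checks only the status code, so a caller may want to handle the case where no job id is available before the second step runs.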

Source API Descriptions

dataflowApi (openapi): ../openapi/google-cloud-dataflow-api-openapi.yml

Arazzo Workflow Specification

google-cloud-dataflow-list-jobs-and-snapshot-workflow.yml
arazzo: 1.0.1
info:
  title: Google Cloud Dataflow List Jobs and Snapshot
  summary: List jobs in a region, inspect the first job, then snapshot it.
  description: >-
    Discovers jobs in a region and protects the first one with a snapshot. The
    workflow lists the jobs in a project location (optionally filtered by
    state), captures the first job id from the list, reads that job to inspect
    its current state, then creates a snapshot of it so its streaming state can
    later seed a new job. Every step spells out its request inline, including
    the Bearer Authorization header Google Cloud requires, so the flow can be
    read without opening the underlying OpenAPI description; a runner still
    uses that description to resolve each operationId.
  version: 1.0.0
sourceDescriptions:
- name: dataflowApi
  url: ../openapi/google-cloud-dataflow-api-openapi.yml
  type: openapi
workflows:
- workflowId: list-jobs-and-snapshot
  summary: List jobs in a region, read the first one, and snapshot it.
  description: >-
    Lists jobs in a region, captures the first job id, reads that job for its
    state, then creates a snapshot of it.
  inputs:
    type: object
    required:
    - accessToken
    - projectId
    - location
    properties:
      accessToken:
        type: string
        description: Google Cloud OAuth 2.0 access token used as a Bearer credential.
      projectId:
        type: string
        description: The Google Cloud project id that owns the jobs.
      location:
        type: string
        description: The regional endpoint that contains the jobs (e.g. us-central1).
      filter:
        type: string
        description: Optional job state filter (e.g. ACTIVE, TERMINATED, ALL).
      ttl:
        type: string
        description: Snapshot retention period as a duration string in seconds (e.g. 604800s for seven days).
  steps:
  - stepId: listJobs
    description: >-
      List the Dataflow jobs in the requested region and capture the first job
      id returned in the page.
    operationId: listLocationJobs
    parameters:
    - name: Authorization
      in: header
      value: Bearer $inputs.accessToken
    - name: projectId
      in: path
      value: $inputs.projectId
    - name: location
      in: path
      value: $inputs.location
    - name: filter
      in: query
      value: $inputs.filter
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      firstJobId: $response.body#/jobs/0/id
      nextPageToken: $response.body#/nextPageToken
  - stepId: inspectJob
    description: >-
      Read the first job from the list to inspect its current state before
      taking a snapshot.
    operationId: getLocationJob
    parameters:
    - name: Authorization
      in: header
      value: Bearer $inputs.accessToken
    - name: projectId
      in: path
      value: $inputs.projectId
    - name: location
      in: path
      value: $inputs.location
    - name: jobId
      in: path
      value: $steps.listJobs.outputs.firstJobId
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      jobId: $response.body#/id
      currentState: $response.body#/currentState
  - stepId: snapshotJob
    description: >-
      Create a snapshot of the inspected job, capturing its streaming state with
      the supplied retention window.
    operationId: snapshotLocationJob
    parameters:
    - name: Authorization
      in: header
      value: Bearer $inputs.accessToken
    - name: projectId
      in: path
      value: $inputs.projectId
    - name: location
      in: path
      value: $inputs.location
    - name: jobId
      in: path
      value: $steps.inspectJob.outputs.jobId
    requestBody:
      contentType: application/json
      payload:
        ttl: $inputs.ttl
        location: $inputs.location
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      snapshotId: $response.body#/id
      snapshotState: $response.body#/state
  outputs:
    jobId: $steps.inspectJob.outputs.jobId
    currentState: $steps.inspectJob.outputs.currentState
    snapshotId: $steps.snapshotJob.outputs.snapshotId
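
For readers who want to see what the three steps amount to on the wire, the following Python sketch builds the HTTP requests without sending them. It is an illustration under assumptions, not part of the workflow file: the workflow names operations only by `operationId`, so the base URL and the `v1b3` paths below are assumptions drawn from the public Dataflow REST API, and `Request`, `list_jobs`, `get_job`, and `snapshot_job` are hypothetical names.

```python
import json
from typing import NamedTuple, Optional
from urllib.parse import quote, urlencode

# Assumption: public Dataflow REST endpoint; the workflow file does not state it.
BASE_URL = "https://dataflow.googleapis.com/v1b3"


class Request(NamedTuple):
    method: str
    url: str
    headers: dict
    body: Optional[str]


def _jobs_url(inputs: dict) -> str:
    project = quote(inputs["projectId"], safe="")
    location = quote(inputs["location"], safe="")
    return f"{BASE_URL}/projects/{project}/locations/{location}/jobs"


def _auth(inputs: dict) -> dict:
    # Mirrors the inline "Authorization: Bearer $inputs.accessToken" parameter.
    return {"Authorization": f"Bearer {inputs['accessToken']}"}


def list_jobs(inputs: dict) -> Request:
    url = _jobs_url(inputs)
    if inputs.get("filter"):  # filter is optional in the workflow inputs
        url += "?" + urlencode({"filter": inputs["filter"]})
    return Request("GET", url, _auth(inputs), None)


def get_job(inputs: dict, job_id: str) -> Request:
    url = f"{_jobs_url(inputs)}/{quote(job_id, safe='')}"
    return Request("GET", url, _auth(inputs), None)


def snapshot_job(inputs: dict, job_id: str) -> Request:
    url = f"{_jobs_url(inputs)}/{quote(job_id, safe='')}:snapshot"
    payload = {"location": inputs["location"]}
    if inputs.get("ttl"):  # design choice of this sketch: omit ttl when not supplied
        payload["ttl"] = inputs["ttl"]
    headers = {**_auth(inputs), "Content-Type": "application/json"}
    return Request("POST", url, headers, json.dumps(payload))


# Example inputs; every value here is a placeholder.
example_inputs = {
    "accessToken": "tok",
    "projectId": "my-project",
    "location": "us-central1",
    "filter": "ACTIVE",
    "ttl": "604800s",
}
first = list_jobs(example_inputs)
second = get_job(example_inputs, "job-a")
third = snapshot_job(example_inputs, "job-a")
```

Each function returns a plain tuple, so the requests can be inspected or handed to any HTTP client. The job id passed to `get_job` and `snapshot_job` would come from the previous step's response, as the `$steps.*.outputs` references in the workflow describe.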