Google Cloud Dataflow · Arazzo Workflow

Google Cloud Dataflow Snapshot Streaming Job

Version 1.0.0

Confirm a streaming job is running, take a snapshot, then read the snapshot back.

1 workflow · 1 source API · 1 provider
Apache Beam · Batch Processing · Big Data · Data Processing · ETL · Stream Processing · Arazzo · Workflows

Provider

google-cloud-dataflow

Workflows

snapshot-streaming-job
Confirm a streaming job is running, snapshot it, and read the snapshot.
Reads a job to confirm it is running, creates a snapshot of the streaming pipeline, then retrieves the created snapshot to confirm its state.
3 steps
Inputs: accessToken, description, jobId, location, projectId, ttl
Outputs: snapshotId, sourceJobId, state
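Before any request is sent, a client could check the inputs against the schema in the spec below: accessToken, projectId, location and jobId are required, while ttl and description are optional. The Python sketch below does that. The names SnapshotInputs and parse_inputs are hypothetical, and the ttl check is an assumption that the value uses the protobuf Duration JSON form (seconds, optionally fractional, followed by `s`, as in 604800s); the workflow schema itself only declares ttl as a string.

```python
import re
from dataclasses import dataclass
from typing import Optional

# Required inputs, as listed in the workflow's input schema.
REQUIRED = ("accessToken", "projectId", "location", "jobId")
# Assumption: ttl uses the protobuf Duration JSON form, e.g. "604800s" or "1.5s".
DURATION_RE = re.compile(r"^\d+(\.\d{1,9})?s$")


@dataclass
class SnapshotInputs:
    """Hypothetical container mirroring the workflow inputs."""
    accessToken: str
    projectId: str
    location: str
    jobId: str
    ttl: Optional[str] = None
    description: Optional[str] = None


def parse_inputs(raw: dict) -> SnapshotInputs:
    """Reject missing or empty required inputs and malformed ttl values."""
    missing = [key for key in REQUIRED if not raw.get(key)]
    if missing:
        raise ValueError(f"missing required inputs: {', '.join(missing)}")
    ttl = raw.get("ttl")
    if ttl is not None and not DURATION_RE.match(ttl):
        raise ValueError(f"ttl must look like '604800s', got {ttl!r}")
    return SnapshotInputs(
        accessToken=raw["accessToken"],
        projectId=raw["projectId"],
        location=raw["location"],
        jobId=raw["jobId"],
        ttl=ttl,
        description=raw.get("description"),
    )
```

For example, `parse_inputs({"accessToken": "t", "projectId": "p", "location": "us-central1", "jobId": "j", "ttl": "7d"})` raises ValueError because `7d` is not a seconds-based duration.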
1. confirmRunning (getLocationJob): Read the job and confirm it is currently in the JOB_STATE_RUNNING state before creating a snapshot.
2. createSnapshot (snapshotLocationJob): Create a snapshot of the running streaming job, capturing its state for later reuse with the supplied retention window.
3. confirmSnapshot (getLocationSnapshot): Read the newly created snapshot back to confirm its state, source job, and creation metadata.
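The three steps can be sketched as plain HTTP calls in Python. This is not the workflow's reference implementation: the base URL https://dataflow.googleapis.com/v1b3 and the request paths are assumptions about what the referenced OpenAPI description resolves getLocationJob, snapshotLocationJob and getLocationSnapshot to, and `send` is a hypothetical stand-in for your HTTP client, injected so the flow can be exercised without network access.

```python
from typing import Callable, Dict, Optional, Tuple

# Assumed REST base for the Dataflow v1b3 API.
BASE = "https://dataflow.googleapis.com/v1b3"

# Hypothetical transport: (method, url, headers, json_body) -> (status, parsed JSON body).
Transport = Callable[[str, str, Dict[str, str], Optional[dict]], Tuple[int, dict]]


def run_snapshot_workflow(inputs: dict, send: Transport) -> dict:
    """Run confirmRunning, createSnapshot and confirmSnapshot in order."""
    headers = {"Authorization": f"Bearer {inputs['accessToken']}"}
    loc = f"{BASE}/projects/{inputs['projectId']}/locations/{inputs['location']}"

    # Step 1: confirmRunning. Require HTTP 200 and a running job.
    status, job = send("GET", f"{loc}/jobs/{inputs['jobId']}", headers, None)
    if status != 200 or job.get("currentState") != "JOB_STATE_RUNNING":
        raise RuntimeError(f"job not running: {status} {job.get('currentState')}")

    # Step 2: createSnapshot. Body mirrors the workflow's requestBody payload.
    body = {
        "ttl": inputs.get("ttl"),
        "location": inputs["location"],
        "description": inputs.get("description"),
    }
    body = {k: v for k, v in body.items() if v is not None}  # drop unset optional inputs
    status, snap = send("POST", f"{loc}/jobs/{inputs['jobId']}:snapshot", headers, body)
    if status != 200:
        raise RuntimeError(f"snapshot request failed: {status}")
    snapshot_id = snap["id"]  # corresponds to $steps.createSnapshot.outputs.snapshotId

    # Step 3: confirmSnapshot. Read the snapshot back by id.
    status, snap = send("GET", f"{loc}/snapshots/{snapshot_id}", headers, None)
    if status != 200:
        raise RuntimeError(f"snapshot read failed: {status}")
    return {
        "snapshotId": snap["id"],
        "state": snap["state"],
        "sourceJobId": snap["sourceJobId"],
    }
```

One design choice here is the sketch's own: optional inputs that were not supplied are left out of the snapshot request body rather than being sent as null. The sketch also does not poll; a freshly created snapshot may still report a non-final state when it is read back.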

Source API Descriptions

Arazzo Workflow Specification

google-cloud-dataflow-snapshot-streaming-job-workflow.yml
arazzo: 1.0.1
info:
  title: Google Cloud Dataflow Snapshot Streaming Job
  summary: Confirm a streaming job is running, take a snapshot, then read the snapshot back.
  description: >-
    Captures the state of a running streaming Dataflow job into a snapshot that
    can later seed a new job. The workflow reads the job to confirm it is
    running, creates a snapshot with the supplied retention window, captures the
    new snapshot id, then reads the snapshot back to confirm its state and
    metadata. Every step spells out its request inline, including the Bearer
    authorization header Google Cloud requires, so the flow can be read and
    executed without opening the underlying OpenAPI description.
  version: 1.0.0
sourceDescriptions:
- name: dataflowApi
  url: ../openapi/google-cloud-dataflow-api-openapi.yml
  type: openapi
workflows:
- workflowId: snapshot-streaming-job
  summary: Confirm a streaming job is running, snapshot it, and read the snapshot.
  description: >-
    Reads a job to confirm it is running, creates a snapshot of the streaming
    pipeline, then retrieves the created snapshot to confirm its state.
  inputs:
    type: object
    required:
    - accessToken
    - projectId
    - location
    - jobId
    properties:
      accessToken:
        type: string
        description: Google Cloud OAuth 2.0 access token used as a Bearer credential.
      projectId:
        type: string
        description: The Google Cloud project id that owns the job.
      location:
        type: string
        description: The regional endpoint that contains the job (e.g. us-central1).
      jobId:
        type: string
        description: The id of the running streaming job to snapshot.
      ttl:
        type: string
        description: Snapshot retention period as a Duration string in seconds (e.g. 604800s for seven days).
      description:
        type: string
        description: A human-readable description for the snapshot.
  steps:
  - stepId: confirmRunning
    description: >-
      Read the job and confirm it is currently in the JOB_STATE_RUNNING state
      before creating a snapshot.
    operationId: getLocationJob
    parameters:
    - name: Authorization
      in: header
      value: Bearer $inputs.accessToken
    - name: projectId
      in: path
      value: $inputs.projectId
    - name: location
      in: path
      value: $inputs.location
    - name: jobId
      in: path
      value: $inputs.jobId
    successCriteria:
    - condition: $statusCode == 200
    - condition: $response.body#/currentState == 'JOB_STATE_RUNNING'
    outputs:
      currentState: $response.body#/currentState
  - stepId: createSnapshot
    description: >-
      Create a snapshot of the running streaming job, capturing its state for
      later reuse with the supplied retention window.
    operationId: snapshotLocationJob
    parameters:
    - name: Authorization
      in: header
      value: Bearer $inputs.accessToken
    - name: projectId
      in: path
      value: $inputs.projectId
    - name: location
      in: path
      value: $inputs.location
    - name: jobId
      in: path
      value: $inputs.jobId
    requestBody:
      contentType: application/json
      payload:
        ttl: $inputs.ttl
        location: $inputs.location
        description: $inputs.description
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      snapshotId: $response.body#/id
      snapshotState: $response.body#/state
  - stepId: confirmSnapshot
    description: >-
      Read the newly created snapshot back to confirm its state, source job, and
      creation metadata.
    operationId: getLocationSnapshot
    parameters:
    - name: Authorization
      in: header
      value: Bearer $inputs.accessToken
    - name: projectId
      in: path
      value: $inputs.projectId
    - name: location
      in: path
      value: $inputs.location
    - name: snapshotId
      in: path
      value: $steps.createSnapshot.outputs.snapshotId
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      snapshotId: $response.body#/id
      state: $response.body#/state
      sourceJobId: $response.body#/sourceJobId
      creationTime: $response.body#/creationTime
  outputs:
    snapshotId: $steps.confirmSnapshot.outputs.snapshotId
    state: $steps.confirmSnapshot.outputs.state
    sourceJobId: $steps.confirmSnapshot.outputs.sourceJobId
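The outputs above are wired together with two shapes of Arazzo runtime expression: `$response.body#/<pointer>`, a JSON Pointer into the current response, and `$steps.<stepId>.outputs.<name>`, a reference to an earlier step's output. A minimal Python evaluator for just those two shapes might look like the following. It is a hypothetical subset, not a complete Arazzo expression parser; the pointer handling follows RFC 6901, including the `~1` and `~0` escapes.

```python
from typing import Any


def resolve_pointer(doc: Any, pointer: str) -> Any:
    """Resolve an RFC 6901 JSON Pointer such as '/sourceJobId' against parsed JSON."""
    if pointer == "":
        return doc  # the empty pointer refers to the whole document
    if not pointer.startswith("/"):
        raise ValueError(f"invalid JSON Pointer: {pointer!r}")
    current = doc
    for token in pointer[1:].split("/"):
        # RFC 6901 order: replace ~1 with '/' first, then ~0 with '~'.
        token = token.replace("~1", "/").replace("~0", "~")
        current = current[int(token)] if isinstance(current, list) else current[token]
    return current


def evaluate(expression: str, response_body: Any, step_outputs: dict) -> Any:
    """Evaluate $response.body#/... and $steps.<id>.outputs.<name> expressions only."""
    prefix = "$response.body#"
    if expression.startswith(prefix):
        return resolve_pointer(response_body, expression[len(prefix):])
    if expression.startswith("$steps."):
        step_id, section, name = expression[len("$steps."):].split(".", 2)
        if section != "outputs":
            raise ValueError(f"unsupported expression: {expression}")
        return step_outputs[step_id][name]
    raise ValueError(f"unsupported expression: {expression}")
```

With the confirmSnapshot response body `{"id": "s1", "state": "READY", "sourceJobId": "j1"}`, `evaluate("$response.body#/sourceJobId", body, {})` returns `"j1"`, which is how the workflow-level sourceJobId output is populated.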