Google Cloud Dataflow · Arazzo Workflow

Google Cloud Dataflow Drain Running Job

Version 1.0.0

Confirm a streaming job is running, request a drain, then poll until it is drained.

1 workflow · 1 source API · 1 provider
Apache Beam · Batch Processing · Big Data · Data Processing · ETL · Stream Processing · Arazzo · Workflows

Provider

google-cloud-dataflow

Workflows

drain-running-job
Confirm running, request drain, and poll until JOB_STATE_DRAINED.
Reads a job to confirm it is running, updates its requested state to JOB_STATE_DRAINING, then polls until it reaches JOB_STATE_DRAINED.
3 steps · inputs: accessToken, jobId, location, projectId · outputs: finalState, jobId
1. confirmRunning (getLocationJob): Read the job and confirm it is currently in the JOB_STATE_RUNNING state before requesting a drain.
2. requestDrain (updateLocationJob): Update the job's requested state to JOB_STATE_DRAINING so it stops ingesting new data while finishing in-flight work.
3. pollUntilDrained (getLocationJob): Poll the job until it reports the terminal JOB_STATE_DRAINED state, retrying while it is still draining.
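The three steps above can be sketched in Python as a small driver. This is a minimal sketch under stated assumptions, not how an Arazzo engine is implemented: `drain_running_job`, `get_job`, `update_job`, and `WorkflowError` are hypothetical names, where `get_job` and `update_job` stand in for the getLocationJob and updateLocationJob operations and each returns an HTTP status code plus a parsed JSON body. The 30-second interval and 60-retry limit mirror the `keepPolling` action in the spec, which retries only while the read itself returns 200.

```python
import time
from typing import Any, Callable, Dict, Tuple

# Hypothetical shape of an API call result: (HTTP status code, parsed JSON body).
Response = Tuple[int, Dict[str, Any]]


class WorkflowError(RuntimeError):
    """Raised when a step's success criteria are not met."""


def drain_running_job(
    job_id: str,
    get_job: Callable[[], Response],
    update_job: Callable[[Dict[str, Any]], Response],
    sleep: Callable[[float], None] = time.sleep,
    retry_after: float = 30,
    retry_limit: int = 60,
) -> Dict[str, str]:
    # Step 1 (confirmRunning): the read must succeed and the job must be running.
    status, body = get_job()
    if status != 200 or body.get("currentState") != "JOB_STATE_RUNNING":
        raise WorkflowError(f"confirmRunning failed: {status} {body.get('currentState')}")

    # Step 2 (requestDrain): set requestedState to JOB_STATE_DRAINING.
    status, _ = update_job({"requestedState": "JOB_STATE_DRAINING"})
    if status != 200:
        raise WorkflowError(f"requestDrain failed: {status}")

    # Step 3 (pollUntilDrained): one initial read plus up to retry_limit retries.
    # A retry happens only when the read returned 200 but the job has not drained.
    for attempt in range(retry_limit + 1):
        status, body = get_job()
        state = body.get("currentState")
        if status == 200 and state == "JOB_STATE_DRAINED":
            return {"jobId": job_id, "finalState": state}
        if status != 200 or attempt == retry_limit:
            break
        sleep(retry_after)
    raise WorkflowError(f"pollUntilDrained failed: {status} {state}")
```

Injecting `sleep` keeps the polling loop deterministic in tests; the default `time.sleep` gives the real 30-second cadence, for a worst case of roughly 30 minutes of polling.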

Source API Descriptions

dataflowApi (openapi): ../openapi/google-cloud-dataflow-api-openapi.yml

Arazzo Workflow Specification

google-cloud-dataflow-drain-running-job-workflow.yml
arazzo: 1.0.1
info:
  title: Google Cloud Dataflow Drain Running Job
  summary: Confirm a streaming job is running, request a drain, then poll until it is drained.
  description: >-
    Gracefully stops a running streaming Dataflow job by draining it, which stops
    ingesting new data while finishing in-flight work. The workflow reads the job
    to confirm it is running, issues an update that sets the requested state to
    JOB_STATE_DRAINING, then polls the job until it reports the terminal
    JOB_STATE_DRAINED state. Every step spells out its request inline, including
    the Bearer authorization header Google Cloud requires, so the flow can be
    followed without opening the underlying OpenAPI description (which is still
    needed to resolve each operationId to its path and method).
  version: 1.0.0
sourceDescriptions:
- name: dataflowApi
  url: ../openapi/google-cloud-dataflow-api-openapi.yml
  type: openapi
workflows:
- workflowId: drain-running-job
  summary: Confirm running, request drain, and poll until JOB_STATE_DRAINED.
  description: >-
    Reads a job to confirm it is running, updates its requested state to
    JOB_STATE_DRAINING, then polls until it reaches JOB_STATE_DRAINED.
  inputs:
    type: object
    required:
    - accessToken
    - projectId
    - location
    - jobId
    properties:
      accessToken:
        type: string
        description: Google Cloud OAuth 2.0 access token used as a Bearer credential.
      projectId:
        type: string
        description: The Google Cloud project id that owns the job.
      location:
        type: string
        description: The regional endpoint that contains the job (e.g. us-central1).
      jobId:
        type: string
        description: The id of the running streaming job to drain.
  steps:
  - stepId: confirmRunning
    description: >-
      Read the job and confirm it is currently in the JOB_STATE_RUNNING state
      before requesting a drain.
    operationId: getLocationJob
    parameters:
    - name: Authorization
      in: header
      value: Bearer $inputs.accessToken
    - name: projectId
      in: path
      value: $inputs.projectId
    - name: location
      in: path
      value: $inputs.location
    - name: jobId
      in: path
      value: $inputs.jobId
    successCriteria:
    - condition: $statusCode == 200
    - condition: $response.body#/currentState == 'JOB_STATE_RUNNING'
    outputs:
      currentState: $response.body#/currentState
  - stepId: requestDrain
    description: >-
      Update the job's requested state to JOB_STATE_DRAINING so it stops
      ingesting new data while finishing in-flight work.
    operationId: updateLocationJob
    parameters:
    - name: Authorization
      in: header
      value: Bearer $inputs.accessToken
    - name: projectId
      in: path
      value: $inputs.projectId
    - name: location
      in: path
      value: $inputs.location
    - name: jobId
      in: path
      value: $inputs.jobId
    - name: updateMask
      in: query
      value: requestedState
    requestBody:
      contentType: application/json
      payload:
        requestedState: JOB_STATE_DRAINING
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      requestedState: $response.body#/requestedState
  - stepId: pollUntilDrained
    description: >-
      Poll the job until it reports the terminal JOB_STATE_DRAINED state,
      retrying while it is still draining.
    operationId: getLocationJob
    parameters:
    - name: Authorization
      in: header
      value: Bearer $inputs.accessToken
    - name: projectId
      in: path
      value: $inputs.projectId
    - name: location
      in: path
      value: $inputs.location
    - name: jobId
      in: path
      value: $inputs.jobId
    successCriteria:
    - condition: $statusCode == 200
    - condition: $response.body#/currentState == 'JOB_STATE_DRAINED'
    outputs:
      currentState: $response.body#/currentState
    onFailure:
    - name: keepPolling
      type: retry
      retryAfter: 30
      retryLimit: 60
      criteria:
      - condition: $statusCode == 200
  outputs:
    jobId: $inputs.jobId
    finalState: $steps.pollUntilDrained.outputs.currentState
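To see the wire-level request that the requestDrain step describes, the sketch below builds it with Python's standard library without sending it. It assumes `updateLocationJob` maps to Dataflow's v1b3 `projects.locations.jobs.update` method, a PUT to `https://dataflow.googleapis.com/v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}`; confirm the path and method against the referenced OpenAPI description. `build_drain_request` is a hypothetical helper name, and the `updateMask` query parameter is copied from the spec as-is.

```python
import json
import urllib.parse
import urllib.request

# Assumed base URL of the Dataflow v1b3 REST API.
DATAFLOW_BASE = "https://dataflow.googleapis.com/v1b3"


def build_drain_request(
    access_token: str, project_id: str, location: str, job_id: str
) -> urllib.request.Request:
    # Percent-encode each path parameter (projectId, location, jobId) as in the step.
    path = "/projects/{}/locations/{}/jobs/{}".format(
        *(urllib.parse.quote(p, safe="") for p in (project_id, location, job_id))
    )
    # The spec sends updateMask=requestedState as a query parameter.
    query = urllib.parse.urlencode({"updateMask": "requestedState"})
    # JSON body matching the step's requestBody payload.
    body = json.dumps({"requestedState": "JOB_STATE_DRAINING"}).encode("utf-8")
    return urllib.request.Request(
        url=f"{DATAFLOW_BASE}{path}?{query}",
        data=body,
        method="PUT",
        headers={
            # Same inline Bearer credential the workflow passes in every step.
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        },
    )
```

Passing the returned object to `urllib.request.urlopen` would send it; the spec then reads `requestedState` back from the response body.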