Google Cloud Dataflow · Arazzo Workflow

Google Cloud Dataflow Cancel Running Job

Version 1.0.0

Confirm a job is running, request cancellation, then poll until it is cancelled.

1 workflow · 1 source API · 1 provider
Tags: Apache Beam, Batch Processing, Big Data, Data Processing, ETL, Stream Processing, Arazzo, Workflows

Provider

google-cloud-dataflow

Workflows

cancel-running-job
Confirm running, request cancel, and poll until JOB_STATE_CANCELLED.
Reads a job to confirm it is running, updates its requested state to JOB_STATE_CANCELLED, then polls until it reaches JOB_STATE_CANCELLED.
3 steps · inputs: accessToken, jobId, location, projectId · outputs: finalState, jobId
1. confirmRunning (getLocationJob): Read the job and confirm it is currently in the JOB_STATE_RUNNING state before requesting cancellation.
2. requestCancel (updateLocationJob): Update the job's requested state to JOB_STATE_CANCELLED to immediately stop the pipeline and discard in-flight work.
3. pollUntilCancelled (getLocationJob): Poll the job until it reports the terminal JOB_STATE_CANCELLED state, retrying while it is still cancelling.
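
The three steps above can be sketched in Python as a small runner. This is a minimal illustration, not the behavior of any particular Arazzo tool: `cancel_running_job`, `get_job`, `request_cancel`, and the injected `sleep` are hypothetical names, and the transport is passed in as callables so the control flow can be read without any HTTP details. The 30-second delay and the limit of 60 retries mirror the `keepPolling` retry action in the specification; treating that limit as the number of retries after the first poll is an assumption.

```python
import time
from typing import Callable, Tuple

# Each hypothetical transport callable returns (HTTP status code, parsed JSON body).
Response = Tuple[int, dict]


def cancel_running_job(
    get_job: Callable[[], Response],
    request_cancel: Callable[[], Response],
    retry_after: float = 30.0,  # seconds between polls (retryAfter in the spec)
    retry_limit: int = 60,      # retries after the first poll (retryLimit in the spec)
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Run the three steps in order and return the final job state."""
    # Step 1, confirmRunning: the job must currently be running.
    status, body = get_job()
    state = body.get("currentState")
    if status != 200 or state != "JOB_STATE_RUNNING":
        raise RuntimeError(f"job not running (HTTP {status}, state {state})")

    # Step 2, requestCancel: ask the service to cancel the job.
    status, _ = request_cancel()
    if status != 200:
        raise RuntimeError(f"cancel request failed (HTTP {status})")

    # Step 3, pollUntilCancelled: re-read the job until it is cancelled.
    retries = 0
    while True:
        status, body = get_job()
        if status != 200:
            # The retry action only applies to reads that returned HTTP 200.
            raise RuntimeError(f"poll failed (HTTP {status})")
        state = body.get("currentState")
        if state == "JOB_STATE_CANCELLED":
            return state
        if retries >= retry_limit:
            raise TimeoutError(f"job still {state} after {retries} retries")
        retries += 1
        sleep(retry_after)
```

With the default values the loop sleeps at most 60 times for 30 seconds each, so it gives up after roughly 30 minutes of waiting.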

Source API Descriptions

dataflowApi (openapi): ../openapi/google-cloud-dataflow-api-openapi.yml

Arazzo Workflow Specification

google-cloud-dataflow-cancel-running-job-workflow.yml
arazzo: 1.0.1
info:
  title: Google Cloud Dataflow Cancel Running Job
  summary: Confirm a job is running, request cancellation, then poll until it is cancelled.
  description: >-
    Immediately stops a running Dataflow job by cancelling it, discarding any
    in-flight work. The workflow reads the job to confirm it is running, issues
    an update that sets the requested state to JOB_STATE_CANCELLED, then polls
    the job until it reports the terminal JOB_STATE_CANCELLED state. Every step
    spells out its request inline, including the Bearer Authorization header
    Google Cloud requires, so the flow can be read and executed without opening
    the underlying OpenAPI description.
  version: 1.0.0
sourceDescriptions:
- name: dataflowApi
  url: ../openapi/google-cloud-dataflow-api-openapi.yml
  type: openapi
workflows:
- workflowId: cancel-running-job
  summary: Confirm running, request cancel, and poll until JOB_STATE_CANCELLED.
  description: >-
    Reads a job to confirm it is running, updates its requested state to
    JOB_STATE_CANCELLED, then polls until it reaches JOB_STATE_CANCELLED.
  inputs:
    type: object
    required:
    - accessToken
    - projectId
    - location
    - jobId
    properties:
      accessToken:
        type: string
        description: Google Cloud OAuth 2.0 access token used as a Bearer credential.
      projectId:
        type: string
        description: The Google Cloud project ID that owns the job.
      location:
        type: string
        description: The regional endpoint that contains the job (e.g. us-central1).
      jobId:
        type: string
        description: The ID of the running job to cancel.
  steps:
  - stepId: confirmRunning
    description: >-
      Read the job and confirm it is currently in the JOB_STATE_RUNNING state
      before requesting cancellation.
    operationId: getLocationJob
    parameters:
    - name: Authorization
      in: header
      value: Bearer {$inputs.accessToken}
    - name: projectId
      in: path
      value: $inputs.projectId
    - name: location
      in: path
      value: $inputs.location
    - name: jobId
      in: path
      value: $inputs.jobId
    successCriteria:
    - condition: $statusCode == 200
    - context: $response.body
      condition: $.currentState == "JOB_STATE_RUNNING"
      type: jsonpath
    outputs:
      currentState: $response.body#/currentState
  - stepId: requestCancel
    description: >-
      Update the job's requested state to JOB_STATE_CANCELLED to immediately
      stop the pipeline and discard in-flight work.
    operationId: updateLocationJob
    parameters:
    - name: Authorization
      in: header
      value: Bearer {$inputs.accessToken}
    - name: projectId
      in: path
      value: $inputs.projectId
    - name: location
      in: path
      value: $inputs.location
    - name: jobId
      in: path
      value: $inputs.jobId
    - name: updateMask
      in: query
      value: requestedState
    requestBody:
      contentType: application/json
      payload:
        requestedState: JOB_STATE_CANCELLED
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      requestedState: $response.body#/requestedState
  - stepId: pollUntilCancelled
    description: >-
      Poll the job until it reports the terminal JOB_STATE_CANCELLED state,
      retrying while it is still cancelling.
    operationId: getLocationJob
    parameters:
    - name: Authorization
      in: header
      value: Bearer {$inputs.accessToken}
    - name: projectId
      in: path
      value: $inputs.projectId
    - name: location
      in: path
      value: $inputs.location
    - name: jobId
      in: path
      value: $inputs.jobId
    successCriteria:
    - condition: $statusCode == 200
    - context: $response.body
      condition: $.currentState == "JOB_STATE_CANCELLED"
      type: jsonpath
    outputs:
      currentState: $response.body#/currentState
    onFailure:
    - name: keepPolling
      type: retry
      retryAfter: 30
      retryLimit: 60
      criteria:
      - condition: $statusCode == 200
  outputs:
    jobId: $inputs.jobId
    finalState: $steps.pollUntilCancelled.outputs.currentState
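
For readers who want to see what the step parameters resolve to on the wire, the following Python sketch builds the two distinct HTTP requests the workflow uses. The base URL and the PUT method are assumptions taken from the public Dataflow REST API (v1b3), because the specification only references the `getLocationJob` and `updateLocationJob` operationIds; the function names and the dictionary shape of a request are illustrative only.

```python
import json
from urllib.parse import quote, urlencode

# Assumption: public Dataflow REST endpoint (v1b3); the workflow itself does
# not state the URL, it only names operationIds from the OpenAPI description.
BASE_URL = "https://dataflow.googleapis.com/v1b3"


def job_url(project_id: str, location: str, job_id: str) -> str:
    """Fill the three path parameters that every step shares."""
    return (
        f"{BASE_URL}/projects/{quote(project_id, safe='')}"
        f"/locations/{quote(location, safe='')}"
        f"/jobs/{quote(job_id, safe='')}"
    )


def get_job_request(access_token: str, project_id: str, location: str, job_id: str) -> dict:
    """Request used by confirmRunning and pollUntilCancelled."""
    return {
        "method": "GET",
        "url": job_url(project_id, location, job_id),
        "headers": {"Authorization": f"Bearer {access_token}"},
        "body": None,
    }


def cancel_job_request(access_token: str, project_id: str, location: str, job_id: str) -> dict:
    """Request used by requestCancel: update only the requestedState field."""
    query = urlencode({"updateMask": "requestedState"})
    return {
        "method": "PUT",  # assumption: HTTP method of the job update operation
        "url": f"{job_url(project_id, location, job_id)}?{query}",
        "headers": {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        },
        "body": json.dumps({"requestedState": "JOB_STATE_CANCELLED"}),
    }
```

Either dictionary can be handed to any HTTP client; keeping request construction separate from sending makes it easy to inspect exactly what each step would transmit.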