Google Cloud Dataflow · Arazzo Workflow

Google Cloud Dataflow Launch Classic Template and Monitor

Version 1.0.0

Launch a job from a classic Dataflow template, poll it to completion, then read its metrics.

1 workflow · 1 source API · 1 provider
Tags: Apache Beam, Batch Processing, Big Data, Data Processing, ETL, Stream Processing, Arazzo, Workflows

Provider

google-cloud-dataflow

Workflows

launch-classic-template-and-monitor
Launch a classic template job, poll until done, and read its metrics.
Launches a classic Dataflow template into a region, captures the created job id, polls the job state until JOB_STATE_DONE, then retrieves the job metrics.
3 steps
Inputs: accessToken, gcsPath, jobName, location, parameters, projectId
Outputs: finalState, jobId, metrics
1. launchTemplate (operation: launchLocationTemplate)
Launch a Dataflow job from the classic template at the supplied Cloud Storage path in the requested region.
2. pollJobState (operation: getLocationJob)
Fetch the launched job and confirm it has reached the terminal JOB_STATE_DONE state before reading metrics.
3. getMetrics (operation: getLocationJobMetrics)
Retrieve the execution metrics for the completed job, including counters and distributions emitted by the pipeline.
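
The three steps above can be sketched as a small driver loop. This is a minimal illustration, not the workflow runner itself: `run_launch_and_monitor` and its callable parameters (`launch`, `get_job`, `get_metrics`, `sleep`) are hypothetical names introduced here, and the HTTP calls are injected so the control flow can be read and exercised without credentials. The 30-second delay and 60-retry limit mirror the `retryAfter` and `retryLimit` values in the specification below, assuming a retry limit means one initial attempt plus up to that many retries.

```python
import time
from typing import Any, Callable, Dict

Json = Dict[str, Any]


def run_launch_and_monitor(
    launch: Callable[[], Json],
    get_job: Callable[[str], Json],
    get_metrics: Callable[[str], Json],
    retry_after: float = 30.0,
    retry_limit: int = 60,
    sleep: Callable[[float], None] = time.sleep,
) -> Json:
    """Launch a job, poll until JOB_STATE_DONE, then read its metrics."""
    # Step 1 (launchTemplate): the launch response nests the job under "job".
    job_id = launch()["job"]["id"]

    # Step 2 (pollJobState): one initial attempt plus up to retry_limit retries.
    state = None
    for attempt in range(retry_limit + 1):
        state = get_job(job_id)["currentState"]
        if state == "JOB_STATE_DONE":
            break
        if attempt < retry_limit:
            sleep(retry_after)
    else:
        # The loop ended without ever seeing JOB_STATE_DONE.
        raise TimeoutError(
            f"job {job_id} still in {state} after {retry_limit} retries"
        )

    # Step 3 (getMetrics): only reached once the job reports JOB_STATE_DONE.
    metrics = get_metrics(job_id)["metrics"]
    return {"jobId": job_id, "finalState": state, "metrics": metrics}


if __name__ == "__main__":
    # Stand-in responses; a real caller would issue the three HTTP requests.
    fake_states = iter(["JOB_STATE_RUNNING", "JOB_STATE_DONE"])
    result = run_launch_and_monitor(
        launch=lambda: {"job": {"id": "example-job"}},
        get_job=lambda job_id: {"currentState": next(fake_states)},
        get_metrics=lambda job_id: {"metrics": []},
        sleep=lambda seconds: None,  # skip the real delay in the demo
    )
    print(result)
```

Note that this sketch, like the workflow as written, keeps polling on any state other than JOB_STATE_DONE, so a job that ends in a failed or cancelled state is only reported once the retries run out.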

Source API Descriptions

Arazzo Workflow Specification

google-cloud-dataflow-launch-classic-template-and-monitor-workflow.yml
arazzo: 1.0.1
info:
  title: Google Cloud Dataflow Launch Classic Template and Monitor
  summary: Launch a job from a classic Dataflow template, poll it to completion, then read its metrics.
  description: >-
    Launches a Dataflow job from a classic template stored in Cloud Storage in a
    specific region, then watches the resulting job until it reaches a terminal
    state. The workflow launches the template, captures the new job id, polls the
    job state until it reports JOB_STATE_DONE, and finally pulls the execution
    metrics for the completed job. Every step spells out its request inline,
    including the Bearer Authorization header that Google Cloud requires, so the
    flow can be read and executed without opening the underlying OpenAPI description.
  version: 1.0.0
sourceDescriptions:
- name: dataflowApi
  url: ../openapi/google-cloud-dataflow-api-openapi.yml
  type: openapi
workflows:
- workflowId: launch-classic-template-and-monitor
  summary: Launch a classic template job, poll until done, and read its metrics.
  description: >-
    Launches a classic Dataflow template into a region, captures the created job
    id, polls the job state until JOB_STATE_DONE, then retrieves the job metrics.
  inputs:
    type: object
    required:
    - accessToken
    - projectId
    - location
    - gcsPath
    - jobName
    properties:
      accessToken:
        type: string
        description: Google Cloud OAuth 2.0 access token used as a Bearer credential.
      projectId:
        type: string
        description: The Google Cloud project id that owns the job.
      location:
        type: string
        description: The regional endpoint that contains the job (e.g. us-central1).
      gcsPath:
        type: string
        description: Cloud Storage path to the classic template (gs://...).
      jobName:
        type: string
        description: The name to assign to the launched job.
      parameters:
        type: object
        description: Map of runtime template parameter name/value pairs.
  steps:
  - stepId: launchTemplate
    description: >-
      Launch a Dataflow job from the classic template at the supplied Cloud
      Storage path in the requested region.
    operationId: launchLocationTemplate
    parameters:
    - name: Authorization
      in: header
      value: Bearer $inputs.accessToken
    - name: projectId
      in: path
      value: $inputs.projectId
    - name: location
      in: path
      value: $inputs.location
    - name: gcsPath
      in: query
      value: $inputs.gcsPath
    requestBody:
      contentType: application/json
      payload:
        jobName: $inputs.jobName
        parameters: $inputs.parameters
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      jobId: $response.body#/job/id
      jobState: $response.body#/job/currentState
  - stepId: pollJobState
    description: >-
      Fetch the launched job and confirm it has reached the terminal
      JOB_STATE_DONE state before reading metrics.
    operationId: getLocationJob
    parameters:
    - name: Authorization
      in: header
      value: Bearer $inputs.accessToken
    - name: projectId
      in: path
      value: $inputs.projectId
    - name: location
      in: path
      value: $inputs.location
    - name: jobId
      in: path
      value: $steps.launchTemplate.outputs.jobId
    successCriteria:
    - condition: $statusCode == 200
    - context: $response.body
      condition: $.currentState == "JOB_STATE_DONE"
      type: jsonpath
    outputs:
      currentState: $response.body#/currentState
    onSuccess:
    - name: completed
      type: goto
      stepId: getMetrics
      criteria:
      - context: $response.body
        condition: $.currentState == "JOB_STATE_DONE"
        type: jsonpath
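    onFailure:
    # Job not done yet: wait 30 seconds and retry, up to 60 times (about 30
    # minutes), as long as the API call itself returned 200.
    - name: keepPolling
      type: retry
      retryAfter: 30
      retryLimit: 60
      criteria:
      - condition: $statusCode == 200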
  - stepId: getMetrics
    description: >-
      Retrieve the execution metrics for the completed job, including counters
      and distributions emitted by the pipeline.
    operationId: getLocationJobMetrics
    parameters:
    - name: Authorization
      in: header
      value: Bearer $inputs.accessToken
    - name: projectId
      in: path
      value: $inputs.projectId
    - name: location
      in: path
      value: $inputs.location
    - name: jobId
      in: path
      value: $steps.launchTemplate.outputs.jobId
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      metricTime: $response.body#/metricTime
      metrics: $response.body#/metrics
  outputs:
    jobId: $steps.launchTemplate.outputs.jobId
    finalState: $steps.pollJobState.outputs.currentState
    metrics: $steps.getMetrics.outputs.metrics
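
To make the parameter mapping concrete, the sketch below builds the three requests the steps describe from the workflow inputs. It is illustrative only: the helper names (`launch_request`, `job_request`) are hypothetical, and the base URL and `v1b3` paths are an assumption taken from Google's public Dataflow REST reference, since the referenced OpenAPI description is not shown here and may define its servers or paths differently.

```python
from typing import Any, Dict
from urllib.parse import quote, urlencode

# Assumption: public Dataflow REST endpoint; the OpenAPI file may differ.
BASE_URL = "https://dataflow.googleapis.com/v1b3"


def _auth_header(inputs: Dict[str, Any]) -> Dict[str, str]:
    # Mirrors the inline "Authorization: Bearer $inputs.accessToken" parameter.
    return {"Authorization": f"Bearer {inputs['accessToken']}"}


def _location_path(inputs: Dict[str, Any]) -> str:
    # projectId and location are path parameters in every step.
    project = quote(inputs["projectId"], safe="")
    location = quote(inputs["location"], safe="")
    return f"/projects/{project}/locations/{location}"


def launch_request(inputs: Dict[str, Any]) -> Dict[str, Any]:
    """Request for the launchTemplate step (gcsPath travels in the query)."""
    query = urlencode({"gcsPath": inputs["gcsPath"]})
    return {
        "method": "POST",
        "url": f"{BASE_URL}{_location_path(inputs)}/templates:launch?{query}",
        "headers": {**_auth_header(inputs), "Content-Type": "application/json"},
        # parameters is optional in the workflow inputs; default to an empty map.
        "body": {
            "jobName": inputs["jobName"],
            "parameters": inputs.get("parameters", {}),
        },
    }


def job_request(inputs: Dict[str, Any], job_id: str, metrics: bool = False) -> Dict[str, Any]:
    """Request for pollJobState, or for getMetrics when metrics=True."""
    suffix = "/metrics" if metrics else ""
    job = quote(job_id, safe="")
    return {
        "method": "GET",
        "url": f"{BASE_URL}{_location_path(inputs)}/jobs/{job}{suffix}",
        "headers": _auth_header(inputs),
    }
```

The job id passed to `job_request` corresponds to `$steps.launchTemplate.outputs.jobId`, which both later steps reuse as their `jobId` path parameter.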