Google Cloud Dataflow · Arazzo Workflow

Google Cloud Dataflow Launch Flex Template and Monitor

Version 1.0.0

Launch a containerized Flex Template job, poll it to completion, then read its metrics.

1 workflow · 1 source API · 1 provider
Apache Beam · Batch Processing · Big Data · Data Processing · ETL · Stream Processing · Arazzo · Workflows

Provider

google-cloud-dataflow

Workflows

launch-flex-template-and-monitor
Launch a Flex Template job, poll until done, and read its metrics.
Launches a Flex Template into a region, captures the created job id, polls the job state until JOB_STATE_DONE, then retrieves the job metrics.
3 steps · Inputs: accessToken, containerSpecGcsPath, jobName, location, parameters, projectId · Outputs: finalState, jobId, metrics
1. launchFlexTemplate (operationId: launchLocationFlexTemplate)
   Launch a Dataflow job from the Flex Template container spec in the requested region.
2. pollJobState (operationId: getLocationJob)
   Fetch the launched job, retrying every 30 seconds (up to 60 times) until it reports the terminal JOB_STATE_DONE state, before reading metrics.
3. getMetrics (operationId: getLocationJobMetrics)
   Retrieve the execution metrics for the completed job, including counters and distributions emitted by the pipeline.
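The retry rules of pollJobState can be sketched outside an Arazzo runner as the loop below. It is a minimal illustration, not how any particular runner implements the `retry` action: `poll_job_state` and the injected `fetch_job` callable (standing in for the getLocationJob request) are hypothetical names, and the sketch assumes the runner stops the workflow when a non-200 response matches none of the onFailure criteria.

```python
import time
from typing import Callable, Dict, Tuple

# Retry settings copied from the pollJobState step's onFailure action.
RETRY_AFTER_SECONDS = 30
RETRY_LIMIT = 60
DONE = "JOB_STATE_DONE"

# Hypothetical callable: performs getLocationJob and returns
# (HTTP status code, parsed Job resource).
FetchJob = Callable[[], Tuple[int, Dict]]


def poll_job_state(fetch_job: FetchJob,
                   sleep: Callable[[float], None] = time.sleep) -> str:
    """Poll until currentState is JOB_STATE_DONE (sketch of pollJobState).

    - 200 + JOB_STATE_DONE: success, hand over to getMetrics.
    - 200 + any other state: retry after RETRY_AFTER_SECONDS, at most
      RETRY_LIMIT times (61 requests in total).
    - non-200: no onFailure criteria match, so the step fails.
    """
    retries = 0
    while True:
        status, job = fetch_job()
        if status != 200:
            raise RuntimeError(f"getLocationJob returned HTTP {status}")
        state = job.get("currentState")
        if state == DONE:
            return state
        if retries >= RETRY_LIMIT:
            raise TimeoutError(f"job still {state} after {RETRY_LIMIT} retries")
        retries += 1
        sleep(RETRY_AFTER_SECONDS)


if __name__ == "__main__":
    # Simulated getLocationJob responses: running twice, then done.
    responses = iter([
        (200, {"currentState": "JOB_STATE_RUNNING"}),
        (200, {"currentState": "JOB_STATE_RUNNING"}),
        (200, {"currentState": "JOB_STATE_DONE"}),
    ])
    print(poll_job_state(lambda: next(responses), sleep=lambda _: None))
```

Because only JOB_STATE_DONE satisfies the success criteria, a job that ends in JOB_STATE_FAILED or JOB_STATE_CANCELLED is re-polled until the retry limit is spent, roughly 30 minutes of waiting (60 × 30 s). The sketch reproduces that behaviour rather than adding an early exit.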

Source API Descriptions

dataflowApi (openapi): ../openapi/google-cloud-dataflow-api-openapi.yml

Arazzo Workflow Specification

google-cloud-dataflow-launch-flex-template-and-monitor-workflow.yml
arazzo: 1.0.1
info:
  title: Google Cloud Dataflow Launch Flex Template and Monitor
  summary: Launch a containerized Flex Template job, poll it to completion, then read its metrics.
  description: >-
    Launches a Dataflow job from a Flex Template, which packages the pipeline
    code in a Docker container and supports dynamic configuration at launch time.
    The workflow submits the launchParameter, captures the created job id, polls
    the job until it reports JOB_STATE_DONE, and then retrieves the execution
    metrics. Every step spells out its request inline, including the Bearer
    authorization header that Google Cloud requires, so the flow can be read and
    executed without opening the underlying OpenAPI description.
  version: 1.0.0
sourceDescriptions:
- name: dataflowApi
  url: ../openapi/google-cloud-dataflow-api-openapi.yml
  type: openapi
workflows:
- workflowId: launch-flex-template-and-monitor
  summary: Launch a Flex Template job, poll until done, and read its metrics.
  description: >-
    Launches a Flex Template into a region, captures the created job id, polls
    the job state until JOB_STATE_DONE, then retrieves the job metrics.
  inputs:
    type: object
    required:
    - accessToken
    - projectId
    - location
    - jobName
    - containerSpecGcsPath
    properties:
      accessToken:
        type: string
        description: Google Cloud OAuth 2.0 access token used as a Bearer credential.
      projectId:
        type: string
        description: The Google Cloud project id that owns the job.
      location:
        type: string
        description: The regional endpoint that contains the job (e.g. us-central1).
      jobName:
        type: string
        description: The name to assign to the launched job.
      containerSpecGcsPath:
        type: string
        description: Cloud Storage path to the Flex Template container spec file (gs://...).
      parameters:
        type: object
        description: Map of runtime template parameter name/value pairs.
  steps:
  - stepId: launchFlexTemplate
    description: >-
      Launch a Dataflow job from the Flex Template container spec in the
      requested region.
    operationId: launchLocationFlexTemplate
    parameters:
    - name: Authorization
      in: header
      value: Bearer {$inputs.accessToken}
    - name: projectId
      in: path
      value: $inputs.projectId
    - name: location
      in: path
      value: $inputs.location
    requestBody:
      contentType: application/json
      payload:
        launchParameter:
          jobName: $inputs.jobName
          containerSpecGcsPath: $inputs.containerSpecGcsPath
          parameters: $inputs.parameters
        validateOnly: false
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      jobId: $response.body#/job/id
      jobState: $response.body#/job/currentState
  - stepId: pollJobState
    description: >-
      Fetch the launched job, retrying every 30 seconds (up to 60 times) until it
      reports the terminal JOB_STATE_DONE state, before reading metrics.
    operationId: getLocationJob
    parameters:
    - name: Authorization
      in: header
      value: Bearer {$inputs.accessToken}
    - name: projectId
      in: path
      value: $inputs.projectId
    - name: location
      in: path
      value: $inputs.location
    - name: jobId
      in: path
      value: $steps.launchFlexTemplate.outputs.jobId
    successCriteria:
    - condition: $statusCode == 200
    - context: $response.body
      condition: $.currentState == "JOB_STATE_DONE"
      type: jsonpath
    outputs:
      currentState: $response.body#/currentState
    onSuccess:
    - name: completed
      type: goto
      stepId: getMetrics
      criteria:
      - context: $response.body
        condition: $.currentState == "JOB_STATE_DONE"
        type: jsonpath
    onFailure:
    - name: keepPolling
      type: retry
      retryAfter: 30
      retryLimit: 60
      criteria:
      - condition: $statusCode == 200
  - stepId: getMetrics
    description: >-
      Retrieve the execution metrics for the completed job, including counters
      and distributions emitted by the pipeline.
    operationId: getLocationJobMetrics
    parameters:
    - name: Authorization
      in: header
      value: Bearer {$inputs.accessToken}
    - name: projectId
      in: path
      value: $inputs.projectId
    - name: location
      in: path
      value: $inputs.location
    - name: jobId
      in: path
      value: $steps.launchFlexTemplate.outputs.jobId
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      metricTime: $response.body#/metricTime
      metrics: $response.body#/metrics
  outputs:
    jobId: $steps.launchFlexTemplate.outputs.jobId
    finalState: $steps.pollJobState.outputs.currentState
    metrics: $steps.getMetrics.outputs.metrics
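To make the HTTP traffic behind the three steps concrete, the sketch below builds each request from the workflow inputs and resolves the `$response.body#/job/id` output with a small RFC 6901 JSON Pointer helper. It assumes the operationIds resolve to the public Dataflow v1b3 REST endpoints under https://dataflow.googleapis.com. The helper names (`launch_request`, `job_requests`, `resolve_pointer`) and the sample values are hypothetical, and nothing is actually sent.

```python
import json
from typing import Any, Dict, List, Optional, Tuple

# Assumption: the operationIds in the source OpenAPI description resolve to
# the public Dataflow REST API (v1b3) at this base URL.
BASE_URL = "https://dataflow.googleapis.com/v1b3"

# (method, url, headers, JSON body or None)
Request = Tuple[str, str, Dict[str, str], Optional[str]]


def _location_url(project_id: str, location: str) -> str:
    return f"{BASE_URL}/projects/{project_id}/locations/{location}"


def launch_request(access_token: str, project_id: str, location: str,
                   job_name: str, container_spec_gcs_path: str,
                   parameters: Optional[Dict[str, str]] = None) -> Request:
    """Step launchFlexTemplate -> launchLocationFlexTemplate."""
    launch_parameter: Dict[str, Any] = {
        "jobName": job_name,
        "containerSpecGcsPath": container_spec_gcs_path,
    }
    if parameters:  # optional input; left out of the body when not supplied
        launch_parameter["parameters"] = parameters
    body = json.dumps({"launchParameter": launch_parameter, "validateOnly": False})
    headers = {"Authorization": f"Bearer {access_token}",
               "Content-Type": "application/json"}
    return ("POST", f"{_location_url(project_id, location)}/flexTemplates:launch",
            headers, body)


def resolve_pointer(document: Any, pointer: str) -> Any:
    """Resolve an RFC 6901 JSON Pointer, e.g. the /job/id in $response.body#/job/id."""
    if pointer == "":
        return document
    if not pointer.startswith("/"):
        raise ValueError(f"invalid JSON Pointer: {pointer!r}")
    for token in pointer[1:].split("/"):
        # RFC 6901 unescaping: "~1" -> "/" first, then "~0" -> "~".
        token = token.replace("~1", "/").replace("~0", "~")
        document = document[int(token)] if isinstance(document, list) else document[token]
    return document


def job_requests(access_token: str, project_id: str, location: str,
                 job_id: str) -> List[Request]:
    """Steps pollJobState (getLocationJob) and getMetrics (getLocationJobMetrics)."""
    headers = {"Authorization": f"Bearer {access_token}"}
    job_url = f"{_location_url(project_id, location)}/jobs/{job_id}"
    return [("GET", job_url, headers, None),
            ("GET", f"{job_url}/metrics", headers, None)]


if __name__ == "__main__":
    # Hypothetical inputs; a real run needs a valid OAuth 2.0 access token.
    launch = launch_request("ACCESS_TOKEN", "my-project", "us-central1",
                            "wordcount-flex", "gs://my-bucket/templates/spec.json",
                            {"output": "gs://my-bucket/out"})
    print(launch[0], launch[1])
    # Pretend launch response, reduced to the fields the workflow reads.
    launch_response = {"job": {"id": "example-job-id", "currentState": "JOB_STATE_PENDING"}}
    job_id = resolve_pointer(launch_response, "/job/id")  # outputs.jobId
    for method, url, _, _ in job_requests("ACCESS_TOKEN", "my-project", "us-central1", job_id):
        print(method, url)
```

Splitting the launch request from the job requests mirrors the workflow's data flow: the job id only exists once the launch response has been read, so the poll and metrics requests cannot be built up front.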