ZenML · Arazzo Workflow

ZenML Monitor Pipeline Run

Version 1.0.0

Find the latest run of a pipeline, poll its status to completion, and branch on success or failure.

1 workflow 1 source API 1 provider
View Spec View on GitHub AIMachine LearningMLOpsLLMOpsPipelinesOpen SourcePythonArazzoWorkflows

Provider

zenml

Workflows

monitor-pipeline-run
Locate a pipeline's latest run and poll it until it reaches a terminal status.
Lists runs for the supplied pipeline, reads the most recent run, and polls that run until its status is completed, failed, cached, or stopped, then branches on the outcome.
4 steps inputs: accessToken, pipelineId outputs: finalStatus, pipelineName, runId
1
findLatestRun
listPipelineRuns
List runs for the pipeline, newest first, so the most recent run can be selected for monitoring.
2
pollRun
getPipelineRun
Read the run and inspect its status. While the run is still initializing or running, loop back to re-read it; once it reaches a terminal status, branch on whether it completed successfully or not.
3
reportSuccess
getPipeline
Read the parent pipeline for a successful run to surface its name and version alongside the terminal run status.
4
reportFailure
getPipeline
Read the parent pipeline for a failed or stopped run so the failure can be reported against a named pipeline.

Source API Descriptions

Arazzo Workflow Specification

zenml-monitor-pipeline-run-workflow.yml Raw ↑
arazzo: 1.0.1
info:
  title: ZenML Monitor Pipeline Run
  summary: Find the latest run of a pipeline, poll its status to completion, and branch on success or failure.
  description: >-
    Monitors a pipeline run to a terminal state. ZenML runs are launched by the
    SDK/CLI rather than by a dedicated REST trigger endpoint, so this workflow
    locates the most recent run for a pipeline, reads that run, and re-reads it
    on a poll loop until the run reaches a terminal status. It then branches on
    whether the run completed successfully or failed. Every step spells out its
    request inline, including the bearer Authorization header, so the flow can
    be read and executed without opening the underlying OpenAPI description.
  version: 1.0.0
sourceDescriptions:
- name: zenmlApi
  url: ../openapi/zenml-openapi.yml
  type: openapi
workflows:
- workflowId: monitor-pipeline-run
  summary: Locate a pipeline's latest run and poll it until it reaches a terminal status.
  description: >-
    Lists runs for the supplied pipeline, reads the most recent run, and polls
    that run until its status is completed, failed, cached, or stopped, then
    branches on the outcome.
  inputs:
    type: object
    required:
    - accessToken
    - pipelineId
    properties:
      accessToken:
        type: string
        description: ZenML JWT access token obtained from the login endpoint.
      pipelineId:
        type: string
        description: The uuid of the pipeline whose latest run should be monitored.
  steps:
  - stepId: findLatestRun
    description: >-
      List runs for the pipeline, newest first, so the most recent run can be
      selected for monitoring.
    operationId: listPipelineRuns
    parameters:
    - name: Authorization
      in: header
      value: Bearer $inputs.accessToken
    - name: pipeline_id
      in: query
      value: $inputs.pipelineId
    - name: page
      in: query
      value: 1
    - name: size
      in: query
      value: 1
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      runId: $response.body#/items/0/id
  - stepId: pollRun
    description: >-
      Read the run and inspect its status. While the run is still initializing
      or running, loop back to re-read it; once it reaches a terminal status,
      branch on whether it completed successfully or not.
    operationId: getPipelineRun
    parameters:
    - name: Authorization
      in: header
      value: Bearer $inputs.accessToken
    - name: run_id
      in: path
      value: $steps.findLatestRun.outputs.runId
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      status: $response.body#/status
      runId: $response.body#/id
    onSuccess:
    - name: stillRunning
      type: goto
      stepId: pollRun
      criteria:
      - context: $response.body
        condition: $.status == 'running' || $.status == 'initializing'
        type: jsonpath
    - name: runCompleted
      type: goto
      stepId: reportSuccess
      criteria:
      - context: $response.body
        condition: $.status == 'completed' || $.status == 'cached'
        type: jsonpath
    - name: runFailed
      type: goto
      stepId: reportFailure
      criteria:
      - context: $response.body
        condition: $.status == 'failed' || $.status == 'stopped'
        type: jsonpath
  - stepId: reportSuccess
    description: >-
      Read the parent pipeline for a successful run to surface its name and
      version alongside the terminal run status.
    operationId: getPipeline
    parameters:
    - name: Authorization
      in: header
      value: Bearer $inputs.accessToken
    - name: pipeline_id
      in: path
      value: $inputs.pipelineId
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      pipelineName: $response.body#/name
      pipelineVersion: $response.body#/version
    onSuccess:
    - name: done
      type: end
  - stepId: reportFailure
    description: >-
      Read the parent pipeline for a failed or stopped run so the failure can
      be reported against a named pipeline.
    operationId: getPipeline
    parameters:
    - name: Authorization
      in: header
      value: Bearer $inputs.accessToken
    - name: pipeline_id
      in: path
      value: $inputs.pipelineId
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      pipelineName: $response.body#/name
  outputs:
    runId: $steps.pollRun.outputs.runId
    finalStatus: $steps.pollRun.outputs.status
    pipelineName: $steps.reportSuccess.outputs.pipelineName