Dagster · Arazzo Workflow

Dagster External Asset Event Pipeline

Version 1.0.0

Observe, materialize, and check an external asset in a single correlated event pipeline.

1 workflow 1 source API 1 provider
View Spec View on GitHub Data EngineeringData OrchestrationData PipelinesETLWorkflowsAssetsGraphQLArazzoWorkflows

Provider

dagster

Workflows

external-asset-event-pipeline
Report observation, materialization, and check events for one external asset key.
Reports an observation, then a materialization, then an asset check for the same asset key, sharing the data version across the events and branching on whether the check passed.
3 steps inputs: apiToken, assetKey, checkName, dataVersion, metadata, partition, passed, severity outputs: checkResult, materializationResult, observationResult
1
reportObservation
reportAssetObservation
Record an observation of the external asset, capturing the data version and metadata about the source before it is materialized.
2
reportMaterialization
reportAssetMaterialization
Record the materialization of the asset, reusing the data version from the observation so the events are correlated in Dagster.
3
reportCheck
reportAssetCheck
Record the asset check evaluation against the freshly materialized asset, carrying severity through when the check did not pass.

Source API Descriptions

Arazzo Workflow Specification

dagster-external-asset-event-pipeline-workflow.yml Raw ↑
arazzo: 1.0.1
info:
  title: Dagster External Asset Event Pipeline
  summary: Observe, materialize, and check an external asset in a single correlated event pipeline.
  description: >-
    Chains all three Dagster External Assets reporting events for one asset key
    into a single pipeline. The workflow first reports an observation of the
    source, then reports the materialization of the asset once the data has been
    produced, and finally records an asset check evaluation against the freshly
    materialized asset. A shared data version threads through all three events so
    Dagster can correlate them, and the flow branches on the final check outcome
    so a failing check carries its severity through. Because the Dagster External
    Assets REST API exposes only event-reporting endpoints, there is no read-back
    or status-polling step; every step inlines its request so the pipeline can be
    executed without opening the underlying OpenAPI description.
  version: 1.0.0
sourceDescriptions:
- name: dagsterExternalAssetsApi
  url: ../openapi/dagster-external-assets-rest-api-openapi.yml
  type: openapi
workflows:
- workflowId: external-asset-event-pipeline
  summary: Report observation, materialization, and check events for one external asset key.
  description: >-
    Reports an observation, then a materialization, then an asset check for the
    same asset key, sharing the data version across the events and branching on
    whether the check passed.
  inputs:
    type: object
    required:
    - apiToken
    - assetKey
    - dataVersion
    - checkName
    - passed
    properties:
      apiToken:
        type: string
        description: Dagster+ user API token sent in the Dagster-Cloud-Api-Token header.
      assetKey:
        type: string
        description: The external asset key driven through the event pipeline.
      dataVersion:
        type: string
        description: A user-supplied data version shared by the observation and materialization.
      partition:
        type: string
        description: The asset partition associated with the events.
      metadata:
        type: object
        description: Free-form key/value metadata attached to each reported event.
      checkName:
        type: string
        description: The identifier of the asset check evaluated after materialization.
      passed:
        type: boolean
        description: True if the asset check passed, false otherwise.
      severity:
        type: string
        description: Severity level reported when the check failed (WARN or ERROR).
  steps:
  - stepId: reportObservation
    description: >-
      Record an observation of the external asset, capturing the data version
      and metadata about the source before it is materialized.
    operationId: reportAssetObservation
    parameters:
    - name: Dagster-Cloud-Api-Token
      in: header
      value: $inputs.apiToken
    requestBody:
      contentType: application/json
      payload:
        asset_key: $inputs.assetKey
        metadata: $inputs.metadata
        data_version: $inputs.dataVersion
        partition: $inputs.partition
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      observationResult: $response.body
    onSuccess:
    - name: continueToMaterialize
      type: goto
      stepId: reportMaterialization
      criteria:
      - condition: $statusCode == 200
  - stepId: reportMaterialization
    description: >-
      Record the materialization of the asset, reusing the data version from the
      observation so the events are correlated in Dagster.
    operationId: reportAssetMaterialization
    parameters:
    - name: Dagster-Cloud-Api-Token
      in: header
      value: $inputs.apiToken
    requestBody:
      contentType: application/json
      payload:
        asset_key: $inputs.assetKey
        metadata: $inputs.metadata
        data_version: $inputs.dataVersion
        partition: $inputs.partition
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      materializationResult: $response.body
    onSuccess:
    - name: continueToCheck
      type: goto
      stepId: reportCheck
      criteria:
      - condition: $statusCode == 200
  - stepId: reportCheck
    description: >-
      Record the asset check evaluation against the freshly materialized asset,
      carrying severity through when the check did not pass.
    operationId: reportAssetCheck
    parameters:
    - name: Dagster-Cloud-Api-Token
      in: header
      value: $inputs.apiToken
    requestBody:
      contentType: application/json
      payload:
        asset_key: $inputs.assetKey
        check_name: $inputs.checkName
        passed: $inputs.passed
        severity: $inputs.severity
        metadata: $inputs.metadata
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      checkResult: $response.body
    onSuccess:
    - name: checkPassed
      type: end
      criteria:
      - context: $response.body
        condition: $inputs.passed == true
        type: simple
    - name: checkFailed
      type: end
      criteria:
      - context: $response.body
        condition: $inputs.passed == false
        type: simple
  outputs:
    observationResult: $steps.reportObservation.outputs.observationResult
    materializationResult: $steps.reportMaterialization.outputs.materializationResult
    checkResult: $steps.reportCheck.outputs.checkResult