Snowflake · Arazzo Workflow

Snowflake Create a Table and a Change Stream On It

Version 1.0.0

Create a table, create a stream that tracks changes to it, then describe the stream.

1 workflow 2 source APIs 1 provider
View Spec View on GitHub Data LakesData SharingData WarehousingDatabaseSQLArazzoWorkflows

Provider

snowflake

Workflows

create-stream-on-table
Create a table, create a stream tracking it, then fetch the stream to confirm.
Chains createTable, createStream, and fetchStream so a source table is provisioned, a change stream is created against it, and the stream is verified, all scoped to the same database and schema.
3 steps inputs: authToken, columns, comment, databaseName, schemaName, streamName, tableName, tokenType outputs: confirmedStream, streamStatus, tableStatus
1
createTable
createTable
Create the source table that the stream will track.
2
createStream
createStream
Create the stream that tracks changes to the source table.
3
fetchStream
fetchStream
Describe the stream to confirm it was created and read back its source.

Source API Descriptions

Arazzo Workflow Specification

snowflake-create-stream-on-table-workflow.yml Raw ↑
arazzo: 1.0.1
info:
  title: Snowflake Create a Table and a Change Stream On It
  summary: Create a table, create a stream that tracks changes to it, then describe the stream.
  description: >-
    Change-data-capture provisioning flow. The workflow creates a source table,
    creates a stream that tracks DML changes to that table, and describes the
    stream to confirm it was created and read back its source. Each step inlines
    its Authorization bearer token and the X-Snowflake-Authorization-Token-Type
    header, its create-mode query parameter, and its JSON request body where
    applicable so the chain can be read and executed without opening the
    underlying OpenAPI descriptions. Because the table and stream live in
    separate OpenAPI documents, chaining flows through workflow inputs and step
    outputs.
  version: 1.0.0
sourceDescriptions:
- name: tableApi
  url: ../openapi/table.yaml
  type: openapi
- name: streamApi
  url: ../openapi/stream.yaml
  type: openapi
workflows:
- workflowId: create-stream-on-table
  summary: Create a table, create a stream tracking it, then fetch the stream to confirm.
  description: >-
    Chains createTable, createStream, and fetchStream so a source table is
    provisioned, a change stream is created against it, and the stream is
    verified, all scoped to the same database and schema.
  inputs:
    type: object
    required:
    - authToken
    - databaseName
    - schemaName
    - tableName
    - columns
    - streamName
    properties:
      authToken:
        type: string
        description: Bearer token (KEYPAIR_JWT, OAUTH, or programmatic access token).
      tokenType:
        type: string
        description: Value for the X-Snowflake-Authorization-Token-Type header.
        default: OAUTH
      databaseName:
        type: string
        description: Database that holds the schema, table, and stream.
      schemaName:
        type: string
        description: Schema that holds the table and stream.
      tableName:
        type: string
        description: Name of the source table to create and track.
      columns:
        type: array
        description: Column definitions for the source table (name and datatype).
        items:
          type: object
      streamName:
        type: string
        description: Name of the stream to create on the table.
      comment:
        type: string
        description: Optional comment applied to the stream.
  steps:
  - stepId: createTable
    description: Create the source table that the stream will track.
    operationId: createTable
    parameters:
    - name: database
      in: path
      value: $inputs.databaseName
    - name: schema
      in: path
      value: $inputs.schemaName
    - name: createMode
      in: query
      value: errorIfExists
    - name: Authorization
      in: header
      value: Bearer $inputs.authToken
    - name: X-Snowflake-Authorization-Token-Type
      in: header
      value: $inputs.tokenType
    requestBody:
      contentType: application/json
      payload:
        name: $inputs.tableName
        columns: $inputs.columns
        change_tracking: true
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      status: $response.body#/status
  - stepId: createStream
    description: Create the stream that tracks changes to the source table.
    operationId: createStream
    parameters:
    - name: database
      in: path
      value: $inputs.databaseName
    - name: schema
      in: path
      value: $inputs.schemaName
    - name: createMode
      in: query
      value: errorIfExists
    - name: Authorization
      in: header
      value: Bearer $inputs.authToken
    - name: X-Snowflake-Authorization-Token-Type
      in: header
      value: $inputs.tokenType
    requestBody:
      contentType: application/json
      payload:
        name: $inputs.streamName
        table_name: $inputs.tableName
        comment: $inputs.comment
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      status: $response.body#/status
  - stepId: fetchStream
    description: Describe the stream to confirm it was created and read back its source.
    operationId: fetchStream
    parameters:
    - name: database
      in: path
      value: $inputs.databaseName
    - name: schema
      in: path
      value: $inputs.schemaName
    - name: name
      in: path
      value: $inputs.streamName
    - name: Authorization
      in: header
      value: Bearer $inputs.authToken
    - name: X-Snowflake-Authorization-Token-Type
      in: header
      value: $inputs.tokenType
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      streamName: $response.body#/name
      sourceTable: $response.body#/table_name
  outputs:
    tableStatus: $steps.createTable.outputs.status
    streamStatus: $steps.createStream.outputs.status
    confirmedStream: $steps.fetchStream.outputs.streamName