Snowflake · Arazzo Workflow

Snowflake Create, Verify, and Refresh a Pipe

Version 1.0.0

Create a pipe with a COPY statement, describe it, then trigger a refresh to ingest files.

1 workflow 1 source API 1 provider
View Spec View on GitHub Data LakesData SharingData WarehousingDatabaseSQLArazzoWorkflows

Provider

snowflake

Workflows

create-and-refresh-pipe
Create a pipe, fetch it to confirm, then refresh it to queue staged files.
Chains createPipe, fetchPipe, and refreshPipe so a Snowpipe is provisioned with a COPY statement, verified, and then refreshed, all scoped to the same database and schema.
3 steps inputs: authToken, comment, copyStatement, databaseName, pipeName, schemaName, tokenType outputs: confirmedPipe, createStatus, refreshStatus
1
createPipe
createPipe
Create the pipe with its COPY INTO definition.
2
fetchPipe
fetchPipe
Describe the pipe to confirm it was created.
3
refreshPipe
refreshPipe
Refresh the pipe to queue staged files for ingestion.

Source API Descriptions

Arazzo Workflow Specification

snowflake-create-and-refresh-pipe-workflow.yml Raw ↑
arazzo: 1.0.1
info:
  title: Snowflake Create, Verify, and Refresh a Pipe
  summary: Create a pipe with a COPY statement, describe it, then trigger a refresh to ingest files.
  description: >-
    Snowpipe ingestion flow. The workflow creates a pipe backed by a COPY INTO
    statement, describes the pipe to confirm it was created and read back its
    definition, and then triggers a refresh that queues staged files matching the
    pipe for ingestion. Each step inlines its Authorization bearer token and the
    X-Snowflake-Authorization-Token-Type header, its query parameters, and its
    JSON request body where applicable so the chain can be read and executed
    without opening the underlying OpenAPI description.
  version: 1.0.0
sourceDescriptions:
- name: pipeApi
  url: ../openapi/pipe.yaml
  type: openapi
workflows:
- workflowId: create-and-refresh-pipe
  summary: Create a pipe, fetch it to confirm, then refresh it to queue staged files.
  description: >-
    Chains createPipe, fetchPipe, and refreshPipe so a Snowpipe is provisioned
    with a COPY statement, verified, and then refreshed, all scoped to the same
    database and schema.
  inputs:
    type: object
    required:
    - authToken
    - databaseName
    - schemaName
    - pipeName
    - copyStatement
    properties:
      authToken:
        type: string
        description: Bearer token (KEYPAIR_JWT, OAUTH, or programmatic access token).
      tokenType:
        type: string
        description: Value for the X-Snowflake-Authorization-Token-Type header.
        default: OAUTH
      databaseName:
        type: string
        description: Database that holds the schema and pipe.
      schemaName:
        type: string
        description: Schema that holds the pipe.
      pipeName:
        type: string
        description: Name of the pipe to create.
      copyStatement:
        type: string
        description: The COPY INTO statement that defines the pipe.
      comment:
        type: string
        description: Optional comment applied to the pipe.
  steps:
  - stepId: createPipe
    description: Create the pipe with its COPY INTO definition.
    operationId: createPipe
    parameters:
    - name: database
      in: path
      value: $inputs.databaseName
    - name: schema
      in: path
      value: $inputs.schemaName
    - name: createMode
      in: query
      value: errorIfExists
    - name: Authorization
      in: header
      value: Bearer $inputs.authToken
    - name: X-Snowflake-Authorization-Token-Type
      in: header
      value: $inputs.tokenType
    requestBody:
      contentType: application/json
      payload:
        name: $inputs.pipeName
        copy_statement: $inputs.copyStatement
        comment: $inputs.comment
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      status: $response.body#/status
  - stepId: fetchPipe
    description: Describe the pipe to confirm it was created.
    operationId: fetchPipe
    parameters:
    - name: database
      in: path
      value: $inputs.databaseName
    - name: schema
      in: path
      value: $inputs.schemaName
    - name: name
      in: path
      value: $inputs.pipeName
    - name: Authorization
      in: header
      value: Bearer $inputs.authToken
    - name: X-Snowflake-Authorization-Token-Type
      in: header
      value: $inputs.tokenType
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      pipeName: $response.body#/name
  - stepId: refreshPipe
    description: Refresh the pipe to queue staged files for ingestion.
    operationId: refreshPipe
    parameters:
    - name: database
      in: path
      value: $inputs.databaseName
    - name: schema
      in: path
      value: $inputs.schemaName
    - name: name
      in: path
      value: $inputs.pipeName
    - name: Authorization
      in: header
      value: Bearer $inputs.authToken
    - name: X-Snowflake-Authorization-Token-Type
      in: header
      value: $inputs.tokenType
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      status: $response.body#/status
  outputs:
    createStatus: $steps.createPipe.outputs.status
    confirmedPipe: $steps.fetchPipe.outputs.pipeName
    refreshStatus: $steps.refreshPipe.outputs.status