Amazon Kinesis · Arazzo Workflow

Amazon Kinesis Create and Activate Stream

Version 1.0.0

Create a Kinesis data stream and poll until it reaches ACTIVE status.

Provider

amazon-kinesis

Workflows

create-and-activate-stream
Create a provisioned Kinesis data stream and wait until it is ACTIVE.
Calls CreateStream with the supplied name and shard count, then polls DescribeStreamSummary, looping while the status is CREATING and ending once the status becomes ACTIVE.
Steps: 2
Inputs: shardCount, streamName
Outputs: openShardCount, streamARN, streamStatus
1. createStream (CreateStream)
Create the data stream in PROVISIONED mode with the requested shard count. CreateStream returns immediately, and the new stream starts in CREATING status.
2. pollStatus (DescribeStreamSummary)
Read the stream summary to check its current status. While the status is CREATING, the step loops back to poll again; once the status is ACTIVE, the workflow ends.
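
The control flow of the two steps can be sketched in Python as follows. This is a minimal sketch, not the behavior of any particular Arazzo runner: `create_stream` and `describe_summary` are hypothetical callables standing in for the two API calls, and `delay_seconds` and `max_polls` are assumptions added for illustration, since the workflow itself defines neither a delay between polls nor a limit on them.

```python
import time
from typing import Callable, Dict


def create_and_activate_stream(
    create_stream: Callable[[str, int], None],
    describe_summary: Callable[[str], Dict],
    stream_name: str,
    shard_count: int,
    delay_seconds: float = 1.0,  # assumption: not part of the workflow
    max_polls: int = 60,         # assumption: not part of the workflow
) -> Dict:
    """Mirror the createStream and pollStatus steps and return the workflow outputs."""
    # Step 1 (createStream): request the stream; it starts in CREATING status.
    create_stream(stream_name, shard_count)

    for _ in range(max_polls):
        # Step 2 (pollStatus): read the summary and capture the step outputs.
        summary = describe_summary(stream_name)["StreamDescriptionSummary"]
        outputs = {
            "streamStatus": summary["StreamStatus"],
            "streamARN": summary["StreamARN"],
            "openShardCount": summary["OpenShardCount"],
        }
        if outputs["streamStatus"] == "CREATING":
            # stillCreating: pause, then go back to pollStatus.
            time.sleep(delay_seconds)
            continue
        # Not CREATING: stop polling. The workflow's nowActive action matches ACTIVE.
        return outputs

    raise TimeoutError(f"{stream_name} was still CREATING after {max_polls} polls")
```

Passing the two API calls in as callables keeps the sketch independent of any SDK or HTTP client, so the loop can be exercised with stubs before it is pointed at a real account.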

Source API Descriptions

kinesisDataStreamsApi (openapi): ../openapi/amazon-kinesis-data-streams-openapi.yml

Arazzo Workflow Specification

amazon-kinesis-create-and-activate-stream-workflow.yml
arazzo: 1.0.1
info:
  title: Amazon Kinesis Create and Activate Stream
  summary: Create a Kinesis data stream and poll until it reaches ACTIVE status.
  description: >-
    Stream creation in Kinesis Data Streams is asynchronous: CreateStream
    returns immediately with the stream set to CREATING, and the service later
    transitions it to ACTIVE. This workflow issues a CreateStream call and then
    repeatedly calls DescribeStreamSummary, branching on the reported
    StreamStatus so the flow only completes once the stream is ready to accept
    records. Every step spells out its AWS JSON protocol request inline,
    including the required X-Amz-Target header, so the flow can be read and
    executed without opening the underlying OpenAPI description.
  version: 1.0.0
sourceDescriptions:
- name: kinesisDataStreamsApi
  url: ../openapi/amazon-kinesis-data-streams-openapi.yml
  type: openapi
workflows:
- workflowId: create-and-activate-stream
  summary: Create a provisioned Kinesis data stream and wait until it is ACTIVE.
  description: >-
    Calls CreateStream with the supplied name and shard count, then polls
    DescribeStreamSummary, looping while the status is CREATING and ending once
    the status becomes ACTIVE.
  inputs:
    type: object
    required:
    - streamName
    - shardCount
    properties:
      streamName:
        type: string
        description: The name of the Kinesis data stream to create.
      shardCount:
        type: integer
        description: The number of shards for the provisioned stream.
  steps:
  - stepId: createStream
    description: >-
      Create the data stream in PROVISIONED mode with the requested shard
      count. CreateStream returns immediately with an empty response body, and
      the new stream starts in CREATING status.
    operationId: CreateStream
    parameters:
    - name: X-Amz-Target
      in: header
      value: Kinesis_20131202.CreateStream
    requestBody:
      contentType: application/x-amz-json-1.1
      payload:
        StreamName: $inputs.streamName
        ShardCount: $inputs.shardCount
        StreamModeDetails:
          StreamMode: PROVISIONED
    successCriteria:
    - condition: $statusCode == 200
  - stepId: pollStatus
    description: >-
      Read the stream summary to check its current status. While the status is
      CREATING the step loops back to poll again; once ACTIVE the workflow ends.
    operationId: DescribeStreamSummary
    parameters:
    - name: X-Amz-Target
      in: header
      value: Kinesis_20131202.DescribeStreamSummary
    requestBody:
      contentType: application/x-amz-json-1.1
      payload:
        StreamName: $inputs.streamName
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      streamStatus: $response.body#/StreamDescriptionSummary/StreamStatus
      streamARN: $response.body#/StreamDescriptionSummary/StreamARN
      openShardCount: $response.body#/StreamDescriptionSummary/OpenShardCount
    onSuccess:
    - name: stillCreating
      type: goto
      stepId: pollStatus
      criteria:
      - context: $response.body
        condition: $.StreamDescriptionSummary.StreamStatus == "CREATING"
        type: jsonpath
    - name: nowActive
      type: end
      criteria:
      - context: $response.body
        condition: $.StreamDescriptionSummary.StreamStatus == "ACTIVE"
        type: jsonpath
  outputs:
    streamARN: $steps.pollStatus.outputs.streamARN
    streamStatus: $steps.pollStatus.outputs.streamStatus
    openShardCount: $steps.pollStatus.outputs.openShardCount
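
To show what the inline request definitions in the specification above amount to, the following Python sketch assembles the headers and JSON body for each step and resolves the JSON Pointer fragments used in the step outputs (the part after `#` in `$response.body#/StreamDescriptionSummary/StreamStatus`). It is an illustration under stated assumptions: `build_request` and `resolve_pointer` are hypothetical helper names, and the sketch deliberately leaves out the regional endpoint URL and AWS Signature Version 4 signing, both of which a real call needs.

```python
import json
from typing import Any, Dict, Optional

TARGET_PREFIX = "Kinesis_20131202"
CONTENT_TYPE = "application/x-amz-json-1.1"


def build_request(
    operation: str, stream_name: str, shard_count: Optional[int] = None
) -> Dict[str, Any]:
    """Return the headers and JSON body that a step sends (unsigned)."""
    payload: Dict[str, Any] = {"StreamName": stream_name}
    if operation == "CreateStream":
        # Matches the createStream step: provisioned mode with a fixed shard count.
        payload["ShardCount"] = shard_count
        payload["StreamModeDetails"] = {"StreamMode": "PROVISIONED"}
    return {
        "headers": {
            "X-Amz-Target": f"{TARGET_PREFIX}.{operation}",
            "Content-Type": CONTENT_TYPE,
        },
        "body": json.dumps(payload),
    }


def resolve_pointer(document: Any, pointer: str) -> Any:
    """Resolve a JSON Pointer such as /StreamDescriptionSummary/StreamStatus."""
    if pointer == "":
        return document  # the empty pointer refers to the whole document
    node = document
    for token in pointer[1:].split("/"):
        # Undo the RFC 6901 escapes (~1 is "/", ~0 is "~").
        token = token.replace("~1", "/").replace("~0", "~")
        node = node[int(token)] if isinstance(node, list) else node[token]
    return node
```

For example, `build_request("DescribeStreamSummary", "demo-stream")` yields an `X-Amz-Target` header of `Kinesis_20131202.DescribeStreamSummary` and a body containing only `StreamName`, which matches the pollStatus step.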