Amazon Kinesis · Arazzo Workflow

Amazon Kinesis Create Stream and Put First Record

Version 1.0.0

Create a stream, wait until it is ACTIVE, then write the first data record.

1 workflow · 1 source API · 1 provider
Tags: Analytics, Big Data, Data Processing, Real-Time, Streaming, Arazzo, Workflows

Provider

amazon-kinesis

Workflows

create-stream-and-put-record
Create a stream, poll until ACTIVE, then put the first record.
Creates the stream, loops on DescribeStreamSummary while CREATING, and once ACTIVE writes a single record with PutRecord, returning the shard and sequence number the record landed on.
3 steps
Inputs: data, partitionKey, shardCount, streamName
Outputs: sequenceNumber, shardId, streamARN

1. createStream (CreateStream): Create the provisioned data stream with the requested shard count.
2. pollStatus (DescribeStreamSummary): Poll the stream summary, looping while CREATING and advancing to write the record once the stream is ACTIVE.
3. putRecord (PutRecord): Write the first data record into the now-active stream, returning the shard id and sequence number assigned to the record.
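
The three steps above can be sketched in Python as a rough illustration of the control flow. This is not the workflow runner's implementation: `call` is a hypothetical transport function (operation name and JSON payload in, decoded JSON body out), and `poll_interval` and `max_polls` are assumptions added here, since the workflow itself defines neither a delay nor an attempt limit. Raising on any status other than CREATING or ACTIVE is also a choice made for this sketch.

```python
import time
from typing import Any, Callable, Dict, Optional

# Hypothetical transport: (operation name, JSON payload) -> decoded JSON body.
# It stands in for a signed HTTP call and is not part of any AWS SDK.
Call = Callable[[str, Dict[str, Any]], Dict[str, Any]]


def create_stream_and_put_record(
    call: Call,
    stream_name: str,
    shard_count: int,
    data_b64: str,
    partition_key: str,
    poll_interval: float = 1.0,  # assumption: not defined by the workflow
    max_polls: int = 60,         # assumption: not defined by the workflow
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, Optional[str]]:
    # Step 1 (createStream): request a provisioned stream.
    call("CreateStream", {
        "StreamName": stream_name,
        "ShardCount": shard_count,
        "StreamModeDetails": {"StreamMode": "PROVISIONED"},
    })

    # Step 2 (pollStatus): repeat while CREATING, continue once ACTIVE.
    stream_arn: Optional[str] = None
    for _ in range(max_polls):
        body = call("DescribeStreamSummary", {"StreamName": stream_name})
        summary = body["StreamDescriptionSummary"]
        status = summary["StreamStatus"]
        if status == "ACTIVE":
            stream_arn = summary["StreamARN"]
            break
        if status != "CREATING":
            # Sketch-only choice: stop on any other status.
            raise RuntimeError(f"unexpected stream status: {status}")
        sleep(poll_interval)
    else:
        raise TimeoutError(f"stream {stream_name} not ACTIVE after {max_polls} polls")

    # Step 3 (putRecord): write the first record into the active stream.
    result = call("PutRecord", {
        "StreamName": stream_name,
        "Data": data_b64,
        "PartitionKey": partition_key,
    })

    # Same names as the workflow outputs.
    return {
        "streamARN": stream_arn,
        "shardId": result["ShardId"],
        "sequenceNumber": result["SequenceNumber"],
    }
```

Passing `sleep` as a parameter keeps the loop easy to exercise with a fake transport, without waiting between polls.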

Source API Descriptions

kinesisDataStreamsApi (openapi): ../openapi/amazon-kinesis-data-streams-openapi.yml

Arazzo Workflow Specification

amazon-kinesis-create-stream-and-put-record-workflow.yml
arazzo: 1.0.1
info:
  title: Amazon Kinesis Create Stream and Put First Record
  summary: Create a stream, wait until it is ACTIVE, then write the first data record.
  description: >-
    A complete onboarding flow for a new Kinesis data stream. Because stream
    creation is asynchronous, the workflow creates the stream, polls
    DescribeStreamSummary until the status reports ACTIVE, and only then calls
    PutRecord to write the first data blob into the stream. Branching on
    StreamStatus prevents the producer from writing before the stream can
    accept records. Each step inlines its AWS JSON protocol request, including
    the required X-Amz-Target header, so the flow is self-describing.
  version: 1.0.0
sourceDescriptions:
- name: kinesisDataStreamsApi
  url: ../openapi/amazon-kinesis-data-streams-openapi.yml
  type: openapi
workflows:
- workflowId: create-stream-and-put-record
  summary: Create a stream, poll until ACTIVE, then put the first record.
  description: >-
    Creates the stream, loops on DescribeStreamSummary while CREATING, and once
    ACTIVE writes a single record with PutRecord, returning the shard and
    sequence number the record landed on.
  inputs:
    type: object
    required:
    - streamName
    - shardCount
    - data
    - partitionKey
    properties:
      streamName:
        type: string
        description: The name of the Kinesis data stream to create.
      shardCount:
        type: integer
        description: The number of shards for the provisioned stream.
      data:
        type: string
        description: The Base64-encoded data blob to write into the first record.
      partitionKey:
        type: string
        description: The partition key used to assign the record to a shard.
  steps:
  - stepId: createStream
    description: Create the provisioned data stream with the requested shard count.
    operationId: CreateStream
    parameters:
    - name: X-Amz-Target
      in: header
      value: Kinesis_20131202.CreateStream
    requestBody:
      contentType: application/x-amz-json-1.1
      payload:
        StreamName: $inputs.streamName
        ShardCount: $inputs.shardCount
        StreamModeDetails:
          StreamMode: PROVISIONED
    successCriteria:
    - condition: $statusCode == 200
  - stepId: pollStatus
    description: >-
      Poll the stream summary, looping while CREATING and advancing to write the
      record once the stream is ACTIVE.
    operationId: DescribeStreamSummary
    parameters:
    - name: X-Amz-Target
      in: header
      value: Kinesis_20131202.DescribeStreamSummary
    requestBody:
      contentType: application/x-amz-json-1.1
      payload:
        StreamName: $inputs.streamName
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      streamStatus: $response.body#/StreamDescriptionSummary/StreamStatus
      streamARN: $response.body#/StreamDescriptionSummary/StreamARN
    onSuccess:
    # The JSONPath filter runs against the response body root and selects
    # StreamDescriptionSummary only when its StreamStatus matches.
    - name: stillCreating
      type: goto
      stepId: pollStatus
      criteria:
      - context: $response.body
        condition: $[?@.StreamStatus == 'CREATING']
        type: jsonpath
    - name: nowActive
      type: goto
      stepId: putRecord
      criteria:
      - context: $response.body
        condition: $[?@.StreamStatus == 'ACTIVE']
        type: jsonpath
  - stepId: putRecord
    description: >-
      Write the first data record into the now-active stream, returning the
      shard id and sequence number assigned to the record.
    operationId: PutRecord
    parameters:
    - name: X-Amz-Target
      in: header
      value: Kinesis_20131202.PutRecord
    requestBody:
      contentType: application/x-amz-json-1.1
      payload:
        StreamName: $inputs.streamName
        Data: $inputs.data
        PartitionKey: $inputs.partitionKey
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      shardId: $response.body#/ShardId
      sequenceNumber: $response.body#/SequenceNumber
  outputs:
    streamARN: $steps.pollStatus.outputs.streamARN
    shardId: $steps.putRecord.outputs.shardId
    sequenceNumber: $steps.putRecord.outputs.sequenceNumber
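
Each step in the specification sends the same request shape: an `X-Amz-Target` header of the form `Kinesis_20131202.<Operation>`, the `application/x-amz-json-1.1` content type, and a JSON payload. The `data` input must already be Base64-encoded. A minimal Python sketch of preparing such a request follows; `encode_data` and `build_request` are hypothetical helper names, and request signing and the regional endpoint are deliberately left out.

```python
import base64
import json
from typing import Any, Dict, Tuple

TARGET_PREFIX = "Kinesis_20131202"           # from the X-Amz-Target values in the spec
CONTENT_TYPE = "application/x-amz-json-1.1"  # from each step's requestBody


def encode_data(raw: bytes) -> str:
    """Base64-encode raw bytes for the workflow's `data` input."""
    return base64.b64encode(raw).decode("ascii")


def build_request(operation: str, payload: Dict[str, Any]) -> Tuple[Dict[str, str], bytes]:
    """Return (headers, body) for one step; signing is not handled here."""
    headers = {
        "X-Amz-Target": f"{TARGET_PREFIX}.{operation}",
        "Content-Type": CONTENT_TYPE,
    }
    body = json.dumps(payload).encode("utf-8")
    return headers, body


# Example: the putRecord step's request for a small text record.
headers, body = build_request("PutRecord", {
    "StreamName": "example-stream",  # hypothetical stream name
    "Data": encode_data(b"hello"),
    "PartitionKey": "user-1",        # hypothetical partition key
})
```

Keeping the encoding step separate from request construction makes it harder to pass unencoded bytes as `data` by accident.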