Amazon Kinesis · Arazzo Workflow

Amazon Kinesis Put Records and Read Back

Version 1.0.0

Batch-write records, get a shard iterator, then read the records back.

1 workflow · 1 source API · 1 provider
Tags: Analytics, Big Data, Data Processing, Real-Time, Streaming, Arazzo, Workflows

Provider

amazon-kinesis

Workflows

put-records-and-read-back
Batch-put records, then read them back from the landing shard.
Writes a batch of records with PutRecords, derives the shard from the first result entry, gets a TRIM_HORIZON iterator on that shard, and reads the records back.
3 steps · inputs: records, streamName · outputs: failedRecordCount, nextShardIterator, records
1. putRecords (PutRecords): Write the batch of records in a single call and capture the shard ID of the first result entry, which identifies the shard to read from.
2. getShardIterator (GetShardIterator): Obtain a TRIM_HORIZON shard iterator for the shard the first record landed on, positioning the reader at the oldest record in the shard.
3. getRecords (GetRecords): Read the records back from the shard using the iterator, returning the retrieved records and the next iterator for continued reading.
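
The three steps can be sketched in Python as follows. This is a minimal illustration, not the workflow runner's implementation: `build_request`, `put_records_and_read_back`, and the injected `send` callable are hypothetical names, and `send` stands in for whatever signs the request (SigV4) and POSTs it to the regional Kinesis endpoint. Only the header and payload shapes are taken from the workflow itself.

```python
import json
from typing import Any, Callable, Dict, List, Optional, Tuple

# Hypothetical transport: takes (headers, json_body) and returns the parsed
# JSON response. Signing and the HTTP POST are assumed to happen inside it.
Send = Callable[[Dict[str, str], str], Dict[str, Any]]

CONTENT_TYPE = "application/x-amz-json-1.1"


def build_request(operation: str, payload: Dict[str, Any]) -> Tuple[Dict[str, str], str]:
    """Build the headers and JSON body for one AWS JSON protocol call."""
    headers = {
        "X-Amz-Target": f"Kinesis_20131202.{operation}",
        "Content-Type": CONTENT_TYPE,
    }
    return headers, json.dumps(payload)


def put_records_and_read_back(
    send: Send, stream_name: str, records: List[Dict[str, str]]
) -> Dict[str, Any]:
    # Step 1 (putRecords): write the whole batch in one call.
    put = send(*build_request(
        "PutRecords", {"StreamName": stream_name, "Records": records}))

    # Mirrors $response.body#/Records/0/ShardId. A failed entry carries
    # ErrorCode and ErrorMessage instead of ShardId, so guard against that.
    first = put["Records"][0]
    if "ShardId" not in first:
        raise RuntimeError(f"first record was rejected: {first.get('ErrorCode')}")

    # Step 2 (getShardIterator): start at the oldest record in that shard.
    iterator = send(*build_request("GetShardIterator", {
        "StreamName": stream_name,
        "ShardId": first["ShardId"],
        "ShardIteratorType": "TRIM_HORIZON",
    }))

    # Step 3 (getRecords): read one batch from the iterator.
    got = send(*build_request(
        "GetRecords", {"ShardIterator": iterator["ShardIterator"]}))

    next_iterator: Optional[str] = got.get("NextShardIterator")
    return {
        "failedRecordCount": put["FailedRecordCount"],
        "records": got["Records"],
        "nextShardIterator": next_iterator,
    }
```

Two behaviours are worth keeping in mind when adapting this. PutRecords can return HTTP 200 with a non-zero FailedRecordCount, so a 200 status alone does not mean every record was stored. And because TRIM_HORIZON starts at the oldest record in the shard, a single GetRecords call on a busy shard may return older records before the ones just written; continued reading uses the returned next iterator.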

Source API Descriptions

kinesisDataStreamsApi (openapi): ../openapi/amazon-kinesis-data-streams-openapi.yml

Arazzo Workflow Specification

amazon-kinesis-put-records-and-read-back-workflow.yml
arazzo: 1.0.1
info:
  title: Amazon Kinesis Put Records and Read Back
  summary: Batch-write records, get a shard iterator, then read the records back.
  description: >-
    Demonstrates the core producer-to-consumer round trip in Kinesis Data
    Streams. The workflow writes a batch of records with PutRecords, captures
    the shard id the first record landed on, obtains a TRIM_HORIZON shard
    iterator for that shard with GetShardIterator, and then reads the records
    back with GetRecords. Each step inlines its AWS JSON protocol request,
    including the required X-Amz-Target header, so the round trip can be read
    and executed without opening the underlying OpenAPI description.
  version: 1.0.0
sourceDescriptions:
- name: kinesisDataStreamsApi
  url: ../openapi/amazon-kinesis-data-streams-openapi.yml
  type: openapi
workflows:
- workflowId: put-records-and-read-back
  summary: Batch-put records, then read them back from the landing shard.
  description: >-
    Writes a batch of records with PutRecords, derives the shard from the first
    result entry, gets a TRIM_HORIZON iterator on that shard, and reads the
    records back.
  inputs:
    type: object
    required:
    - streamName
    - records
    properties:
      streamName:
        type: string
        description: The name of the Kinesis data stream to write to and read from.
      records:
        type: array
        description: >-
          The records to write, each an object with Data (Base64) and
          PartitionKey.
        items:
          type: object
          required:
          - Data
          - PartitionKey
          properties:
            Data:
              type: string
              description: The Base64-encoded data blob.
            PartitionKey:
              type: string
              description: The partition key for the record.
  steps:
  - stepId: putRecords
    description: >-
      Write the batch of records in a single call and capture the shard ID of
      the first result entry, which identifies the shard to read from.
    operationId: PutRecords
    parameters:
    - name: X-Amz-Target
      in: header
      value: Kinesis_20131202.PutRecords
    requestBody:
      contentType: application/x-amz-json-1.1
      payload:
        StreamName: $inputs.streamName
        Records: $inputs.records
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      failedRecordCount: $response.body#/FailedRecordCount
      firstShardId: $response.body#/Records/0/ShardId
      firstSequenceNumber: $response.body#/Records/0/SequenceNumber
  - stepId: getShardIterator
    description: >-
      Obtain a TRIM_HORIZON shard iterator for the shard the first record landed
      on, positioning the reader at the oldest record in the shard.
    operationId: GetShardIterator
    parameters:
    - name: X-Amz-Target
      in: header
      value: Kinesis_20131202.GetShardIterator
    requestBody:
      contentType: application/x-amz-json-1.1
      payload:
        StreamName: $inputs.streamName
        ShardId: $steps.putRecords.outputs.firstShardId
        ShardIteratorType: TRIM_HORIZON
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      shardIterator: $response.body#/ShardIterator
  - stepId: getRecords
    description: >-
      Read the records back from the shard using the iterator, returning the
      retrieved records and the next iterator for continued reading.
    operationId: GetRecords
    parameters:
    - name: X-Amz-Target
      in: header
      value: Kinesis_20131202.GetRecords
    requestBody:
      contentType: application/x-amz-json-1.1
      payload:
        ShardIterator: $steps.getShardIterator.outputs.shardIterator
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      records: $response.body#/Records
      nextShardIterator: $response.body#/NextShardIterator
      millisBehindLatest: $response.body#/MillisBehindLatest
  outputs:
    failedRecordCount: $steps.putRecords.outputs.failedRecordCount
    records: $steps.getRecords.outputs.records
    nextShardIterator: $steps.getRecords.outputs.nextShardIterator
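
The `records` input of this workflow expects each `Data` value to be Base64-encoded already, and GetRecords returns `Data` in the same encoding. A small sketch of preparing that input and decoding what comes back, using only the Python standard library; the helper names `to_record_entries` and `decode_records` are illustrative and not part of the workflow or of any AWS SDK:

```python
import base64
from typing import Dict, List, Tuple


def to_record_entries(items: List[Tuple[str, bytes]]) -> List[Dict[str, str]]:
    """Turn (partition_key, raw_bytes) pairs into PutRecords request entries."""
    return [
        {"PartitionKey": key, "Data": base64.b64encode(raw).decode("ascii")}
        for key, raw in items
    ]


def decode_records(records: List[Dict[str, str]]) -> List[bytes]:
    """Recover the raw bytes from the Data field of each returned record."""
    return [base64.b64decode(record["Data"]) for record in records]


# Example input for the workflow's `records` parameter.
entries = to_record_entries([("user-1", b"hello")])
```

The resulting `entries` list can be passed as the `records` input, and `decode_records` can be applied to the workflow's `records` output.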