Amazon Kinesis · Arazzo Workflow

Amazon Kinesis List Shards and Read From First

Version 1.0.0

List a stream's shards, then read from the first shard, starting at the trim horizon.

1 workflow, 1 source API, 1 provider
Tags: Analytics, Big Data, Data Processing, Real-Time, Streaming, Arazzo, Workflows

Provider

amazon-kinesis

Workflows

list-shards-and-read-from-first
List shards, then read from the first shard at the trim horizon.
Lists the shards in the stream, takes the first shard id, gets a TRIM_HORIZON iterator for it, and reads the records from the start of the shard.
Steps: 3
Inputs: streamName
Outputs: nextShardIterator, records, shards
1. listShards (ListShards): List the shards in the stream and capture the id of the first shard to read from.
2. getShardIterator (GetShardIterator): Get a TRIM_HORIZON iterator for the first shard, positioning the reader at the oldest untrimmed record in that shard.
3. getRecords (GetRecords): Read records from the first shard using the iterator, returning the records and the next iterator for continued reading.
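
The three steps above can be sketched in Python to show how each step's output feeds the next request. This is a minimal illustration, not the workflow's reference implementation. The names `Send`, `build_request`, `call`, and `list_shards_and_read_from_first` are hypothetical, and the transport is passed in as a callable so the sketch makes no network calls. A real transport would also have to add AWS Signature Version 4 authentication, which is omitted here. The empty-shard-list guard is an addition that the workflow itself does not declare.

```python
import json
from typing import Any, Callable, Dict, Tuple

# Hypothetical transport: takes (headers, body bytes) and returns
# (HTTP status code, parsed JSON response body). Request signing is
# assumed to happen inside the transport and is not shown.
Send = Callable[[Dict[str, str], bytes], Tuple[int, Dict[str, Any]]]

CONTENT_TYPE = "application/x-amz-json-1.1"
TARGET_PREFIX = "Kinesis_20131202."


def build_request(operation: str, payload: Dict[str, Any]) -> Tuple[Dict[str, str], bytes]:
    """Build the headers and JSON body that one workflow step inlines."""
    headers = {
        "X-Amz-Target": TARGET_PREFIX + operation,
        "Content-Type": CONTENT_TYPE,
    }
    return headers, json.dumps(payload).encode("utf-8")


def call(send: Send, operation: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Send one request and apply the step's success criterion (status 200)."""
    headers, body = build_request(operation, payload)
    status, response = send(headers, body)
    if status != 200:
        raise RuntimeError(f"{operation} failed with status {status}")
    return response


def list_shards_and_read_from_first(stream_name: str, send: Send) -> Dict[str, Any]:
    # Step 1 (listShards): capture the shard list and the first shard id.
    shards = call(send, "ListShards", {"StreamName": stream_name})["Shards"]
    if not shards:
        # Guard added for this sketch; not part of the workflow definition.
        raise RuntimeError("ListShards returned no shards")
    first_shard_id = shards[0]["ShardId"]

    # Step 2 (getShardIterator): position at the oldest untrimmed record.
    iterator = call(send, "GetShardIterator", {
        "StreamName": stream_name,
        "ShardId": first_shard_id,
        "ShardIteratorType": "TRIM_HORIZON",
    })["ShardIterator"]

    # Step 3 (getRecords): read one batch and keep the follow-up iterator.
    got = call(send, "GetRecords", {"ShardIterator": iterator})

    # Same three outputs the workflow exposes.
    return {
        "shards": shards,
        "records": got["Records"],
        "nextShardIterator": got.get("NextShardIterator"),
    }
```

A single GetRecords call can return an empty record list even when the shard holds data, so a consumer would typically keep calling GetRecords with `nextShardIterator` rather than treat an empty batch as the end of the shard.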

Source API Descriptions

kinesisDataStreamsApi (openapi): ../openapi/amazon-kinesis-data-streams-openapi.yml

Arazzo Workflow Specification

amazon-kinesis-list-shards-and-read-from-first-workflow.yml
arazzo: 1.0.1
info:
  title: Amazon Kinesis List Shards and Read From First
  summary: List a stream's shards, then read from the first shard, starting at the trim horizon.
  description: >-
    Discovers the shard topology of a stream and begins consuming from it.
    ListShards returns the shards that make up the stream; the workflow takes
    the first shard, requests a TRIM_HORIZON iterator for it with
    GetShardIterator, and reads the oldest available records with GetRecords.
    This is the standard entry point for a consumer that does not yet know the
    shard map. Each step inlines its AWS JSON protocol request, including the
    required X-Amz-Target header, so the flow can be read and executed without
    opening the underlying OpenAPI description.
  version: 1.0.0
sourceDescriptions:
- name: kinesisDataStreamsApi
  url: ../openapi/amazon-kinesis-data-streams-openapi.yml
  type: openapi
workflows:
- workflowId: list-shards-and-read-from-first
  summary: List shards, then read from the first shard at the trim horizon.
  description: >-
    Lists the shards in the stream, takes the first shard id, gets a
    TRIM_HORIZON iterator for it, and reads the records from the start of the
    shard.
  inputs:
    type: object
    required:
    - streamName
    properties:
      streamName:
        type: string
        description: The name of the Kinesis data stream to inspect and read from.
  steps:
  - stepId: listShards
    description: >-
      List the shards in the stream and capture the id of the first shard to
      read from.
    operationId: ListShards
    parameters:
    - name: X-Amz-Target
      in: header
      value: Kinesis_20131202.ListShards
    requestBody:
      contentType: application/x-amz-json-1.1
      payload:
        StreamName: $inputs.streamName
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      shards: $response.body#/Shards
      firstShardId: $response.body#/Shards/0/ShardId
  - stepId: getShardIterator
    description: >-
      Get a TRIM_HORIZON iterator for the first shard, positioning the reader at
      the oldest untrimmed record in that shard.
    operationId: GetShardIterator
    parameters:
    - name: X-Amz-Target
      in: header
      value: Kinesis_20131202.GetShardIterator
    requestBody:
      contentType: application/x-amz-json-1.1
      payload:
        StreamName: $inputs.streamName
        ShardId: $steps.listShards.outputs.firstShardId
        ShardIteratorType: TRIM_HORIZON
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      shardIterator: $response.body#/ShardIterator
  - stepId: getRecords
    description: >-
      Read records from the first shard using the iterator, returning the
      records and the next iterator for continued reading.
    operationId: GetRecords
    parameters:
    - name: X-Amz-Target
      in: header
      value: Kinesis_20131202.GetRecords
    requestBody:
      contentType: application/x-amz-json-1.1
      payload:
        ShardIterator: $steps.getShardIterator.outputs.shardIterator
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      records: $response.body#/Records
      nextShardIterator: $response.body#/NextShardIterator
      millisBehindLatest: $response.body#/MillisBehindLatest
  outputs:
    shards: $steps.listShards.outputs.shards
    records: $steps.getRecords.outputs.records
    nextShardIterator: $steps.getRecords.outputs.nextShardIterator