Amazon Kinesis · Arazzo Workflow

Amazon Kinesis Register Consumer and Confirm

Version 1.0.0

Register an enhanced fan-out consumer and poll until it becomes ACTIVE.

1 workflow · 1 source API · 1 provider
Tags: Analytics, Big Data, Data Processing, Real-Time, Streaming, Arazzo, Workflows

Provider

amazon-kinesis

Workflows

register-consumer-and-confirm
Register a stream consumer and poll until it is ACTIVE.
Registers a consumer by stream ARN and name, captures the consumer ARN, and polls DescribeStreamConsumer until the consumer status is ACTIVE.
2 steps · inputs: consumerName, streamARN · outputs: consumerARN, consumerStatus
1. registerConsumer (operation: RegisterStreamConsumer)
   Register the enhanced fan-out consumer with the stream and capture its ARN. The consumer starts in CREATING status.
2. pollConsumer (operation: DescribeStreamConsumer)
   Describe the consumer, looping while CREATING and ending once the consumer status becomes ACTIVE.
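
The two steps above can be sketched in Python as a register-then-poll loop. This is a minimal illustration, not part of the workflow itself: `client` is assumed to be any object exposing `register_stream_consumer` and `describe_stream_consumer` with boto3-style keyword arguments, and `poll_interval`, `max_polls`, and the error raised on an unexpected status are assumptions added for safety (the workflow defines no delay, poll limit, or failure branch).

```python
import time


def register_consumer_and_confirm(client, stream_arn, consumer_name,
                                  poll_interval=1.0, max_polls=60,
                                  sleep=time.sleep):
    """Register an enhanced fan-out consumer and poll until it is ACTIVE.

    `client` is assumed to follow the boto3 Kinesis client call shape.
    `poll_interval` and `max_polls` are illustrative; the workflow sets neither.
    """
    # Step 1 (registerConsumer): the consumer starts in CREATING status.
    registered = client.register_stream_consumer(
        StreamARN=stream_arn, ConsumerName=consumer_name
    )
    consumer_arn = registered["Consumer"]["ConsumerARN"]

    # Step 2 (pollConsumer): loop while CREATING, end once ACTIVE.
    for _ in range(max_polls):
        described = client.describe_stream_consumer(ConsumerARN=consumer_arn)
        status = described["ConsumerDescription"]["ConsumerStatus"]
        if status == "ACTIVE":
            return {"consumerARN": consumer_arn, "consumerStatus": status}
        if status != "CREATING":
            # Assumption: treat any other status (e.g. DELETING) as an error.
            raise RuntimeError(f"unexpected consumer status: {status}")
        sleep(poll_interval)
    raise TimeoutError(f"consumer not ACTIVE after {max_polls} polls")
```

The returned dictionary mirrors the workflow outputs `consumerARN` and `consumerStatus`. Passing `sleep` as a parameter keeps the loop testable without real waiting.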

Source API Descriptions

kinesisDataStreamsApi (openapi): ../openapi/amazon-kinesis-data-streams-openapi.yml

Arazzo Workflow Specification

amazon-kinesis-register-consumer-and-confirm-workflow.yml
arazzo: 1.0.1
info:
  title: Amazon Kinesis Register Consumer and Confirm
  summary: Register an enhanced fan-out consumer and poll until it becomes ACTIVE.
  description: >-
    Registers an enhanced fan-out consumer against a stream and waits for it to
    become usable. RegisterStreamConsumer creates the consumer in CREATING
    status; the workflow then polls DescribeStreamConsumer, looping while the
    consumer is CREATING and ending once its status reports ACTIVE, at which
    point it can subscribe to shards. Each step inlines its AWS JSON protocol
    request, including the required X-Amz-Target header, so the flow can be read
    and executed without opening the underlying OpenAPI description.
  version: 1.0.0
sourceDescriptions:
- name: kinesisDataStreamsApi
  url: ../openapi/amazon-kinesis-data-streams-openapi.yml
  type: openapi
workflows:
- workflowId: register-consumer-and-confirm
  summary: Register a stream consumer and poll until it is ACTIVE.
  description: >-
    Registers a consumer by stream ARN and name, captures the consumer ARN, and
    polls DescribeStreamConsumer until the consumer status is ACTIVE.
  inputs:
    type: object
    required:
    - streamARN
    - consumerName
    properties:
      streamARN:
        type: string
        description: The ARN of the data stream to register the consumer against.
      consumerName:
        type: string
        description: A name for the consumer, unique within the data stream.
  steps:
  - stepId: registerConsumer
    description: >-
      Register the enhanced fan-out consumer with the stream and capture its
      ARN. The consumer starts in CREATING status.
    operationId: RegisterStreamConsumer
    parameters:
    - name: X-Amz-Target
      in: header
      value: Kinesis_20131202.RegisterStreamConsumer
    requestBody:
      contentType: application/x-amz-json-1.1
      payload:
        StreamARN: $inputs.streamARN
        ConsumerName: $inputs.consumerName
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      consumerARN: $response.body#/Consumer/ConsumerARN
      consumerStatus: $response.body#/Consumer/ConsumerStatus
  - stepId: pollConsumer
    description: >-
      Describe the consumer, looping while CREATING and ending once the consumer
      status becomes ACTIVE.
    operationId: DescribeStreamConsumer
    parameters:
    - name: X-Amz-Target
      in: header
      value: Kinesis_20131202.DescribeStreamConsumer
    requestBody:
      contentType: application/x-amz-json-1.1
      payload:
        ConsumerARN: $steps.registerConsumer.outputs.consumerARN
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      consumerStatus: $response.body#/ConsumerDescription/ConsumerStatus
      consumerARN: $response.body#/ConsumerDescription/ConsumerARN
    onSuccess:
    - name: stillCreating
      type: goto
      stepId: pollConsumer
      criteria:
      - context: $response.body
        condition: $.ConsumerDescription.ConsumerStatus == "CREATING"
        type: jsonpath
    - name: nowActive
      type: end
      criteria:
      - context: $response.body
        condition: $.ConsumerDescription.ConsumerStatus == "ACTIVE"
        type: jsonpath
  outputs:
    consumerARN: $steps.pollConsumer.outputs.consumerARN
    consumerStatus: $steps.pollConsumer.outputs.consumerStatus
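
Each step in the workflow above inlines the same request shape: an `X-Amz-Target` header of the form `Kinesis_20131202.<Operation>` and a JSON body sent as `application/x-amz-json-1.1`. The sketch below shows how those two pieces could be assembled in Python. The helper name `build_request` is hypothetical, and the result is unsigned: a real call to the service would also need AWS Signature Version 4 headers, which this example deliberately leaves out.

```python
import json

# Values taken from the workflow steps above.
TARGET_PREFIX = "Kinesis_20131202"
CONTENT_TYPE = "application/x-amz-json-1.1"


def build_request(operation, payload):
    """Return (headers, body) for one AWS JSON protocol call, unsigned.

    `build_request` is an illustrative helper, not part of any AWS SDK.
    """
    headers = {
        "X-Amz-Target": f"{TARGET_PREFIX}.{operation}",
        "Content-Type": CONTENT_TYPE,
    }
    body = json.dumps(payload).encode("utf-8")
    return headers, body


# Example: the pollConsumer request, with a placeholder consumer ARN.
headers, body = build_request(
    "DescribeStreamConsumer", {"ConsumerARN": "arn:example:consumer"}
)
```

Keeping the target prefix and content type in one place reflects that both steps differ only in the operation name and payload.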