Amazon Neptune Property Graph Stream Replay

Version 1.0.0

Read property graph change records from the start of the stream, then continue from the last event id.

1 workflow · 1 source API · 1 provider
Tags: Database, Graph Database, Gremlin, Neptune, Property Graph, RDF, SPARQL, Arazzo, Workflows

Provider

amazon-neptune

Workflows

propertygraph-stream-replay
Read the earliest property graph stream records, then page forward from the cursor.
Reads property graph change records from TRIM_HORIZON, then continues after the returned last event id.
2 steps · inputs: limit · outputs: cursorCommitNum, firstBatchRecords, nextBatchRecords

1. readFromStart (getPropertyGraphStream)
   Read the earliest available property graph change-log records starting at TRIM_HORIZON and capture the last event id as a cursor.
2. continueFromCursor (getPropertyGraphStream)
   Continue reading the stream immediately after the captured position using AFTER_SEQUENCE_NUMBER with the prior commit and operation numbers.
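
The two steps above can be sketched in Python as a pair of chained reads. This is a minimal illustration, not the provider's client: the `fetch` callable is a hypothetical transport that takes the query parameters and returns the decoded JSON body, and the function name `replay` is an assumption made for this example. Only the parameter names, iterator types, and response fields come from the workflow itself.

```python
from typing import Any, Callable, Dict, Optional

# Hypothetical transport: receives query parameters, returns the decoded JSON body.
Fetch = Callable[[Dict[str, Any]], Dict[str, Any]]


def replay(fetch: Fetch, limit: Optional[int] = None) -> Dict[str, Any]:
    """Run the two chained reads and return the workflow-level outputs."""
    # Step 1 (readFromStart): begin at the oldest record still in the stream.
    first_params: Dict[str, Any] = {"iteratorType": "TRIM_HORIZON"}
    if limit is not None:
        first_params["limit"] = limit
    first = fetch(first_params)

    # Capture the cursor: the commit and operation numbers of the last event.
    last_event = first["lastEventId"]

    # Step 2 (continueFromCursor): resume immediately after that position.
    next_params: Dict[str, Any] = {
        "iteratorType": "AFTER_SEQUENCE_NUMBER",
        "commitNum": last_event["commitNum"],
        "opNum": last_event["opNum"],
    }
    if limit is not None:
        next_params["limit"] = limit
    second = fetch(next_params)

    # Mirror the workflow's declared outputs.
    return {
        "firstBatchRecords": first["records"],
        "nextBatchRecords": second["records"],
        "cursorCommitNum": second["lastEventId"]["commitNum"],
    }
```

Injecting `fetch` keeps the sketch independent of any HTTP library or signing scheme. The workflow surfaces only `cursorCommitNum`, so a caller that wants to resume beyond the second batch would likely also need the matching `opNum` from the second response's `lastEventId`.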

Source API Descriptions

neptuneStreamsApi (openapi): ../openapi/amazon-neptune-streams-openapi.yml

Arazzo Workflow Specification

amazon-neptune-propertygraph-stream-replay-workflow.yml
arazzo: 1.0.1
info:
  title: Amazon Neptune Property Graph Stream Replay
  summary: Read property graph change records from the start of the stream, then continue from the last event id.
  description: >-
    Consumes the Neptune property graph change stream in two chained reads. The
    first read starts at TRIM_HORIZON to fetch the earliest available change-log
    records and captures the last event id (commit and operation numbers). The
    second read continues immediately after that position using
    AFTER_SEQUENCE_NUMBER with the captured commit and op numbers, demonstrating
    cursor-based stream pagination. Every step spells out its request inline so
    the flow can be read and executed without opening the underlying OpenAPI
    description.
  version: 1.0.0
sourceDescriptions:
- name: neptuneStreamsApi
  url: ../openapi/amazon-neptune-streams-openapi.yml
  type: openapi
workflows:
- workflowId: propertygraph-stream-replay
  summary: Read the earliest property graph stream records, then page forward from the cursor.
  description: >-
    Reads property graph change records from TRIM_HORIZON, then continues after
    the returned last event id.
  inputs:
    type: object
    properties:
      limit:
        type: integer
        description: Maximum number of records to return per read (1-100000).
  steps:
  - stepId: readFromStart
    description: >-
      Read the earliest available property graph change-log records starting at
      TRIM_HORIZON and capture the last event id as a cursor.
    operationId: getPropertyGraphStream
    parameters:
    - name: iteratorType
      in: query
      value: TRIM_HORIZON
    - name: limit
      in: query
      value: $inputs.limit
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      records: $response.body#/records
      lastCommitNum: $response.body#/lastEventId/commitNum
      lastOpNum: $response.body#/lastEventId/opNum
      totalRecords: $response.body#/totalRecords
  - stepId: continueFromCursor
    description: >-
      Continue reading the stream immediately after the captured position using
      AFTER_SEQUENCE_NUMBER with the prior commit and operation numbers.
    operationId: getPropertyGraphStream
    parameters:
    - name: iteratorType
      in: query
      value: AFTER_SEQUENCE_NUMBER
    - name: commitNum
      in: query
      value: $steps.readFromStart.outputs.lastCommitNum
    - name: opNum
      in: query
      value: $steps.readFromStart.outputs.lastOpNum
    - name: limit
      in: query
      value: $inputs.limit
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      records: $response.body#/records
      lastCommitNum: $response.body#/lastEventId/commitNum
      totalRecords: $response.body#/totalRecords
  outputs:
    firstBatchRecords: $steps.readFromStart.outputs.records
    nextBatchRecords: $steps.continueFromCursor.outputs.records
    cursorCommitNum: $steps.continueFromCursor.outputs.lastCommitNum
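
The step outputs in the specification above are runtime expressions of the form `$response.body#/…`, where the part after `#` is a JSON Pointer into the response body. The following Python sketch shows how such an expression could be resolved. The helper names `resolve_pointer` and `resolve_body_expression` and the contents of `SAMPLE_BODY` are assumptions made for illustration; the sample is not a captured Neptune response and only mirrors the fields the workflow reads.

```python
from typing import Any


def resolve_pointer(document: Any, pointer: str) -> Any:
    """Resolve an RFC 6901 JSON Pointer such as '/lastEventId/commitNum'."""
    if pointer == "":
        return document
    if not pointer.startswith("/"):
        raise ValueError(f"JSON Pointer must start with '/': {pointer!r}")
    current = document
    for raw in pointer[1:].split("/"):
        # Unescape in the order RFC 6901 requires: '~1' first, then '~0'.
        token = raw.replace("~1", "/").replace("~0", "~")
        if isinstance(current, list):
            current = current[int(token)]
        else:
            current = current[token]
    return current


def resolve_body_expression(expression: str, body: Any) -> Any:
    """Evaluate a '$response.body#<pointer>' expression against a decoded body."""
    prefix = "$response.body#"
    if not expression.startswith(prefix):
        raise ValueError(f"Unsupported expression: {expression!r}")
    return resolve_pointer(body, expression[len(prefix):])


# Illustrative body only; field values are made up for this example.
SAMPLE_BODY = {
    "lastEventId": {"commitNum": 7, "opNum": 3},
    "records": [{"eventId": {"commitNum": 7, "opNum": 3}, "op": "ADD"}],
    "totalRecords": 1,
}

last_commit = resolve_body_expression("$response.body#/lastEventId/commitNum", SAMPLE_BODY)
last_op = resolve_body_expression("$response.body#/lastEventId/opNum", SAMPLE_BODY)
```

Here `last_commit` and `last_op` correspond to the `lastCommitNum` and `lastOpNum` outputs that `readFromStart` passes to `continueFromCursor`.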