Prisma · Arazzo Workflow

Prisma Pulse Create and Resume a Named Stream

Version 1.0.0

Create a resumable named event stream, read its cursor position, and fetch the last persisted event by ULID.

1 workflow 1 source API 1 provider
View Spec View on GitHub ArazzoWorkflows

Provider

prisma

Workflows

create-and-resume-named-stream
Create a named stream, read its cursor, and fetch the last persisted event by ULID.
Creates a resumable named stream for a model, lists streams to read the saved cursor position, and fetches the cursor event when one is present.
3 steps inputs: apiKey, model, streamName outputs: cursor, resumeEventId, streamName
1
createStream
createStream
Create a resumable named event stream for the supplied model. Requires event persistence to be enabled on the environment.
2
readStreamCursor
listStreams
List the active streams and read the cursor position for the newly created stream, branching on whether a saved cursor event exists to resume from.
3
fetchCursorEvent
getEvent
Fetch the specific persisted change event identified by the stream cursor ULID to confirm the resume point.

Source API Descriptions

Arazzo Workflow Specification

prisma-pulse-named-stream-resume-workflow.yml Raw ↑
arazzo: 1.0.1
info:
  title: Prisma Pulse Create and Resume a Named Stream
  summary: Create a resumable named event stream, read its cursor position, and fetch the last persisted event by ULID.
  description: >-
    Sets up a resumable Prisma Pulse named event stream and inspects its
    resumability state. The workflow creates a named stream for a model, lists
    the active streams to read the stream's saved cursor (the ULID of the last
    processed event), and branches: when a cursor exists it fetches that
    specific persisted event by ID to confirm the resume point. The live
    consumeStreamEvents endpoint is a long-lived server-sent-events connection
    and is not directly executable as a single request, so this flow resolves
    the resume point via the cursor and the getEvent lookup instead. Every step
    spells out its request inline so the flow can be read and executed without
    opening the underlying OpenAPI description.
  version: 1.0.0
sourceDescriptions:
- name: pulseApi
  url: ../openapi/prisma-pulse-openapi.yml
  type: openapi
workflows:
- workflowId: create-and-resume-named-stream
  summary: Create a named stream, read its cursor, and fetch the last persisted event by ULID.
  description: >-
    Creates a resumable named stream for a model, lists streams to read the
    saved cursor position, and fetches the cursor event when one is present.
  inputs:
    type: object
    required:
    - apiKey
    - streamName
    - model
    properties:
      apiKey:
        type: string
        description: Pulse API key, sent as a Bearer token in the Authorization header.
      streamName:
        type: string
        description: Unique name for the resumable stream (e.g. all-user-events).
      model:
        type: string
        description: The Prisma model name to stream change events for (e.g. User).
  steps:
  - stepId: createStream
    description: >-
      Create a resumable named event stream for the supplied model. Requires
      event persistence to be enabled on the environment.
    operationId: createStream
    parameters:
    - name: Authorization
      in: header
      value: Bearer $inputs.apiKey
    requestBody:
      contentType: application/json
      payload:
        name: $inputs.streamName
        model: $inputs.model
    successCriteria:
    - condition: $statusCode == 201
    outputs:
      streamName: $response.body#/name
      status: $response.body#/status
      cursor: $response.body#/cursor
  - stepId: readStreamCursor
    description: >-
      List the active streams and read the cursor position for the newly
      created stream, branching on whether a saved cursor event exists to
      resume from.
    operationId: listStreams
    parameters:
    - name: Authorization
      in: header
      value: Bearer $inputs.apiKey
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      cursor: $response.body#/data/0/cursor
    onSuccess:
    - name: cursorPresent
      type: goto
      stepId: fetchCursorEvent
      criteria:
      - context: $response.body
        condition: $.data[0].cursor != null
        type: jsonpath
    - name: noCursor
      type: end
      criteria:
      - context: $response.body
        condition: $.data[0].cursor == null
        type: jsonpath
  - stepId: fetchCursorEvent
    description: >-
      Fetch the specific persisted change event identified by the stream cursor
      ULID to confirm the resume point.
    operationId: getEvent
    parameters:
    - name: eventId
      in: path
      value: $steps.readStreamCursor.outputs.cursor
    - name: Authorization
      in: header
      value: Bearer $inputs.apiKey
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      eventId: $response.body#/id
      action: $response.body#/action
  outputs:
    streamName: $steps.createStream.outputs.streamName
    cursor: $steps.readStreamCursor.outputs.cursor
    resumeEventId: $steps.fetchCursorEvent.outputs.eventId