Salesforce · Arazzo Workflow

Salesforce Bulk Insert Records

Version 1.0.0

Run the full Bulk API 2.0 insert lifecycle — create an ingest job, upload CSV, close, poll, and read successful results.

1 workflow · 1 source API · 1 provider
Tags: AI, Analytics, Cloud, Commerce, CRM, Customer Service, Enterprise, Marketing, Platform, Sales, Arazzo, Workflows

Provider

salesforce

Workflows

bulk-insert-records
Bulk insert records into a Salesforce object via the Bulk API 2.0 ingest lifecycle.
Creates an insert ingest job, uploads CSV data, closes the job, polls until JobComplete, and returns the successful results CSV.
5 steps
Inputs: csvData, lineEnding, object
Outputs: jobId, numberRecordsFailed, numberRecordsProcessed, state, successfulResults
1. createJob (createIngestJob)
Create a new Bulk API 2.0 ingest job configured for the insert operation against the target object. The job starts in the Open state.
2. uploadData (uploadJobData)
Upload the CSV payload to the open ingest job. The first row must be a header of field API names matching the target object.
3. closeJob (updateIngestJob)
Close the ingest job by setting its state to UploadComplete, which tells Salesforce to begin processing the uploaded data.
4. pollJob (getIngestJobInfo)
Poll the ingest job state. When the job reaches JobComplete, continue to retrieve results; while it is still InProgress or UploadComplete, loop back to poll again; if the job state is Failed or Aborted, stop the workflow.
5. getResults (getSuccessfulResults)
Retrieve the CSV of successfully processed records once the job has reached JobComplete. Each row includes the added sf__Id column.
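
The five steps above correspond to a short sequence of REST calls. The sketch below builds that sequence as plain data without sending anything, which makes the order, payloads, and expected status codes easy to inspect. The `Request` type, the `build_request_plan` function, the `<jobId>` placeholder, and the `/services/data/v60.0` base path are illustrative assumptions and not part of the workflow; the HTTP methods and relative paths are assumed from the public Bulk API 2.0 documentation and should be checked against the referenced OpenAPI description.

```python
import json
from typing import NamedTuple, Optional


class Request(NamedTuple):
    """One planned HTTP call (hypothetical helper type, not part of the workflow)."""
    step_id: str
    method: str
    path: str
    content_type: Optional[str]
    body: Optional[str]
    expected_status: int


def build_request_plan(obj, csv_data, job_id="<jobId>", line_ending="LF",
                       base="/services/data/v60.0/jobs/ingest"):
    """Return the five calls of the insert lifecycle, in workflow order.

    `base` (including its API version) is an assumption. `job_id` is a
    placeholder because the real ID only exists after createJob responds.
    """
    create_body = json.dumps({
        "operation": "insert",
        "object": obj,
        "contentType": "CSV",
        "lineEnding": line_ending,
        "columnDelimiter": "COMMA",
    })
    close_body = json.dumps({"state": "UploadComplete"})
    job = f"{base}/{job_id}"
    return [
        Request("createJob", "POST", base, "application/json", create_body, 200),
        Request("uploadData", "PUT", f"{job}/batches", "text/csv", csv_data, 201),
        Request("closeJob", "PATCH", job, "application/json", close_body, 200),
        Request("pollJob", "GET", job, None, None, 200),
        Request("getResults", "GET", f"{job}/successfulResults/", None, None, 200),
    ]


if __name__ == "__main__":
    # Print the planned calls for a one-record Account insert.
    for request in build_request_plan("Account", "Name\nAcme\n"):
        print(request.step_id, request.method, request.path, request.expected_status)
```

Keeping the plan as data separates what the workflow does from how the calls are transported, so any HTTP client can execute it once authentication and the instance host are supplied.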

Source API Descriptions

Arazzo Workflow Specification

salesforce-bulk-insert-records-workflow.yml
arazzo: 1.0.1
info:
  title: Salesforce Bulk Insert Records
  summary: Run the full Bulk API 2.0 insert lifecycle — create an ingest job, upload CSV, close, poll, and read successful results.
  description: >-
    The Bulk API 2.0 ingest lifecycle for loading many records at once. The
    workflow creates an insert ingest job for a target object, uploads the CSV
    payload to the job, closes the job to begin processing, polls the job state
    until it reaches JobComplete (looping back to poll again while the job is
    still UploadComplete or InProgress, and ending the workflow if the job
    reports Failed or Aborted), and finally
    retrieves the CSV of successfully processed records. Every step spells out
    its request inline so the flow can be read and executed without opening the
    underlying OpenAPI description.
  version: 1.0.0
sourceDescriptions:
- name: salesforceBulkApi
  url: ../openapi/salesforce-bulk-api-2-openapi.yml
  type: openapi
workflows:
- workflowId: bulk-insert-records
  summary: Bulk insert records into a Salesforce object via the Bulk API 2.0 ingest lifecycle.
  description: >-
    Creates an insert ingest job, uploads CSV data, closes the job, polls until
    JobComplete, and returns the successful results CSV.
  inputs:
    type: object
    required:
    - object
    - csvData
    properties:
      object:
        type: string
        description: The API name of the SObject to insert records into (e.g. Account).
      csvData:
        type: string
        description: >-
          CSV payload with a header row of field API names and one record per
          subsequent row.
      lineEnding:
        type: string
        enum:
        - LF
        - CRLF
        default: LF
        description: Line ending used in the CSV data (LF or CRLF). Defaults to LF.
  steps:
  - stepId: createJob
    description: >-
      Create a new Bulk API 2.0 ingest job configured for the insert operation
      against the target object. The job starts in the Open state.
    operationId: createIngestJob
    requestBody:
      contentType: application/json
      payload:
        operation: insert
        object: $inputs.object
        contentType: CSV
        lineEnding: $inputs.lineEnding
        columnDelimiter: COMMA
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      jobId: $response.body#/id
      state: $response.body#/state
  - stepId: uploadData
    description: >-
      Upload the CSV payload to the open ingest job. The first row must be a
      header of field API names matching the target object.
    operationId: uploadJobData
    parameters:
    - name: jobId
      in: path
      value: $steps.createJob.outputs.jobId
    requestBody:
      contentType: text/csv
      payload: $inputs.csvData
    successCriteria:
    - condition: $statusCode == 201
  - stepId: closeJob
    description: >-
      Close the ingest job by setting its state to UploadComplete, which tells
      Salesforce to begin processing the uploaded data.
    operationId: updateIngestJob
    parameters:
    - name: jobId
      in: path
      value: $steps.createJob.outputs.jobId
    requestBody:
      contentType: application/json
      payload:
        state: UploadComplete
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      state: $response.body#/state
  - stepId: pollJob
    description: >-
      Poll the ingest job state. When the job reaches JobComplete, continue to
      retrieve results; while it is still InProgress or UploadComplete, loop
      back to poll again; if the job state is Failed or Aborted, stop the
      workflow.
    operationId: getIngestJobInfo
    parameters:
    - name: jobId
      in: path
      value: $steps.createJob.outputs.jobId
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      state: $response.body#/state
      numberRecordsProcessed: $response.body#/numberRecordsProcessed
      numberRecordsFailed: $response.body#/numberRecordsFailed
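    # The job-info request succeeds (HTTP 200) even when the job state is
    # Failed or Aborted, so all state-based branching lives under onSuccess.
    onSuccess:
    - name: jobComplete
      type: goto
      stepId: getResults
      criteria:
      - condition: $response.body#/state == "JobComplete"
    - name: stillProcessing
      type: goto
      stepId: pollJob
      criteria:
      - condition: $response.body#/state == "InProgress" || $response.body#/state == "UploadComplete"
    - name: jobFailedOrAborted
      type: end
      criteria:
      - condition: $response.body#/state == "Failed" || $response.body#/state == "Aborted"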
  - stepId: getResults
    description: >-
      Retrieve the CSV of successfully processed records once the job has
      reached JobComplete. Each row includes the added sf__Id column.
    operationId: getSuccessfulResults
    parameters:
    - name: jobId
      in: path
      value: $steps.createJob.outputs.jobId
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      successfulResults: $response.body
  outputs:
    jobId: $steps.createJob.outputs.jobId
    state: $steps.pollJob.outputs.state
    numberRecordsProcessed: $steps.pollJob.outputs.numberRecordsProcessed
    numberRecordsFailed: $steps.pollJob.outputs.numberRecordsFailed
    successfulResults: $steps.getResults.outputs.successfulResults
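
The pollJob branching and the successfulResults output can also be exercised outside an Arazzo runner. The following is a minimal sketch, assuming a caller-supplied `fetch_state` function that returns the job's current state string. The names `next_action`, `poll_until_done`, and `inserted_ids`, along with the delay and attempt limit, are hypothetical choices for illustration. Unlike the goto loop in the workflow, the sketch pauses between polls and gives up after a fixed number of attempts.

```python
import csv
import io
import time

STILL_RUNNING = {"UploadComplete", "InProgress"}
TERMINAL_FAILURE = {"Failed", "Aborted"}


def next_action(state):
    """Map a job state to the workflow branch it should take."""
    if state == "JobComplete":
        return "getResults"
    if state in STILL_RUNNING:
        return "pollJob"
    if state in TERMINAL_FAILURE:
        return "end"
    raise ValueError(f"unexpected job state: {state!r}")


def poll_until_done(fetch_state, delay_seconds=5.0, max_attempts=60,
                    sleep=time.sleep):
    """Call fetch_state() until the job leaves the processing states.

    Returns the final state (JobComplete, Failed, or Aborted) and raises
    TimeoutError if the job is still processing after max_attempts polls.
    """
    for _ in range(max_attempts):
        state = fetch_state()
        if next_action(state) != "pollJob":
            return state
        sleep(delay_seconds)
    raise TimeoutError("job did not finish within the polling budget")


def inserted_ids(successful_results_csv):
    """Extract the sf__Id column from the successful-results CSV text."""
    reader = csv.DictReader(io.StringIO(successful_results_csv))
    return [row["sf__Id"] for row in reader]
```

Passing `sleep` as a parameter keeps the loop testable: a no-op function can stand in for `time.sleep` so a scripted sequence of states runs instantly.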