Salesforce · Arazzo Workflow

Salesforce Bulk Upsert Records

Version 1.0.0

Run the full Bulk API 2.0 upsert lifecycle — create an upsert ingest job keyed on an external Id field, upload CSV, close, poll, and read successful results.

1 workflow · 1 source API · 1 provider
Tags: AI, Analytics, Cloud, Commerce, CRM, Customer Service, Enterprise, Marketing, Platform, Sales, Arazzo, Workflows

Provider

salesforce

Workflows

bulk-upsert-records
Bulk upsert records into a Salesforce object by external Id via the Bulk API 2.0 ingest lifecycle.
Creates an upsert ingest job keyed on an external Id field, uploads CSV data, closes the job, polls until JobComplete, and returns the successful results CSV.
5 steps
inputs: csvData, externalIdFieldName, lineEnding, object
outputs: jobId, numberRecordsFailed, numberRecordsProcessed, state, successfulResults
1. createJob (operation: createIngestJob)
   Create a new Bulk API 2.0 ingest job configured for the upsert operation against the target object, keyed on the supplied external Id field. The job starts in the Open state.
2. uploadData (operation: uploadJobData)
   Upload the CSV payload to the open ingest job. The header row must include the external Id field used for matching.
3. closeJob (operation: updateIngestJob)
   Close the ingest job by setting its state to UploadComplete, which tells Salesforce to begin processing the upserts.
4. pollJob (operation: getIngestJobInfo)
   Poll the ingest job state. When the job reaches JobComplete, continue to retrieve results; while still InProgress or UploadComplete, loop back to poll again; if the job Failed or Aborted, stop the workflow.
5. getResults (operation: getSuccessfulResults)
   Retrieve the CSV of successfully processed records once the job has reached JobComplete. Each row includes the added sf__Id and sf__Created columns.
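
The branching in the pollJob step can be sketched in Python as a small state-to-action mapping plus a bounded polling loop. This is a minimal sketch, not part of the workflow: `fetch_job_info` is a hypothetical callable standing in for the getIngestJobInfo operation, and `interval_s` and `max_polls` are illustrative limits that the workflow itself does not define.

```python
import time
from typing import Callable, Dict

# Job states named in the pollJob step description.
IN_FLIGHT = {"UploadComplete", "InProgress"}
FAILED = {"Failed", "Aborted"}


def next_action(state: str) -> str:
    """Map a job state to the branch the pollJob step describes."""
    if state == "JobComplete":
        return "getResults"
    if state in IN_FLIGHT:
        return "pollJob"
    if state in FAILED:
        return "end"
    raise ValueError(f"unexpected job state: {state!r}")


def poll_until_terminal(
    fetch_job_info: Callable[[], Dict],
    interval_s: float = 5.0,
    max_polls: int = 120,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict:
    """Call fetch_job_info until the job leaves the in-flight states.

    fetch_job_info is a hypothetical wrapper around getIngestJobInfo;
    interval_s and max_polls are assumed values, not workflow settings.
    """
    for _ in range(max_polls):
        info = fetch_job_info()
        if next_action(info["state"]) != "pollJob":
            return info
        sleep(interval_s)
    raise TimeoutError(f"job still processing after {max_polls} polls")
```

A caller would inspect the returned `state` and fetch the successful results only when it is JobComplete. Bounding the loop is a design choice of this sketch, since the step description places no limit on how many times pollJob may repeat.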

Source API Descriptions

Arazzo Workflow Specification

salesforce-bulk-upsert-records-workflow.yml
arazzo: 1.0.1
info:
  title: Salesforce Bulk Upsert Records
  summary: Run the full Bulk API 2.0 upsert lifecycle — create an upsert ingest job keyed on an external Id field, upload CSV, close, poll, and read successful results.
  description: >-
    The Bulk API 2.0 ingest lifecycle for upserting many records at once,
    matching on an external Id field so each row is created or updated as
    needed. The workflow creates an upsert ingest job for a target object with
    the supplied externalIdFieldName, uploads the CSV payload, closes the job to
    begin processing, polls the job state until it reaches JobComplete
    (branching to retry while InProgress and to fail if the job reports Failed
    or Aborted), and finally retrieves the CSV of successfully processed
    records. Every step spells out its request inline so the flow can be read
    and executed without opening the underlying OpenAPI description.
  version: 1.0.0
sourceDescriptions:
- name: salesforceBulkApi
  url: ../openapi/salesforce-bulk-api-2-openapi.yml
  type: openapi
workflows:
- workflowId: bulk-upsert-records
  summary: Bulk upsert records into a Salesforce object by external Id via the Bulk API 2.0 ingest lifecycle.
  description: >-
    Creates an upsert ingest job keyed on an external Id field, uploads CSV
    data, closes the job, polls until JobComplete, and returns the successful
    results CSV.
  inputs:
    type: object
    required:
    - object
    - externalIdFieldName
    - csvData
    properties:
      object:
        type: string
        description: The API name of the SObject to upsert records into (e.g. Account).
      externalIdFieldName:
        type: string
        description: >-
          The API name of the external Id field used to match existing records
          for the upsert (e.g. External_Id__c).
      csvData:
        type: string
        description: >-
          CSV payload with a header row of field API names (including the
          external Id field) and one record per subsequent row.
      lineEnding:
        type: string
        description: Line ending used in the CSV data (LF or CRLF). Defaults to LF.
        enum:
        - LF
        - CRLF
        default: LF
  steps:
  - stepId: createJob
    description: >-
      Create a new Bulk API 2.0 ingest job configured for the upsert operation
      against the target object, keyed on the supplied external Id field. The
      job starts in the Open state.
    operationId: createIngestJob
    requestBody:
      contentType: application/json
      payload:
        operation: upsert
        object: $inputs.object
        externalIdFieldName: $inputs.externalIdFieldName
        contentType: CSV
        lineEnding: $inputs.lineEnding
        columnDelimiter: COMMA
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      jobId: $response.body#/id
      state: $response.body#/state
  - stepId: uploadData
    description: >-
      Upload the CSV payload to the open ingest job. The header row must include
      the external Id field used for matching.
    operationId: uploadJobData
    parameters:
    - name: jobId
      in: path
      value: $steps.createJob.outputs.jobId
    requestBody:
      contentType: text/csv
      payload: $inputs.csvData
    successCriteria:
    - condition: $statusCode == 201
  - stepId: closeJob
    description: >-
      Close the ingest job by setting its state to UploadComplete, which tells
      Salesforce to begin processing the upserts.
    operationId: updateIngestJob
    parameters:
    - name: jobId
      in: path
      value: $steps.createJob.outputs.jobId
    requestBody:
      contentType: application/json
      payload:
        state: UploadComplete
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      state: $response.body#/state
  - stepId: pollJob
    description: >-
      Poll the ingest job state. When the job reaches JobComplete, continue to
      retrieve results; while still InProgress or UploadComplete, loop back to
      poll again; if the job Failed or Aborted, stop the workflow.
    operationId: getIngestJobInfo
    parameters:
    - name: jobId
      in: path
      value: $steps.createJob.outputs.jobId
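    # Failed and Aborted jobs are still returned with HTTP 200, so the state
    # must be part of successCriteria for the onFailure branch to run.
    successCriteria:
    - condition: $statusCode == 200
    - condition: $response.body#/state != "Failed"
    - condition: $response.body#/state != "Aborted"
    outputs:
      state: $response.body#/state
      numberRecordsProcessed: $response.body#/numberRecordsProcessed
      numberRecordsFailed: $response.body#/numberRecordsFailed
    onSuccess:
    - name: jobComplete
      type: goto
      stepId: getResults
      criteria:
      - condition: $response.body#/state == "JobComplete"
    - name: stillProcessing
      type: goto
      stepId: pollJob
      criteria:
      - condition: $response.body#/state == "InProgress" || $response.body#/state == "UploadComplete"
    onFailure:
    - name: jobFailed
      type: end
      criteria:
      - condition: $response.body#/state == "Failed" || $response.body#/state == "Aborted"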
  - stepId: getResults
    description: >-
      Retrieve the CSV of successfully processed records once the job has
      reached JobComplete. Each row includes the added sf__Id and sf__Created
      columns.
    operationId: getSuccessfulResults
    parameters:
    - name: jobId
      in: path
      value: $steps.createJob.outputs.jobId
    successCriteria:
    - condition: $statusCode == 200
    outputs:
      successfulResults: $response.body
  outputs:
    jobId: $steps.createJob.outputs.jobId
    state: $steps.pollJob.outputs.state
    numberRecordsProcessed: $steps.pollJob.outputs.numberRecordsProcessed
    numberRecordsFailed: $steps.pollJob.outputs.numberRecordsFailed
    successfulResults: $steps.getResults.outputs.successfulResults
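
The workflow inputs declared above (object, externalIdFieldName, csvData, lineEnding) could be assembled from in-memory records roughly as follows. This is a minimal Python sketch under stated assumptions: `build_workflow_inputs` is a hypothetical helper rather than part of the workflow or of any Salesforce library, and it assumes every record uses the same field names as the first one.

```python
import csv
import io
from typing import Dict, List

# Line terminators for the two lineEnding values the workflow accepts.
LINE_ENDINGS = {"LF": "\n", "CRLF": "\r\n"}


def build_workflow_inputs(
    sobject: str,
    external_id_field: str,
    records: List[Dict[str, str]],
    line_ending: str = "LF",
) -> Dict[str, str]:
    """Build the inputs object for bulk-upsert-records (hypothetical helper)."""
    if line_ending not in LINE_ENDINGS:
        raise ValueError(f"lineEnding must be LF or CRLF, got {line_ending!r}")
    if not records:
        raise ValueError("at least one record is required")

    # The header row is taken from the first record's keys.
    fieldnames = list(records[0].keys())
    if external_id_field not in fieldnames:
        raise ValueError(
            f"CSV header must include the external Id field {external_id_field!r}"
        )

    buffer = io.StringIO(newline="")
    writer = csv.DictWriter(
        buffer, fieldnames=fieldnames, lineterminator=LINE_ENDINGS[line_ending]
    )
    writer.writeheader()
    writer.writerows(records)

    return {
        "object": sobject,
        "externalIdFieldName": external_id_field,
        "csvData": buffer.getvalue(),
        "lineEnding": line_ending,
    }
```

Checking the header before the job is created catches a missing external Id column locally, instead of after the upload when the job has already been opened.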