Pipeline offsets and starting state

New Pipeline

For a new pipeline, the starting state is either the earliest or the latest point in the source stream.

The Decodable Web UI showing the choices for starting state for a new pipeline

If you select latest, the pipeline processes only records that arrive on the stream after you start it. To process all existing records on the stream as well as new ones that arrive, select earliest.

A diagram showing the conceptual difference between earliest and latest starting state
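As a conceptual illustration only (this is not Decodable code), the difference between the two choices can be modelled by splitting the stream into records that already exist when the pipeline starts and records that arrive afterwards. The function name `records_processed` and the sample records are hypothetical.

```python
def records_processed(existing, arriving, start_position):
    """Return the records a newly started pipeline would process.

    existing:       records already on the stream when the pipeline starts
    arriving:       records that arrive after the pipeline starts
    start_position: "earliest" or "latest"
    """
    if start_position == "earliest":
        # Existing records are processed first, then new arrivals.
        return list(existing) + list(arriving)
    if start_position == "latest":
        # Existing records are ignored; only new arrivals are processed.
        return list(arriving)
    raise ValueError(f"unknown start position: {start_position!r}")
```

With two existing records and one new arrival, earliest yields all three records, while latest yields only the new one.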

Existing Pipeline

If a pipeline has previously been run, when you restart it you will have more options to control the starting state.

The Decodable Web UI showing the choices for starting state for an existing pipeline

Restore state from the latest checkpoint

When the pipeline was previously stopped, its state was saved. This saved state is known as the checkpoint. By default, the pipeline restarts from the checkpoint: it restores the saved state and continues processing records from the source stream at the point where it left off before being stopped.

A diagram showing a checkpoint in a pipeline

Discard state

You can also opt to discard this state when you restart the pipeline. This can be useful for reprocessing records, or for skipping to the latest point of the source stream and ignoring records that haven’t been processed yet.

Discarding state means that you may skip, or re-process, records from the source stream.

Earliest or latest offset

As with a new pipeline, you can start the pipeline from the earliest or latest offset of the source stream.

A diagram showing discarding state and starting from earliest or latest offset
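The effect of these restart options on which records are read can be sketched with a toy model. It tracks only the read position in the stream, as a list index, and ignores any other state a checkpoint holds. The function `restart_offset` and its parameters are hypothetical and are not part of Decodable.

```python
def restart_offset(stream_length, checkpoint_offset, option):
    """Return the index of the first record read after a restart.

    stream_length:     number of records on the stream at restart time
    checkpoint_offset: index of the next unprocessed record, as saved
                       in the checkpoint when the pipeline stopped
    option:            "checkpoint", "earliest", or "latest"
    """
    if option == "checkpoint":
        # Resume exactly where the pipeline left off.
        return checkpoint_offset
    if option == "earliest":
        # Discard state: records before checkpoint_offset are re-processed.
        return 0
    if option == "latest":
        # Discard state: records from checkpoint_offset onwards are skipped.
        return stream_length
    raise ValueError(f"unknown option: {option!r}")
```

For a stream holding 10 records with a checkpoint at index 6, restoring the checkpoint resumes at record 6, earliest re-processes records 0 to 5 before continuing, and latest skips records 6 to 9 and waits for new arrivals.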

Snapshots

You also have the option of discarding state and starting from a given snapshot. Snapshots are user-controlled and can be created on demand or automatically on a schedule. Each snapshot has its own saved state from which the pipeline can restart.

A diagram showing snapshots and their state

Multiple Streams

If your pipeline has multiple source streams, you can specify a start position per stream.

To do this with the Decodable CLI, pass the --start-position flag once per stream. For example, to start (also referred to as activating) pipeline ID 123456 so that it processes the orders stream from the latest point and the products stream from the earliest, run:

decodable pipeline activate 123456   \
  --start-position products=earliest \
  --start-position orders=latest

If the pipeline has run before, you also need to add the --force argument.
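If you generate this command from a script, the argument list can be assembled programmatically. The sketch below only builds the list and does not run anything. The helper name `activate_command` is hypothetical, and appending --force at the end of the command is an assumption about where the flag may be placed.

```python
def activate_command(pipeline_id, start_positions, force=False):
    """Build the argument list for a `decodable pipeline activate` call.

    start_positions maps each stream name to "earliest" or "latest".
    force adds --force, needed when the pipeline has run before.
    """
    args = ["decodable", "pipeline", "activate", str(pipeline_id)]
    for stream, position in start_positions.items():
        # One --start-position flag per source stream.
        args += ["--start-position", f"{stream}={position}"]
    if force:
        # Assumption: the flag is accepted at the end of the command.
        args.append("--force")
    return args
```

The resulting list can be passed to a process runner such as Python's `subprocess.run`.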

You can also do this through the pipeline activation API call with the following request payload:

{
  "start_positions": {
    "orders": {
      "type": "TAG",
      "value": "latest"
    },
    "products": {
      "type": "TAG",
      "value": "earliest"
    }
  },
[…]
}
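The start_positions part of this payload can also be generated from a simple mapping of stream name to position. This is a minimal sketch that mirrors the shape of the example above. The helper name `build_start_positions` is hypothetical, it covers only the earliest and latest tags shown here, and it leaves out the other payload fields that the example elides.

```python
import json


def build_start_positions(positions):
    """Build the "start_positions" portion of the activation payload.

    positions maps each stream name to "earliest" or "latest".
    """
    allowed = {"earliest", "latest"}
    start_positions = {}
    for stream, position in positions.items():
        if position not in allowed:
            raise ValueError(f"unsupported position for {stream}: {position!r}")
        # Same structure as the example payload: a TAG type plus its value.
        start_positions[stream] = {"type": "TAG", "value": position}
    return {"start_positions": start_positions}


if __name__ == "__main__":
    payload = build_start_positions({"orders": "latest", "products": "earliest"})
    print(json.dumps(payload, indent=2))
```

Validating the position up front means a typo in a stream's position is caught before any request is sent.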

Custom Pipelines

When a new Custom pipeline starts, its own code is responsible for determining its starting offsets. Depending on what the pipeline is doing, the concept of a starting offset may not even apply.

As a result, when starting a Custom pipeline that was previously stopped and checkpointed, the starting state options are slightly different.

The Decodable Web UI showing the choices for starting state for resuming a custom pipeline

As above, you can resume from the checkpoint, or discard state and restore a snapshot. Alternatively, you can discard state and start from whatever point the Custom pipeline’s code is configured to start from.