A stream is a sequence of data records that are stored in a distributed manner and processed in real time by Decodable. Streams can be read from or written to, and every stream has a schema that specifies its fields. Once a stream is defined, it can be used as an input or output by any number of pipelines or connections. If you are familiar with Apache Kafka or Amazon Kinesis, a stream in Decodable is conceptually similar to a "stream" in Kinesis or a "topic" in Kafka.

Stream Types

Decodable supports the following types of streams.

Append

An append stream is a stream in which new records are continuously added to the end. In other words, an append stream is only ever appended to, and each record in an append stream is an independent event.

Partitions
When data is written to an append stream, it is split into multiple partitions based on the partition key specified by the user. Decodable ensures that records with the same partition key are stored in the same partition and processed by the same task. To specify a partition key, identify one or more existing physical fields in the schema. If you do not specify any fields to use as the partition key, then records are distributed randomly over all stream partitions.

To create a stream with the APPEND type, define a schema without specifying a primary key.
Change

A change stream captures changes to a database in real time. Each record in a change stream represents a change made to a database, such as an insert, update, or delete. A record in a change stream is called a Change Data Capture (CDC) record.

To create a stream with the CHANGE type, you must specify one or more fields in the stream's schema to use as the primary key. A primary key is a field, or combination of fields, whose value uniquely identifies each row in a table. When you specify more than one field, the combination is called a composite primary key.

You can specify a primary key in the Decodable web interface in the Stream Schema Definition view by entering the names of the fields in the Primary Key Fields box. Any field that you enter in the Primary Key Fields box must have an explicitly non-null type. For example, to make the fields ProductID and OrderID a composite primary key, declare their types as string not null and int not null, and then enter ProductID and OrderID in the Primary Key Fields box.

For an example on how to specify a primary key in the Decodable CLI, see Creating a change stream.

You can also convert an append stream to a change stream, and vice versa, with the to_change() and to_append() functions. See Process a Change stream as an Append stream and Process an Append stream as a Change stream for examples.
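For instance, a pipeline can use to_append() to read a change stream's CDC records as plain events. The following is only a sketch: the stream names my_change_stream and audit_log are illustrative, and the exact invocation is covered in the linked guides.

```sql
-- Hedged sketch: treat the CDC records of my_change_stream as independent
-- append events and write them to an append stream. Names are illustrative.
INSERT INTO audit_log
SELECT * FROM to_append(my_change_stream)
```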

Managing Streams

All streams have the following attributes.

- id: A system-generated unique ID used to track this stream.
- name: A unique name for the stream. Names can only contain letters, numbers, dashes, and underscores.
- schema: The structure of the record as a set of fields. It describes the fields that are present in the record and the data type of each field.
- description: Optional. A description of your stream.
- watermark: An expression specifying the field that contains the record's event time. You can also include an optional interval to allow for late-arriving data. The field must be one of the following types: TIMESTAMP(3), TIMESTAMP(2), TIMESTAMP(1), TIMESTAMP_LTZ(3), TIMESTAMP_LTZ(2), or TIMESTAMP_LTZ(1).

If you want to remove a watermark from an existing stream, update the value to "", an empty string.

For example:
- "timestamp_field AS timestamp_field"
- "timestamp_field AS timestamp_field INTERVAL '0.001' SECOND"
- "timestamp_field AS timestamp_field - INTERVAL '5' MINUTE"
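Putting this together, a stream with a watermark might be created from the CLI as follows. This is a sketch: the --watermark flag and the stream and field names are assumptions for illustration; check `decodable stream create --help` for the exact options.

```shell
# Hedged sketch: create a stream whose watermark tolerates 5 minutes of
# late-arriving data. The --watermark flag name is an assumption.
decodable stream create \
  --name orders \
  --field order_id=STRING \
  --field "order_time=TIMESTAMP(3)" \
  --watermark "order_time AS order_time - INTERVAL '5' MINUTE"
```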

Updating Streams

You can update a stream's schema or change the partition or primary key used in the stream from the Streams page in Decodable Web or using the Decodable CLI. Regardless of which tool you use to update the stream, there are two general workflows you can use depending on whether you want to prioritize convenience or correctness.

Note: Updating an actively used stream affects the pipelines and connections that the stream is connected to. Depending on the change you are making to the stream, attached connections and pipelines can break and become incompatible.

If you want to update a stream with convenience prioritized, then do the following steps.

  1. Stop any connections or pipelines that are attached to the stream that you want to update.
  2. Clear any records that are either in the stream that you want to update or in a stream that is connected to it. In Decodable Web, select the dropdown menu (...) and then select Clear.
    This image shows how to navigate to the Clear button to clear a stream.
  3. Make the desired updates to the stream.
  4. Restart any connections and pipelines that you stopped in Step 1. If you are using the Decodable CLI, make sure the --force flag is set. If you are using Decodable Web, then make sure Discard State is selected.
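As a rough sketch, the convenience workflow maps onto CLI commands as follows. The subcommand and flag names used here (stop, clear, start, --force) are assumptions for illustration; verify them with `decodable --help`.

```shell
# Hedged sketch of the convenience workflow. Resource names are illustrative
# and the exact subcommands are assumptions -- consult `decodable --help`.

# 1. Stop attached connections and pipelines.
decodable pipeline stop my_pipeline

# 2. Clear records from the stream being updated.
decodable stream clear my_stream

# 3. Make the desired schema change (supply the full schema, not a delta).
decodable stream update my_stream_id \
  --field raw_data=STRING \
  --field new_field=STRING

# 4. Restart with --force so that state is discarded.
decodable pipeline start my_pipeline --force
```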

The second workflow, which prioritizes correctness, involves recreating the attached connections, pipelines, and streams from scratch. It is best practice to use this workflow if you want to change a stream's partition key. Do the following steps.

  1. Stop any connections or pipelines that are attached to the stream that you want to update.
  2. Clone all of the resources.
    1. Clone the streams with a new name. Make the changes that you wanted to make in the new stream.
    2. Clone the connections. Make sure that the cloned connections are attached to the newly cloned stream.
    3. Clone the pipelines. You must edit the cloned pipelines so that their SQL references the newly cloned stream name.
  3. Delete the older connection(s), pipeline(s), and stream(s). This is an optional clean-up step.

Schema

Each Stream has a schema that defines the structure of the Stream's records as a set of fields, each with a name and a data type. The schema that users manage is called the logical schema.

Example Operations

Creating a change stream

decodable stream create \
  --name my_change_stream \
  --description "transactions from MYSQL database" \
  --field "customerID=int not null" --primary-key "customerID" \
  --field "productID=string not null" --primary-key "productID" \
  --field "transaction=int" 

# Created stream my_change_stream (2cd1d83a)

Creating an append stream with a single field

decodable stream create \
  --name my_stream \
  --description "raw data from prod Kafka" \
  --field raw_data=STRING
# Created stream my_stream (2fc5dc5a)

Viewing the append stream we created

decodable stream get 2fc5dc5a
# my_stream
#   id                       2fc5dc5a
#   description              raw data from prod Kafka
#   type                     APPEND
#   fields
#     0  raw_data            STRING
#   partition key fields     -
#   primary key fields       -
#   watermarks               -
#   properties
#     partition.count        10
#     properties.compression.type zstd
#   create time              2023-05-01T19:19:45Z
#   update time              2023-05-01T19:19:45Z

Updating the append stream to add another field

decodable stream update 2fc5dc5a \
  --field raw_data=STRING  \
  --field new_field=STRING
# Updated stream "2fc5dc5a"

🚧

Updating a schema

From the CLI or SDK, be sure to supply the entire new schema, not just the new fields!

Data Types

The following field types are supported.

String Types

- CHAR(n): Fixed-length character string of length n.
- VARCHAR(n): Variable-length character string with a maximum length of n.
- STRING: Synonym for VARCHAR(2147483647).

Binary Types

- BINARY(n): Fixed-length binary string of length n.
- VARBINARY(n): Variable-length binary string with a maximum length of n.
- BYTES: Synonym for VARBINARY(2147483647).

Numeric Types

- DECIMAL(p,s): Exact numeric type with a fixed precision (total number of digits) of p and a scale (number of digits to the right of the decimal point) of s. p must be in the range of 1 to 31, inclusive; if omitted, it defaults to 10. s must be in the range of 0 to p, inclusive; if omitted, it defaults to 0. If s is specified, p must also be specified.
- DEC(p,s): Synonym for DECIMAL(p,s).
- NUMERIC(p,s): Synonym for DECIMAL(p,s).
- TINYINT: A 1-byte signed integer.
- SMALLINT: A 2-byte signed integer.
- INT: A 4-byte signed integer.
- BIGINT: An 8-byte signed integer.
- FLOAT: An approximate 4-byte floating point number.
- DOUBLE [PRECISION]: An 8-byte double-precision floating point number.

Date and Time Types

- DATE: A date containing the year, month, and day.
- TIME(p): A time without a time zone. If specified, p indicates the precision of fractional seconds, up to nanosecond precision. Leap second adjustments are not supported. Values must be within the range of 00:00:00 to 23:59:59.
- TIMESTAMP(p) [WITH | WITHOUT TIME ZONE]: A date and time, with or without a time zone. As with TIME, p specifies the fractional-second precision if it is provided. Values must be within the range of 0000-01-01 00:00:00 to 9999-12-31 23:59:59.999999999.
- TIMESTAMP_LTZ(p): Synonym for TIMESTAMP(p) WITH LOCAL TIME ZONE.

Compound Types

- ARRAY<T>: An array of elements of type T, with a maximum size of 2^31-1 elements.
- T ARRAY: Synonym for ARRAY<T>.
- MAP<K,V>: An associative array with keys of type K and values of type V. K and V can be any type. Each unique key can have only one value.
- ROW<name type ['description'], ...>: A structure of named fields, where name is the field name, type is its type, and description is an optional text description of the field. Field definitions can be repeated as many times as necessary.
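Compound types can be used anywhere a scalar type can, including the CLI's --field flag shown in the earlier examples. The stream and field names below are illustrative:

```shell
# Sketch: declaring compound-typed fields with the documented --field syntax.
# Stream and field names are illustrative.
decodable stream create \
  --name product_events \
  --field "tags=ARRAY<STRING>" \
  --field "attributes=MAP<STRING, STRING>" \
  --field "dimensions=ROW<width INT, height INT>"
```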

Other Types

- BOOLEAN: A true/false value.
- INTERVAL: A span of time, such as an interval of years and months or of days, hours, minutes, and seconds.
- MULTISET<T>: A multiset of elements of type T. Unlike an array, a multiset allows duplicate elements but does not maintain element order.

Computed fields

In addition to the types above, a computed field may be used in a Stream schema.

Such a field's type is given as AS, followed by an SQL expression. The field's actual data type is derived automatically from the given expression and does not have to be declared manually.

For example: AS 1 + 1.

The SQL expression may use the functions and operators supported by Decodable SQL. The Decodable service normalizes the expression, and rejects any malformed expression with an error.

One common use is to provide processing time: AS PROCTIME().

Note that Stream preview shows null for computed fields, but the correct values are provided to consuming Pipelines and Connections.
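Because a computed field's type is just AS plus an SQL expression, it can be declared with the same --field syntax used in the earlier examples. This is a sketch; the quoting and the stream and field names are illustrative:

```shell
# Sketch: add a processing-time computed field alongside a physical field.
# The computed field's data type is derived from the expression automatically.
decodable stream create \
  --name clicks \
  --field raw_data=STRING \
  --field "ingest_time=AS PROCTIME()"
```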

Constraints

Fields may optionally carry one or more SQL constraints.

- NOT NULL: Marks a field as required.
- PRIMARY KEY: Marks a field that contains a value that can be used to uniquely identify each row in a table. When working with change streams, Decodable uses the primary key to interpret which modification to perform, based on the op field.

Additional constraints are coming soon!

📘

This page contains copied contents from the Apache Flink® documentation.

See Credits page for details.

Physical Schema

In addition to the logical schema, users can query the physical schema of a stream. The physical schema of an Append stream is the same as the logical schema. The physical schema of a Change stream contains the Debezium schema wrapper, because change streams are stored internally in the Debezium format. The physical schema is especially useful when to_append() is used to allow a Change stream to be processed as an Append stream.

For example, if a change stream has two fields:

fields
    0  id                  INTEGER NOT NULL
    1  value               STRING
  partition key fields     -
  primary key fields
    0                      id

The physical schema of the stream would be:

schema
   0  OP              STRING
   1  before          ROW<id: INTEGER, value: STRING>
   2  after           ROW<id: INTEGER, value: STRING>
