Streams
A stream is a sequence of data records that are stored in a distributed manner and processed in real time by Decodable. Streams can be read from or written to, and they always have a schema that specifies their fields. Once a stream is defined, it can be used as an input or output by any number of pipelines or connections. If you are familiar with Apache Kafka or Amazon Kinesis, a stream in Decodable is conceptually similar to a stream in Kinesis or a topic in Kafka.
Stream Types
The following table describes the different types of streams that Decodable supports.
Stream type | Description |
---|---|
Append | A stream in which new records are continuously added to the end. An append stream is only ever appended to, and each individual record is an independent event. Partitions: When data is written to an append stream, it is split into multiple partitions based on the partition key specified by the user. Decodable ensures that records with the same partition key are stored in the same partition and processed by the same task. To specify a partition key, identify one or more existing physical fields in the schema. If you do not specify any fields to use as the partition key, then records are distributed randomly over all stream partitions. To create a stream with the APPEND type, define a schema without specifying a primary key. (For a CLI example of setting a partition key, see the sketch after this table.) |
Change | A stream that captures changes to a database in real time. Records in a change stream represent changes made to a database, such as inserts, updates, and deletes. A record in a change stream is called a Change Data Capture (CDC) record. To create a stream with the CHANGE type, you must specify one or more fields in the stream's schema to use as the primary key. A primary key is a field, or combination of fields, whose value uniquely identifies each row in a table. A combination of more than one field is called a composite primary key. You can specify a primary key in the Decodable web interface in the Stream Schema Definition view by entering the names of the fields in the Primary Key Fields box. Any field that you enter in the Primary Key Fields box must explicitly have a type that is not null. For example, to make the string and int fields named ProductID and OrderID a composite primary key, enter string not null and int not null as their Types, then enter ProductID and OrderID in the Primary Key Fields box. For an example of how to specify a primary key with the Decodable CLI, see Creating a change stream. |
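Here is a minimal sketch of setting a partition key from the CLI. The --partition-key flag and all names below are assumptions not confirmed by this page; check decodable stream create --help for the exact flag on your version.
# Assumed flag: --partition-key. Records sharing a user_id value are
# stored in the same partition and processed by the same task.
decodable stream create \
  --name clicks_by_user \
  --description "click events, partitioned by user" \
  --field "user_id=string not null" \
  --field "url=string" \
  --partition-key user_id
# No --primary-key is given, so the stream is created with the APPEND type.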
You can also convert an append stream to a change stream and vice versa with the to_change() and to_append() functions. See Process a Change stream as an Append stream and Process an Append stream as a Change stream for examples. A sketch of the to_append() pattern follows.
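As a rough illustration of that pattern, the SQL below (hypothetical stream names; the exact to_append() invocation may differ, so treat the linked guides as authoritative) reads a change stream as if it were an append stream, written to a file for later use in a pipeline:
# Sketch only: my_change_stream and my_append_stream are hypothetical,
# and the to_append() call shape should be verified against the guides above.
cat > change_as_append.sql <<'EOF'
INSERT INTO my_append_stream
SELECT * FROM to_append(my_change_stream)
EOF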
Managing Streams
All streams have the following attributes.
Element | Description |
---|---|
id | A system-generated unique ID used to track this stream. |
name | A unique name for the stream. Names can only contain letters, numbers, dashes and underscores. |
schema | The structure of the record as a set of fields. It describes the fields that are present in the record and the data type for those fields. |
description | Optional. A description of your stream. |
watermark | An expression specifying the field that contains the record's event time. You can also subtract an optional interval to allow for late-arriving data. The field must be one of the following types: TIMESTAMP(3), TIMESTAMP(2), TIMESTAMP(1), TIMESTAMP_LTZ(3), TIMESTAMP_LTZ(2), or TIMESTAMP_LTZ(1). If you want to remove a watermark from an existing stream, update the value to "", an empty string. For example: - "timestamp_field AS timestamp_field" - "timestamp_field AS timestamp_field - INTERVAL '0.001' SECOND" - "timestamp_field AS timestamp_field - INTERVAL '5' MINUTE" |
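A hedged example of declaring a watermark at stream creation time follows. The --watermark flag is an assumption here (confirm with decodable stream create --help), and the stream and field names are hypothetical.
# Sketch: the watermark allows events to arrive up to 5 minutes late.
decodable stream create \
  --name orders \
  --field "order_id=string not null" \
  --field "order_time=timestamp(3)" \
  --watermark "order_time AS order_time - INTERVAL '5' MINUTE"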
Updating Streams
You can update a stream's schema or change the partition or primary key used in the stream from the Streams page in Decodable Web or using the Decodable CLI. Regardless of which tool you use to update the stream, there are two general workflows you can use depending on whether you want to prioritize convenience or correctness.
Note: Updating an actively used stream affects the pipelines and connections that the stream is connected to. Depending on the change you are making to the stream, attached connections and pipelines can break and become incompatible.
If you want to update a stream with convenience prioritized, do the following steps (a CLI sketch follows the list).
- Stop any connections or pipelines that are attached to the stream that you want to update.
- Clear any records that are either in the stream that you want to update or in a stream that is connected to it. In Decodable Web, select the dropdown menu (...) and then select Clear.
- Make the desired updates to the stream.
- Restart any connections and pipelines that you stopped in Step 1. If you are using the Decodable CLI, make sure the --force flag is set. If you are using Decodable Web, make sure Discard State is selected.
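Here is a hedged CLI sketch of this workflow. The resource IDs are placeholders, the activate/deactivate subcommands are assumed to be available for both connections and pipelines, and clearing records is shown on this page only for Decodable Web, so no CLI equivalent is assumed.
# 1. Stop attached connections and pipelines (placeholder IDs).
decodable connection deactivate 2abc1234
decodable pipeline deactivate 3def5678
# 2. Clear records in the affected streams (in Decodable Web: ... > Clear).
# 3. Make the update, supplying the ENTIRE new schema.
decodable stream update 2fc5dc5a \
  --field raw_data=STRING \
  --field new_field=STRING
# 4. Restart with state discarded.
decodable connection activate 2abc1234 --force
decodable pipeline activate 3def5678 --force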
The second workflow, which prioritizes correctness, involves recreating the attached connections, pipelines, and streams from scratch. As a best practice, use this workflow if you want to change a stream's partition key. Do the following steps.
- Stop any connections or pipelines that are attached to the stream that you want to update.
- Clone all of the resources:
  - Clone the streams with a new name. Make the changes that you wanted to make in the new stream.
  - Clone the connections. Make sure that the cloned connections are attached to the newly cloned stream.
  - Clone the pipelines. You must edit the cloned pipelines so that their SQL references the newly cloned stream name.
- Optionally, as a clean-up step, delete the older connection(s), pipeline(s), and stream(s).
Schema
Each stream has a schema that defines the structure of the stream's records as a set of fields, each with a name and a data type. The schema that users manage is called the logical schema.
Example Operations
Creating a change stream
decodable stream create \
--name my_change_stream \
--description "transactions from MYSQL database" \
--field "customerID=int not null" --primary-key "customerID" \
--field "productID=string not null" --primary-key "productID" \
--field "transaction=int"
# Created stream my_change_stream (2cd1d83a)
Creating an append stream with a single field
decodable stream create \
--name my_stream \
--description "raw data from prod Kafka" \
--field raw_data=STRING
# Created stream my_stream (2fc5dc5a)
Viewing the append stream we created
decodable stream get 2fc5dc5a
# my_stream
# id 2fc5dc5a
# description raw data from prod Kafka
# type APPEND
# fields
# 0 raw_data STRING
# partition key fields -
# primary key fields -
# watermarks -
# properties
# partition.count 10
# compression.type zstd
# create time 2023-05-01T19:19:45Z
# update time 2023-05-01T19:19:45Z
Updating the append stream to add another field
decodable stream update 2fc5dc5a \
--field raw_data=STRING \
--field new_field=STRING
# Updated stream "2fc5dc5a"
Updating a schema
When updating a schema from the CLI or SDK, be sure to supply the entire new schema, not just the new fields! Notice how the example above repeats the existing raw_data field alongside new_field.
Data Types
The following field types are supported.
String Types
Type | Description |
---|---|
CHAR(n) | Fixed length character string. |
VARCHAR(n) | Variable length character string with a max length of n. |
STRING | Synonym for VARCHAR(2147483647). |
Binary Types
Type | Description |
---|---|
BINARY(n) | Fixed length binary string. |
VARBINARY(n) | Variable length binary string with a max length of n. |
BYTES | Synonym for VARBINARY(2147483647). |
Numeric Types
Type | Description |
---|---|
DECIMAL(p,s) | Exact numeric type with a fixed precision (total number of digits) of p and a scale (number of digits to the right of the decimal point) of s. p must be in the range of 1 to 31, inclusive; when omitted, it defaults to 10. s must be in the range of 0 to p, inclusive, and if s is specified, p must also be specified; when omitted, s defaults to 0. |
DEC(p,s) | Synonym for DECIMAL(p,s). |
NUMERIC(p,s) | Synonym for DECIMAL(p,s). |
TINYINT | A 1 byte signed integer. |
SMALLINT | A 2 byte signed integer. |
INT | A 4 byte signed integer. |
BIGINT | An 8 byte signed integer. |
FLOAT | A 4 byte single-precision floating point number. |
DOUBLE | An 8 byte double-precision floating point number. DOUBLE PRECISION is a synonym. |
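For example, a currency amount is a natural fit for DECIMAL. This sketch uses only the --field syntax shown earlier; the stream and field names are hypothetical.
# Sketch: price holds up to 10 digits total, 2 after the decimal point.
decodable stream create \
  --name orders_raw \
  --field "order_id=BIGINT" \
  --field "price=DECIMAL(10,2)"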
Date and Time Types
Type | Description |
---|---|
DATE | A date containing the year, month, and day. |
TIME(p) | A time without a timezone. If specified, p indicates the precision of fractional seconds, up to nanosecond-level precision. Does not support leap second adjustments. Values must be within the range of 00:00:00 to 23:59:59. |
TIMESTAMP(p) [WITH/WITHOUT TIME ZONE] | A date and time, with or without a timezone. As with TIME, p specifies the fractional second precision, if provided. Values must be within the range of 0000-01-01 00:00:00 to 9999-12-31 23:59:59.999999999. |
TIMESTAMP_LTZ(p) | Synonym for TIMESTAMP(p) WITH LOCAL TIME ZONE. |
Compound Types
Type | Description |
---|---|
ARRAY<T> | An array of elements of type T, with a maximum size of 2^31-1 elements. |
T ARRAY | Synonym for ARRAY<T>. |
MAP<K,V> | An associative array with keys of type K and values of type V. Both K and V support any type. Each unique key can only have one value. |
ROW<name type ['description'], ...> | A structure of named fields. name is the field name, type is its type, and description is an optional text description of the field. Field definitions can be repeated as many times as necessary. |
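As an illustration, compound types can be declared directly in a --field definition. The stream and field names here are hypothetical, and whether the CLI accepts every compound type this way is an assumption.
# Sketch: a nested ROW and a MAP declared as stream fields.
decodable stream create \
  --name customers_raw \
  --field "address=ROW<street STRING, city STRING>" \
  --field "attributes=MAP<STRING, STRING>"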
Other Types
Type | Description |
---|---|
BOOLEAN | A true / false value. |
INTERVAL | A span of time, with either year-month resolution (for example, INTERVAL YEAR TO MONTH) or day-time resolution (for example, INTERVAL DAY TO SECOND). |
MULTISET | A multiset of a type T. Unlike a set, each unique element may occur more than once; for example, MULTISET<STRING>. |
Computed fields
In addition to the types above, a computed field may be used in a Stream schema. Such a field's type is given as AS, followed by an SQL expression; for example, AS 1 + 1. The field's actual data type is derived automatically from the given expression and does not have to be declared manually.
The SQL expression may include:
- Any operator or function in the Decodable function catalog;
- References to other fields declared in the same schema.
The Decodable service will normalize the expression, and will reject any malformed expression with an error.
One common use is to provide processing time: AS PROCTIME(), as in the sketch below.
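This is a hedged sketch of declaring a computed field from the CLI, assuming the --field name=type syntax shown earlier accepts an AS expression as the type; the stream name is hypothetical.
# Sketch: proc_time is computed at read time; its type is derived automatically.
decodable stream create \
  --name events_with_proctime \
  --field "raw_data=STRING" \
  --field "proc_time=AS PROCTIME()"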
Note that Stream preview will show null for computed fields, but the correct values will be provided to consuming Pipelines and Connections.
Constraints
Fields may optionally specify one or more SQL constraints.
Constraint | Description |
---|---|
NOT NULL | Marks a field as required; it can't contain a null value. |
PRIMARY KEY | A field that contains a value that can be used to uniquely identify each row in a table. When working with change streams, Decodable uses the primary key to determine which modification, based on the op field, to perform. |
Additional constraints are coming soon!
This page contains copied contents from the Apache Flink® documentation.
See Credits page for details.
Physical Schema
In addition to the logical schema, users can query the physical schema of a stream. The physical schema of an Append stream is the same as the logical schema. The physical schema of a Change stream contains the Debezium schema wrapper, as change streams are stored in the Debezium format internally. The physical schema is especially useful when to_append() is used to allow a Change stream to be processed as an Append stream.
For example, if a change stream has two fields:
fields
0 id INTEGER NOT NULL
1 value STRING
partition key fields -
primary key fields
0 id
The physical schema of the stream would be:
schema
0 OP STRING
1 before ROW<id: INTEGER, value: STRING>
2 after ROW<id: INTEGER, value: STRING>