Streams
A Stream carries records within Decodable and can be read from or written to. It is conceptually similar to a "topic" in Kafka, or a "stream" in AWS Kinesis. Every stream has a schema that specifies its fields. Once a stream is defined, it can be used as an input or output by any number of pipelines or connections.
Batch Analogy
If you're coming from a batch processing background, you can think of a Decodable stream like a database table. A stream's records are like a table's rows, and its fields are like a table's columns.
Stream Types
There are two types of streams in Decodable.
Append - Each record in the stream is an independent event (e.g. records from a click event stream). To create a stream with APPEND type, define a schema without any key fields.
Change - Records in the stream can either be a new record, or a record that updates or deletes a previous record based on the key field defined in the schema (e.g. database change logs). To create a stream with CHANGE type, define a schema with a key field using --field key_field="string primary key".
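The practical difference between the two types can be illustrated with a small sketch (plain Python, not Decodable code; the field names are invented for the illustration): an append stream simply accumulates every record, while a change stream upserts or deletes state by key.

```python
# Sketch of append vs. change semantics (illustration only, not Decodable code).

def apply_append(state, record):
    """Append streams: every record is an independent event."""
    state.append(record)
    return state

def apply_change(state, record):
    """Change streams: records upsert or delete by the primary key field."""
    key = record["key_field"]
    if record.get("deleted"):
        state.pop(key, None)
    else:
        state[key] = record["value"]
    return state

append_state = []
for r in [{"click": "/home"}, {"click": "/home"}]:
    apply_append(append_state, r)
print(len(append_state))  # 2: duplicate events are kept as independent records

change_state = {}
for r in [{"key_field": "a", "value": 1},
          {"key_field": "a", "value": 2},
          {"key_field": "b", "value": 3}]:
    apply_change(change_state, r)
print(change_state)  # {'a': 2, 'b': 3}: later records replace earlier ones by key
```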
Managing Streams
All streams have these common elements:
id (system-generated) - A unique id generated by the system to track this stream. See Concepts - Resource IDs for more information.
name (required) - A name to help you identify this stream which can be used in pipelines when specifying a source or sink.
Naming Convention
Stream names are typically short, all lowercase, with words separated by underscores (_). Examples: http_raw, http_event.
schema (required) - A schema describing all records in the stream. See Schema below for more information.
description - A longer description of the stream. Use this to help you and your team know when and how to use this stream.
watermark - An expression specifying a field to use as event time and an optional interval to allow for late arriving data. The field must be of a supported type. To remove a watermark from an existing Stream, update the value to "", an empty String.
Examples:
"timestamp_field AS timestamp_field"
"timestamp_field AS timestamp_field - INTERVAL '0.001' SECOND"
"timestamp_field AS timestamp_field - INTERVAL '5' MINUTE"
Supported types for watermark fields
- TIMESTAMP(3)
- TIMESTAMP(2)
- TIMESTAMP(1)
- TIMESTAMP_LTZ(3)
- TIMESTAMP_LTZ(2)
- TIMESTAMP_LTZ(1)
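The effect of the optional lateness interval can be sketched in plain Python (illustration only, not Decodable code): the watermark trails the maximum observed event time by the allowed interval, and records older than the current watermark are treated as late.

```python
# Sketch of watermark semantics (illustration only, not Decodable code).
from datetime import datetime, timedelta

# Corresponds conceptually to "ts AS ts - INTERVAL '5' MINUTE"
lateness = timedelta(minutes=5)

events = [datetime(2021, 11, 3, 12, 0), datetime(2021, 11, 3, 12, 10)]
watermark = max(events) - lateness  # watermark lags the newest event by 5 minutes
print(watermark)  # 2021-11-03 12:05:00

late = datetime(2021, 11, 3, 12, 4)
print(late < watermark)  # True: this record arrives behind the watermark
```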
Schema
Each Stream has a schema that defines the structure of the Stream's records as a set of fields, each with a name and a data type. The schema that users manage is called the logical schema.
Example Operations
Creating a stream with a single field
decodable stream create \
--name my_stream \
--description "raw data from prod Kafka" \
--field raw_data=STRING
# Created stream my_stream (2fc5dc5a)
Viewing the stream we just created
decodable stream get 2fc5dc5a
# my_stream
# id 2fc5dc5a
# description raw data from prod Kafka
# schema
# 0 raw_data STRING
# create time 2021-11-03T20:28:48Z
# update time 2021-11-03T21:39:34Z
Updating the stream to add another field
decodable stream update 2fc5dc5a \
--field raw_data=STRING \
--field new_field=STRING
# Updated stream "2fc5dc5a"
Updating a schema
From the CLI or SDK, be sure to supply the entire new schema, not just the new fields!
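The reason is that an update replaces the schema wholesale rather than merging new fields into it, which can be sketched as follows (plain Python, illustration only):

```python
# Sketch of why schema updates must restate every field (illustration only):
# an update replaces the schema, it does not merge into it.
old_schema = {"raw_data": "STRING"}

partial_update = {"new_field": "STRING"}  # missing raw_data!
full_update = {"raw_data": "STRING", "new_field": "STRING"}

print("raw_data" in partial_update)  # False: applying this would drop raw_data
print(sorted(full_update))           # correct: existing fields restated plus the new one
```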
Data Types
The following field types are supported.
Something missing?
If there's a specific type that's missing, let us know!
String Types
Type | Description |
---|---|
CHAR(n) | Fixed length character string. |
VARCHAR(n) | Variable length character string with a max length of n. |
STRING | Synonym for VARCHAR(2147483647). |
Binary Types
Type | Description |
---|---|
BINARY(n) | Fixed length binary string. |
VARBINARY(n) | Variable length binary string with a max length of n. |
BYTES | Synonym for VARBINARY(2147483647). |
Numeric Types
Type | Description |
---|---|
DECIMAL(p,s) | Exact numeric type with a fixed precision (total number of digits) of p and a scale (number of digits to the right of the decimal point) of s. p must be in the range of 1 to 31, inclusive, and defaults to 10 if omitted. s must be in the range of 0 to p, inclusive, and defaults to 0 if omitted; if s is specified, p must also be specified. |
DEC(p,s) | Synonym for DECIMAL(p,s). |
NUMERIC(p,s) | Synonym for DECIMAL(p,s). |
TINYINT | A 1 byte signed integer. |
SMALLINT | A 2 byte signed integer. |
INT | A 4 byte signed integer. |
BIGINT | An 8 byte signed integer. |
FLOAT | A 4 byte approximate floating point number. |
DOUBLE [PRECISION] | An 8 byte approximate floating point number. |
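The precision/scale rule for DECIMAL can be made concrete with Python's standard decimal module (a sketch of the concept, not Decodable code): p bounds the total number of digits, while s fixes the digits after the decimal point.

```python
# Sketch of DECIMAL(p, s) semantics (illustration only) using Python's
# decimal module: p bounds total digits, s fixes digits after the point.
from decimal import Decimal

def fits(value, p, s):
    """Check whether a value fits a DECIMAL(p, s) column."""
    q = Decimal(value).quantize(Decimal(1).scaleb(-s))  # round to scale s
    digits = len(q.as_tuple().digits)                   # total digit count
    return digits <= p

print(fits("123.45", 5, 2))  # True: 5 digits total, 2 after the point
print(fits("1234.5", 5, 2))  # False: needs 6 digits once held at scale 2
```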
Date and Time Types
Type | Description |
---|---|
DATE | A date containing the year, month, and day. |
TIME(p) | A time without a time zone. If specified, p indicates the precision of fractional seconds up to nanosecond-level precision. Does not support leap second adjustments. Values must be within the range of 00:00:00 to 23:59:59. |
TIMESTAMP(p) [WITH/WITHOUT TIME ZONE] | A date and time with or without a time zone. As with TIME, p specifies the fractional second precision, if provided. Values must be within the range of 0000-01-01 00:00:00 to 9999-12-31 23:59:59.999999999. |
TIMESTAMP_LTZ(p) | A synonym for TIMESTAMP(p) WITH LOCAL TIME ZONE. |
Compound Types
Type | Description |
---|---|
ARRAY<T> | An array of a type T, and a maximum size of 2^31-1. |
T ARRAY | Synonym for ARRAY<T>. |
MAP<K,V> | An associative array with keys of type K and values of type V. Both K and V support any type. Each unique key can only have one value. |
ROW<name type ['description'], ...> | A structure of named fields. name is the field name, type is its type, and description is an optional text description of the field. Field definitions can be repeated as many times as necessary. |
Other Types
Type | Description |
---|---|
BOOLEAN | A true / false value. |
INTERVAL | A period of time, expressed as either a year-month or a day-time interval. |
MULTISET | A multiset (bag) of elements of a given type; unlike a set, each element may appear more than once. |
Computed fields
In addition to the types above, a computed field may be used in a Stream schema. Such a field's type is given as AS followed by a SQL expression, for example AS 1 + 1. The field's actual data type is derived automatically from the given expression and does not have to be declared manually.
The SQL expression may include:
- Any operator or function in the Decodable function catalog;
- References to other fields declared in the same schema;
The Decodable service will normalize the expression, and will reject any malformed expression with an error.
One common use is to provide processing time: AS PROCTIME().
Note that Stream preview will show null for computed fields, but the correct values will be provided to consuming Pipelines and Connections.
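Conceptually, a computed field's value is derived from an expression over the record's other fields rather than stored with the record, which can be sketched in plain Python (illustration only; the field names and expression are invented):

```python
# Sketch of computed-field semantics (illustration only, not Decodable code):
# the value is evaluated from other fields when the record is read.

def with_computed(record, computed):
    """Evaluate each computed-field expression against the record."""
    out = dict(record)
    for name, expr in computed.items():
        out[name] = expr(record)
    return out

record = {"price": 10, "quantity": 3}
# Analogous to declaring a field with type: AS price * quantity
computed = {"total": lambda r: r["price"] * r["quantity"]}
print(with_computed(record, computed))  # {'price': 10, 'quantity': 3, 'total': 30}
```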
Constraints
Fields may optionally carry one or more SQL constraints.
Constraint | Description |
---|---|
NOT NULL | To mark a field as required. |
PRIMARY KEY | To indicate that a field uniquely identifies a row. This means the stream created will have type=CHANGE. If a record arrives with a key that has appeared before, it replaces the previous value for that key. Only one field may be included in the primary key; composite keys are not currently supported. |
Additional constraints are coming soon!
This page contains copied contents from the Apache Flink® documentation.
See Credits page for details.
Physical Schema
In addition to the logical schema, users can query the physical schema of a stream. The physical schema of an Append stream is the same as the logical schema. The physical schema of a Change stream contains the Debezium schema wrapper, as change streams are stored in the Debezium format internally. The physical schema is especially useful when to_append() is used to allow a Change stream to be processed as an Append stream.
For example, if a change stream has two fields:
schema
0 id INTEGER PRIMARY KEY
1 value STRING
The physical schema of the stream would be:
schema
0 OP STRING
1 before ROW<id: INTEGER, value: STRING>
2 after ROW<id: INTEGER, value: STRING>
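The Debezium-style envelope behind this physical schema can be sketched in plain Python (illustration only; the operation codes shown follow Debezium's convention of "c"/"u"/"d" for create/update/delete): each change is stored as an operation plus the row images before and after it.

```python
# Sketch of the Debezium-style envelope behind a change stream's physical
# schema (illustration only, not Decodable code).

def to_physical(op, before, after):
    """Wrap a logical change as an (op, before, after) physical record."""
    return {"OP": op, "before": before, "after": after}

create = to_physical("c", None, {"id": 1, "value": "a"})   # insert: no before image
update = to_physical("u", {"id": 1, "value": "a"},
                          {"id": 1, "value": "b"})         # update: both images
delete = to_physical("d", {"id": 1, "value": "b"}, None)   # delete: no after image

print(update["before"]["value"], "->", update["after"]["value"])  # a -> b
```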