A Stream carries records within Decodable and can be read from or written to. It is conceptually similar to a "topic" in Kafka or a "stream" in AWS Kinesis. Every stream has a schema that specifies its fields. Once a stream is defined, it can be used as an input or output by any number of pipelines or connections.

👍

Batch Analogy

If you're coming from a batch processing background, you can think of a Decodable stream like a database table. A stream's records are like a table's rows, and its fields are like a table's columns.

Stream Types

There are two types of streams in Decodable.
Append - Each record in the stream is an independent event (e.g., records from a click event stream). To create a stream with the APPEND type, define a schema without any key fields.
Change - Each record in the stream is either a new record or an update/delete of a previous record, based on the key field defined in the schema (e.g., database change logs). To create a stream with the CHANGE type, define a schema with a key field using --field key_field="string primary key". Both cases are sketched below.
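
As a minimal sketch, here is how each type might be created with the CLI; the stream and field names are hypothetical:

decodable stream create \
  --name clicks_raw \
  --field user_id=STRING \
  --field url=STRING
# No key fields, so the stream type is APPEND.

decodable stream create \
  --name users_cdc \
  --field user_id="STRING PRIMARY KEY" \
  --field email=STRING
# A primary key field makes the stream type CHANGE.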

Managing Streams

All streams have these common elements:

id (system-generated) - A unique id generated by the system to track this stream. See Concepts - Resource IDs for more information.

name (required) - A name to help you identify this stream; it can be used in pipelines when specifying a source or sink.

📘

Naming Convention

Stream names are typically short, all lowercase, and with words separated by underscores (_). Examples: http_raw, http_event.

schema (required) - A schema describing all records in the stream. See Schema below for more information.

description - A longer description of the stream. Use this to help you and your team know when and how to use this stream.

watermark - An expression specifying a field to use as event time, with an optional interval to allow for late-arriving data. The field must be of a supported type (see below). To remove a watermark from an existing Stream, update the value to "", an empty string.

Examples:

  • "timestamp_field AS timestamp_field"
  • "timestamp_field AS timestamp_field - INTERVAL '0.001' SECOND"
  • "timestamp_field AS timestamp_field - INTERVAL '5' MINUTE"

👍

Supported types for watermark fields

  • TIMESTAMP(3)
  • TIMESTAMP(2)
  • TIMESTAMP(1)
  • TIMESTAMP_LTZ(3)
  • TIMESTAMP_LTZ(2)
  • TIMESTAMP_LTZ(1)
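
For instance, assuming the CLI accepts a --watermark flag at stream creation (an illustrative sketch with hypothetical names, not a definitive reference):

decodable stream create \
  --name orders \
  --field order_ts="TIMESTAMP(3)" \
  --field amount=DOUBLE \
  --watermark "order_ts AS order_ts - INTERVAL '5' MINUTE"
# order_ts is used as event time; records may arrive up to 5 minutes late.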

Schema

Each Stream has a schema that defines the structure of the Stream's records as a set of fields, each with a name and a data type. The schema that users manage is called the logical schema.

Example Operations

Creating a stream with a single field

decodable stream create \
  --name my_stream \
  --description "raw data from prod Kafka" \
  --field raw_data=STRING
# Created stream my_stream (2fc5dc5a)

Viewing the stream we just created

decodable stream get 2fc5dc5a
# my_stream
#   id                       2fc5dc5a
#   description              raw data from prod Kafka
#   schema
#     0  raw_data              STRING
#   create time              2021-11-03T20:28:48Z
#   update time              2021-11-03T20:28:48Z

Updating the stream to add another field

decodable stream update 2fc5dc5a \
  --field raw_data=STRING  \
  --field new_field=STRING
# Updated stream "2fc5dc5a"

🚧

Updating a schema

From the CLI or SDK, be sure to supply the entire new schema, not just the new fields!

Data Types

The following field types are supported.

📘

Something missing?

If there's a specific type that's missing, let us know!

String Types

Type          Description
CHAR(n)       Fixed-length character string.
VARCHAR(n)    Variable-length character string with a maximum length of n.
STRING        Synonym for VARCHAR(2147483647).

Binary Types

Type          Description
BINARY(n)     Fixed-length binary string.
VARBINARY(n)  Variable-length binary string with a maximum length of n.
BYTES         Synonym for VARBINARY(2147483647).

Numeric Types

Type                Description
DECIMAL(p,s)        Exact numeric type with a fixed precision (total number of digits) of p and a scale (number of digits to the right of the decimal point) of s. p must be in the range 1 to 31, inclusive; if omitted, it defaults to 10. s must be in the range 0 to p, inclusive; if s is specified, p must also be specified. When omitted, s defaults to 0.
DEC(p,s)            Synonym for DECIMAL(p,s).
NUMERIC(p,s)        Synonym for DECIMAL(p,s).
TINYINT             A 1-byte signed integer.
SMALLINT            A 2-byte signed integer.
INT                 A 4-byte signed integer.
BIGINT              An 8-byte signed integer.
FLOAT               An approximate floating-point number.
DOUBLE [PRECISION]  Synonym for FLOAT.

Date and Time Types

Type                                      Description
DATE                                      A date containing the year, month, and day.
TIME(p)                                   A time without a time zone. If specified, p indicates the precision of fractional seconds, up to nanosecond-level precision. Does not support leap second adjustments. Values must be within the range of 00:00:00 to 23:59:59.
TIMESTAMP(p) [[WITH|WITHOUT] TIME ZONE]   A date and time with or without a time zone. As with TIME, p specifies the fractional second precision, if it is provided. Values must be within the range of 0000-01-01 00:00:00 to 9999-12-31 23:59:59.999999999.
TIMESTAMP_LTZ(p)                          Synonym for TIMESTAMP(p) WITH LOCAL TIME ZONE.

Compound Types

Type                                  Description
ARRAY<T>                              An array of elements of type T, with a maximum size of 2^31-1.
T ARRAY                               Synonym for ARRAY<T>.
MAP<K,V>                              An associative array with keys of type K and values of type V. K and V may be any type. Each unique key maps to exactly one value.
ROW<name type ['description'], ...>   A structure of named fields. name is the field name, type is its type, and 'description' is an optional text description of the field. Field definitions may be repeated as many times as necessary.
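
As a hedged sketch (stream and field names hypothetical), compound types are declared like any other field type; quote them so the shell doesn't interpret the angle brackets:

decodable stream create \
  --name page_views \
  --field tags="ARRAY<STRING>" \
  --field headers="MAP<STRING, STRING>" \
  --field geo="ROW<lat DOUBLE, lon DOUBLE>"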

Other Types

Type       Description
BOOLEAN    A true / false value.
INTERVAL   An interval of time, with either year-month resolution (e.g., INTERVAL '1' YEAR) or day-time resolution (e.g., INTERVAL '5' MINUTE).
MULTISET   A multiset (bag) that, unlike a set, allows duplicate instances of each element.

Computed fields

In addition to the types above, a computed field may be used in a Stream schema.

Such a field's type is given as AS, followed by an SQL expression. The field's actual data type is derived automatically from the given expression and does not have to be declared manually.

For example: AS 1 + 1.

The SQL expression may reference other fields in the stream's schema and use built-in SQL functions.

The Decodable service will normalize the expression, and will reject any malformed expression with an error.

One common use is to provide processing time: AS PROCTIME().
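
Assuming computed fields are declared through the same --field flag, with AS and the expression standing in for the type (a sketch, not a definitive reference):

decodable stream create \
  --name events_enriched \
  --field payload=STRING \
  --field ingest_time="AS PROCTIME()"
# ingest_time's type is derived automatically from the expression.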

Note that Stream preview will show null for computed fields, but correct values are provided to consuming Pipelines and Connections.

Constraints

Fields may optionally carry one or more SQL constraints.

Constraint    Description
NOT NULL      Marks a field as required.
PRIMARY KEY   Indicates that a field uniquely identifies a record, which means the stream will have type=CHANGE: when a record arrives with a key that has been seen before, it replaces the previous value for that key. Only one field may be included in the primary key; composite keys are not currently supported.

Additional constraints are coming soon!
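
For example, a minimal sketch combining both constraints (names hypothetical); the primary key makes this a change stream:

decodable stream create \
  --name customers \
  --field customer_id="INT PRIMARY KEY" \
  --field email="STRING NOT NULL" \
  --field plan=STRING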

📘

This page contains content copied from the Apache Flink® documentation.

See Credits page for details.

Physical Schema

In addition to the logical schema, users can query the physical schema of a stream. The physical schema of an Append stream is the same as its logical schema. The physical schema of a Change stream contains the Debezium schema wrapper, since change streams are stored internally in the Debezium format. The physical schema is especially useful when to_append() is used to allow a Change stream to be processed as an Append stream.

For example, if a change stream has two fields:

schema
   0  id              INTEGER PRIMARY KEY
   1  value           STRING

The physical schema of the stream would be:

schema
   0  OP              STRING
   1  before          ROW<id: INTEGER, value: STRING>
   2  after           ROW<id: INTEGER, value: STRING>

What's Next