About Decodable Pipelines

A pipeline is a set of data processing instructions written in SQL or expressed as an Apache Flink job. Pipelines can perform a range of processing, including simple filtering and column selection, joins for data enrichment, time-based aggregations, and even pattern detection. When you create a pipeline, you define what data to process, how to process it, and where to send it, either in a SQL query or in a JVM-based programming language of your choice, such as Java or Scala.

Any data transformation that Decodable performs happens in a pipeline. To configure Decodable to transform streaming data, you must insert a pipeline between streams. Pipelines aren’t required if you simply want to move or replicate data in real time. Pipelines don’t start processing data until they’re activated: you activate a pipeline to start it, and deactivate it to stop it.

Decodable supports a subset of Flink SQL functions with some additional functionality. See SQL functions for a detailed list of functions available for use by your pipeline queries.

Reserved words

Certain words can’t be used in pipeline SQL as identifiers, such as field names, stream names, and so on.

However, you can use a reserved word if you enclose it in backticks (`). For example, you can’t use value as a field name, but you can use `value`. For a list of all reserved words, see Reserved Keywords in the Apache Flink documentation.
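For example, escaping the reserved word in a query might look like this (the stream name http_events is hypothetical):

```sql
-- SELECT value FROM http_events        -- fails: value is a reserved word
SELECT `value`                          -- works: the word is escaped with backticks
FROM http_events
```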

Data types

Pipelines use the same data types as streams. See Decodable data types for more information.

Processing guarantees

Pipelines process data exactly once. This means that, even in the case of temporary failures or pipeline restarts, every record is processed once and only once. For example, if you have a COUNT() in your pipeline, it will never under- or over-count the records in its input stream, nor will it generate duplicates in its output stream. That said, some connectors might not guarantee exactly-once delivery, in which case duplicate records might be introduced as data moves on or off the Decodable platform through a connection.
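As a sketch, a pipeline like the following (the orders stream and its fields are hypothetical) produces counts that stay correct across failures and restarts, because each input record contributes to the aggregate exactly once:

```sql
-- Count orders per customer. Each record in `orders` is
-- counted exactly once, even if the pipeline restarts.
SELECT customer_id, COUNT(*) AS order_count
FROM orders
GROUP BY customer_id
```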

Under the hood, Decodable uses a distributed checkpointing system that takes consistent snapshots of the pipeline’s state at regular intervals. This includes which data has already been processed, intermediate calculation results, and many other details. When a pipeline recovers from a failure or restart, it loads the last successful snapshot, rewinds the stream to when the snapshot was taken, and begins processing again.

Time in pipelines

When implementing any stream processing use case, one of the most important things to consider is the notion of time. Depending on your use case, decide how you want your pipeline to handle time. Decodable can process records based on different notions of time.

  • Event time: The time present in the record itself, which represents the time at which an event actually occurred in the real world or when the event originated from an upstream system.

  • Processing time: The "wall clock" time when Decodable processes the record.

The best practice is to use event time when performing joins or aggregations, because you normally want to group records based on when an event happened rather than when it was processed by Decodable.
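For instance, an event-time aggregation over one-minute tumbling windows might look like the following, using Flink SQL's windowing table-valued functions (the page_views stream and its view_time event-time field are hypothetical; view_time is assumed to have a watermark defined):

```sql
-- Count page views per one-minute tumbling window,
-- grouped by when the views actually occurred (event time).
SELECT window_start, window_end, COUNT(*) AS views
FROM TABLE(
  TUMBLE(TABLE page_views, DESCRIPTOR(view_time), INTERVAL '1' MINUTE))
GROUP BY window_start, window_end
```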

You identify the field that contains the event time by specifying a watermark on the incoming stream. See Manage schemas for more information.

If you want to use processing time instead, you must identify or add a field that contains the processing time with the PROCTIME() function. You can define this field in the pipeline itself, or in the incoming stream. For example, to add the processing time as a field in the schema of the incoming stream, select Streams in Decodable web and select the stream that you’d like to modify. Then, select Schema and add a new field, or modify an existing one, to use the PROCTIME() function.
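When defined in the pipeline itself, the processing-time field is just a column computed with PROCTIME(); for example (the orders stream and its fields are hypothetical):

```sql
-- Add a processing-time attribute alongside the record's own fields.
SELECT order_id, amount, PROCTIME() AS proc_time
FROM orders
```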

The schema view for a stream containing a field with the data type PROCTIME().