Pipeline examples for the most popular data transformations

Most pipelines fit into one or more established patterns. Here, we'll describe some of these patterns and provide example SQL to help you get started. With these building blocks, Decodable users can build a series or network of pipelines to deliver sophisticated data flows across teams, regions, and use cases.

Filter

Filtering pipelines are often used to remove unwanted records. Their defining characteristic is the SQL where clause, in which predicates can be thought of as "rules" that apply to the data. This makes them common when a team is creating curated feeds for different audiences. Specifically, filtering refers to removing records from a stream, not fields from a record. A filtering pipeline will, at most, emit all of its input records, so its input/output ratio is 1:n where n <= 1. Filtering pipelines are examples of a "map-style" or "map-only" pipeline: they use no blocking operators and therefore introduce no artificial delay into the output data. Unless a filtering pipeline is also acting as a transformer or uses excessively complex predicates, it uses few resources and can process high volumes of data. In most cases, filtering pipelines have no stateful operators beyond the sources.

Common use cases:

- Remove records to ensure downstream teams or services remain compliant with various data access policies
- Remove unneeded records to make downstream processing and/or storage more efficient

-- Filter only records pertaining to the application
insert into application_events
select * from http_events
where hostname = 'app.decodable.co'

-- Filter only records that modify the inventory
insert into inventory_updates
select * from http_events
where hostname = 'api.mycompany.com' and
  path like '/v1/inventory%' and
  `method` in ( 'POST', 'PUT', 'DELETE', 'PATCH' )

Route

Records can be routed to one or more sinks by creating multiple filtering pipelines, each handling a single sink with a different where clause. This works because streams are pub/sub - multiple pipelines receive a full copy of the stream without impacting one another - and because multiple pipelines can insert into the same stream.

-- Route security-related HTTP events
insert into security_events
select * from http_events
where path like '/login%' or
  path like '/billing/cc%'

-- Route app-related HTTP events
insert into application_events
select * from http_events
where hostname = 'app.decodable.co'

-- Route requests to CS if it looks like the user needs help
insert into cs_alerts
select * from http_events
where response_code between 500 and 599 or      -- any server failure
  ( path = '/signup' and response_code != 200 ) -- failed to sign up for any reason

Transform

Transforming pipelines alter the shape of their input records in some way, outputting records with a different schema. The defining characteristic of a transforming pipeline is the set of projection expressions that appear in the select clause: any pipeline whose output projection doesn't match its input is a transforming pipeline. A purely transforming pipeline will always output at least one record for each input record, making its input/output ratio 1:n where n >= 1. Transforming pipelines tend to be map-only and typically don't contain blocking operators. Their performance, however, can vary significantly based on the complexity of the projection expressions and other operators used to alter the data.
As a result, the amount of data a single task can process depends on the complexity of the transformation as well as the size and rate of the data. In some cases, transforming pipelines use stateful operators, although this is rare.

Common use cases:

- Remove specific fields, or alter their content, to ensure downstream teams and/or services remain compliant with various data access policies.
- Add additional fields to records from a reference (dimension) table.
- Remove unneeded fields to make downstream processing and/or storage more efficient.
- Make field names, data types, or contents consistent across systems.

Transforming pipelines tend to be some of the most complex pipelines, with a number of subcategories that carry different connotations.

Parse and extract

Pipelines that derive new fields from existing fields are called parsing or extraction pipelines. In both cases, the defining characteristic is the set of functions used. For example, a pipeline containing a JSON-parsing function is a parsing pipeline, while a pipeline that applies regular expressions to match parts of a field and emit them as new fields is an extraction pipeline.

-- Parse timestamp and action
insert into user_events
select
  to_date(fields['ts'], 'YYYY-MM-DD''T''HH:MI:SS') as ts,
  fields['user_id'] as user_id,
  fields['path'] as path,
  case fields['method']
    when 'GET' then 'read'
    when 'POST', 'PUT' then 'modify'
    when 'DELETE' then 'delete'
  end as action
from (
  select
    grok(
      body,
      '\[${ISO8601_DATETIME:ts} ${DATA:method} "${PATH:path}" uid:${DATA:user_id}'
    ) as fields
  from http_events
)

Cleanse and normalize

Cleansing or normalizing pipelines focus on fixing data quality issues or normalizing a schema to match what a downstream system or service expects. In many cases, normalizing pipelines contain field renames, data type conversions, null checks, and conditional value replacements.

-- Cleanse incoming data for downstream processes
insert into sensor_readings
select
  cast(ifnull(sensor_id, '0') as bigint) as sensor_id,
  lower(trim(name)) as name,
  cast(`value` as bigint) as reading
from raw_sensor_readings

Mask and anonymize

Anonymizing or masking pipelines omit or obfuscate sensitive fields for compliance, regulatory, or other policy reasons. A common pattern of access control enforcement is to omit sensitive fields prior to serving a stream to other teams or services.

-- Anonymize SSNs and zip codes
insert into user_events_masked
select
  user_id,
  username,
  overlay(ssn placing '*' from 1 for 12) as ssn,
  substring(zip_code from 1 for 2) as zip_code_1,
  action
from user_events

Aggregate

Aggregation pipelines are characterized by the use of aggregate functions. In streaming, almost all aggregation pipelines use a window function to divide the continuous data stream into discrete intervals or segments, to which the aggregation is then applied. The most common aggregate functions are count, min, max, avg, and sum, but others exist. Aggregations emit at most as many output records as they receive, making their input/output ratio n:1 where n >= 1. While there isn't a clear or formal distinction between "stream processing" and "streaming analytics," the latter is often associated with aggregation pipelines.

Common use cases:

- Perform summary statistics on streams to support real-time data quality visualizations or alerts.
- Extract and aggregate features for insertion into a low-latency feature store for online ML/AI systems.
- Derive time-series metrics from discrete events for alerting systems.
-- Count the number of events by path and status every 10 seconds.
insert into site_activity
select
  window_start,
  window_end,
  path,
  status,
  count(1) as `count`
from table(
  tumble(
    table http_events,
    descriptor(_time),
    interval '10' seconds
  )
)
group by window_start, window_end, path, status

-- Count the number of records by action and user every 10 seconds over 5 minutes.
insert into feature_store
select
  window_start,
  window_end,
  user_id,
  action,
  count(1),
  first(_time) as first_event_time,
  last(_time) as last_event_time
from table(
  hop(
    table user_events,
    descriptor(_time),
    interval '10' seconds, -- Slide every 10 seconds
    interval '5' minutes   -- Look at 5 minutes
  )
)
group by window_start, window_end, user_id, action

Trigger

Triggering pipelines emit a special record to an external system to indicate that an action should be performed. The role of a triggering pipeline is to express the conditions under which the action should be performed (the where clause) and to create the record the external system expects when those conditions are met (the select clause). In some cases, it's necessary to detect a complex condition over multiple records. This can be done by wrapping aggregate functions in an outer query with a where clause, or by expressing more complex patterns with SQL's match_recognize clause.

Common use cases:

- Sending push notifications for select user actions.
- Adapting internal streams to integrate with SaaS products.
- Firing alerts or automated processes when conditions are met in a stream.

-- Build hourly usage data for a Stripe integration on the output stream
insert into stripe_product_usage
select
  window_start as _time,
  customer_id,
  'abcd1234' as price_id,
  sum(bytes_sent) / 1024 / 1024 as mb_sent
from table(
  tumble(
    table document_downloads,
    descriptor(_time),
    interval '1' hour
  )
)
group by window_start, customer_id

Process a Change Stream as an Append Stream

Under normal circumstances, pipelines process change streams based on their logical schema. In some use cases, however, processing needs to happen over the physical schema - for example, outputting every user that changed location in a change stream. In other words, we want to process the change stream as an append stream rather than interpreting the changes. The table function TO_APPEND turns a change stream into an append stream.

Process an Append Stream as a Change Stream

When an append stream is partitioned, it can be turned into a change stream based on the partition key, emitting change records per key. In other words, a value for a key that appears later in the stream is encoded as an update. The table function TO_CHANGE turns an append stream into a change stream based on the append stream's partition key.
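Neither TO_APPEND nor TO_CHANGE appears in the examples above, so the following is a minimal sketch of both. The stream names (users, page_view_counts), their fields, and the output streams are hypothetical, and the exact invocation syntax of these table functions should be confirmed against the Decodable documentation.

-- Sketch (assumed streams and fields): read the change stream `users`
-- as an append stream, so each change record - rather than the
-- interpreted, latest state - becomes an output row.
insert into user_location_changes
select user_id, `location`
from to_append(users)

-- Sketch (assumed streams and fields): read the append stream
-- `page_view_counts`, partitioned by page_id, as a change stream, so a
-- later value for the same page_id is emitted as an update.
insert into latest_page_view_counts
select page_id, view_count
from to_change(page_view_counts)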