Pipeline examples for the most popular data transformations

Most pipelines fit into one or more established patterns. Here, we’ll describe some of these patterns and provide example SQL to help you get started. With these building blocks, Decodable users can build a series or network of pipelines to deliver sophisticated data flows across teams, regions, and use cases.

Filter

Filtering pipelines generally exist to remove unwanted records. Their defining characteristic is the SQL where clause in which predicates can be thought of as “rules” that apply to the data. This makes them common when a team is creating curated feeds for different audiences. Specifically, filtering refers to removing records from a stream, not fields from a record.

A filtering pipeline will, at most, emit all of its input records. This means its input/output ratio is 1:n where n <= 1. Filtering pipelines are examples of “map-style” or “map-only” pipelines. They use no blocking operators and therefore introduce no artificial delay into output data. Unless a filtering pipeline is also acting as a transformer or uses excessively complex predicates, it uses very few resources and can easily process high volumes of data. In most cases, filtering pipelines have no stateful operators beyond the sources.

Common use cases:

  • Remove records to ensure downstream teams or services remain compliant with various data access policies

  • Remove unneeded records to make downstream processing and/or storage more efficient

-- Filter only records pertaining to the application
insert into application_events
select * from http_events
where hostname = 'app.decodable.co'

-- Filter only records that modify the inventory
insert into inventory_updates
select * from http_events
where hostname = 'api.mycompany.com' and
  path like '/v1/inventory%' and
  `method` in ( 'POST', 'PUT', 'DELETE', 'PATCH' )

Route

Records can be routed to one or more destinations by creating multiple filtering pipelines. Each pipeline handles a single destination with a different where clause. This works for two reasons: streams are pub/sub, so multiple pipelines each receive a full copy of the stream without impacting one another, and multiple pipelines can insert into the same stream.

-- Route security-related HTTP events
insert into security_events
select * from http_events
where path like '/login%' or
  path like '/billing/cc%'

-- Route app-related HTTP events
insert into application_events
select * from http_events
where hostname = 'app.decodable.co'

-- Route requests to CS if it looks like the user needs help
insert into cs_alerts
select * from http_events
where response_code between 500 and 599 or -- any server failure
  ( path = '/signup' and response_code != 200 ) -- failed to sign up for any reason
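
The point that multiple pipelines can insert into the same stream can be sketched with one more pipeline. This is a minimal sketch that assumes it runs as its own pipeline alongside the security_events example, and that http_events carries the response_code field used above. The choice of status codes is purely illustrative.

```sql
-- Sketch only: a second, separate pipeline that also writes to security_events.
-- Because it selects * from the same input stream, its output schema matches
-- the schema produced by the first security_events pipeline.
insert into security_events
select * from http_events
where response_code in (401, 403) -- unauthorized or forbidden requests
```

Each pipeline evaluates its own where clause independently, so a record that satisfies both pipelines would be written to security_events once by each of them.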

Transform

Transforming pipelines alter the shape of their input records in some way, outputting records with a different schema. The defining characteristic of a transforming pipeline is the set of projection expressions that appear in the select clause. Any pipeline whose output projection does not match its input is a transforming pipeline.

A purely transforming pipeline will always output at least one record for each input record, making its input/output ratio 1:n where n >= 1. Transforming pipelines tend to be map-only and typically do not contain blocking operators. Their performance, however, can vary significantly based on the complexity of the projection expressions and other operators used to alter the data. As a result, the amount of data a single task can process depends on the complexity of the transformation as well as the size and rate of the data. In rare cases, transforming pipelines use stateful operators.

Common use cases:

  • Remove specific fields, or alter their content, to ensure downstream teams and/or services remain compliant with various data access policies.

  • Add additional fields to records from a reference (dimension) table.

  • Remove unneeded fields to make downstream processing and/or storage more efficient.

  • Make field names, data types, or contents consistent across systems.
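
As a minimal sketch of the field-level use cases above, the following pipeline renames a field, normalizes the contents of another, converts a type, and drops every field it does not select. The output stream http_events_slim and the output field names are assumptions made for illustration. The input fields are the ones used by the other http_events examples on this page.

```sql
-- Sketch only: http_events_slim and the output field names are hypothetical.
insert into http_events_slim
select
  _time                      as event_time,  -- rename a field
  hostname,
  lower(`method`)            as http_method, -- make contents consistent
  path,
  cast(response_code as int) as status       -- convert the data type
  -- any other input field is removed simply by not selecting it
from http_events
```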

Transforming pipelines tend to be some of the most complex pipelines with a number of subcategories that carry different connotations.

Parse and extract

Pipelines that derive new fields from existing fields are called parsing or extraction pipelines. In both cases, the defining characteristic is the set of functions that are used. For example, a pipeline containing a JSON-parsing function is called a parsing pipeline, while a pipeline that applies a set of regular expressions to a field and builds new fields from the matches is an extraction pipeline.

-- Parse timestamp and action
insert into user_events
select
  to_timestamp(fields['ts'], 'yyyy-MM-dd''T''HH:mm:ss') as ts,
  fields['user_id']    as user_id,
  fields['path']       as path,
  case fields['method']
    when 'GET'         then 'read'
    when 'POST', 'PUT' then 'modify'
    when 'DELETE'      then 'delete'
  end as action
from (
  select
    grok(
      body,
      '\[%{TIMESTAMP_ISO8601:ts} %{DATA:method} "%{PATH:path}" uid:%{DATA:user_id}'
    ) as fields
  from http_events
)
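
The grok example covers one style of parsing. For the JSON-parsing and regular-expression cases mentioned above, a sketch could look like the following. It assumes a hypothetical raw_events stream with a payload field holding a JSON string and a path field, a hypothetical parsed_events output stream, and that the Flink SQL functions json_value and regexp_extract are available in your environment.

```sql
-- Sketch only: raw_events, parsed_events, payload, and the JSON layout are hypothetical.
insert into parsed_events
select
  -- Parsing: pull scalar values out of a JSON string by path
  json_value(payload, '$.user.id') as user_id,
  json_value(payload, '$.action')  as action,
  -- Extraction: capture group 1 of the regular expression becomes a new field
  regexp_extract(path, '/v1/inventory/([0-9]+)', 1) as item_id
from raw_events
```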

Cleanse and normalize

Cleaning or normalizing pipelines focus on fixing data quality issues or normalizing a schema to match what a downstream system or service expects. In many cases, normalizing pipelines contain field renames, data type conversions, nullity checks, and conditional value replacements.

-- Cleanse incoming data for downstream processes
insert into sensor_readings
select
  cast(ifnull(sensor_id, '0') as bigint) as sensor_id,
  lower(trim(name))                      as name,
  cast(`value` as bigint)                as reading
from raw_sensor_readings
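
The example above covers renames, type conversions, and a nullity check. Conditional value replacement can be sketched in a similar way. The output stream sensor_readings_clean, the 'unknown' placeholder, and the rule that negative readings are invalid are assumptions made for illustration.

```sql
-- Sketch only: sensor_readings_clean and the replacement rules are hypothetical.
insert into sensor_readings_clean
select
  sensor_id,
  -- Treat empty or whitespace-only names as missing, then substitute a default
  coalesce(nullif(trim(name), ''), 'unknown') as name,
  -- Replace out-of-range readings with null instead of passing them downstream
  case
    when cast(`value` as bigint) < 0 then cast(null as bigint)
    else cast(`value` as bigint)
  end as reading
from raw_sensor_readings
```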

Mask and anonymize

Anonymization or masking pipelines omit or obfuscate sensitive fields for compliance, regulatory, or other policy reasons. A common pattern of access control enforcement is to omit sensitive fields prior to serving a stream to other teams or services.

-- Anonymize SSNs and zip codes
insert into user_events_masked
select
  user_id,
  username,
  overlay(ssn placing '*' from 1 for 12) as ssn,
  substring(zip_code from 1 for 2)       as zip_code_1,
  action
from user_events
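
Obfuscation does not have to overwrite characters. When downstream consumers still need to group or join on a sensitive identifier without seeing it, one option is to replace it with a hash. This is a minimal sketch that assumes the Flink SQL sha256 function is available, and the output stream user_events_pseudonymized is hypothetical.

```sql
-- Sketch only: user_events_pseudonymized is a hypothetical output stream.
insert into user_events_pseudonymized
select
  sha256(cast(user_id as string)) as user_key, -- stable, non-reversible stand-in for user_id
  action
  -- username, ssn, and zip_code are omitted entirely
from user_events
```

An unsalted hash of a low-entropy value can be recovered by brute force, so this is better described as pseudonymization than anonymization. Whether it satisfies a given policy is a decision for your compliance team.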

Aggregate

Aggregation pipelines are characterized by the use of aggregate functions. In streaming, almost all aggregation pipelines contain a windowing function, such as tumble or hop, to discretize the stream into bins on which the aggregation is performed. The most common aggregate functions are count, min, max, avg, and sum, but others exist. Aggregations naturally result in fewer output records than input records, making their input/output ratio n:1 where n >= 1. While there isn’t a clear or formal distinction between “stream processing” and “streaming analytics,” the latter is more closely associated with aggregation pipelines.

Common use cases:

  • Perform summary statistics on streams to support real-time data quality visualizations or alerts.

  • Extract and aggregate features for insertion into a low-latency feature store for online ML/AI systems.

  • Derive time-series metrics from discrete events for alerting systems.

-- Count the number of events by path and status every 10 seconds.
insert into site_activity
select
  window_start,
  window_end,
  path,
  status,
  count(1) as `count`
from table(
  tumble(
    table http_events,
    descriptor(_time),
    interval '10' seconds
  )
)
group by window_start, window_end, path, status

-- Count the number of records by action and user every 10 seconds over 5 minutes.
insert into feature_store
select
  window_start,
  window_end,
  user_id,
  action,
  count(1),
  first_value(_time) as first_event_time,
  last_value(_time)  as last_event_time
from table(
  hop(
    table user_events,
    descriptor(_time),
    interval '10' seconds, -- Slide every 10 seconds
    interval '5' minutes   -- Look at 5 minutes
  )
)
group by window_start, window_end, user_id, action

Trigger

Triggering pipelines emit a special record to an external system to indicate an action should be performed. The role of a triggering pipeline is to express the conditions under which the action should be performed (the where clause) and create the record the action system is expecting when the conditions are met (the select clause). In some cases, it’s necessary to detect a complex condition over multiple records. This can be done using aggregate functions wrapped by an outer query with a where clause, or even by matching complex patterns with SQL’s MATCH_RECOGNIZE clause.

Common use cases:

  • Sending push notifications for select user actions.

  • Adapting internal streams to integrate with SaaS products.

  • Firing alerts or automated processes when conditions are met in a stream.

-- Build hourly usage data for a Stripe integration on the output stream
insert into stripe_product_usage
select
  window_start as _time,
  customer_id,
  'abcd1234' as price_id,
  sum(bytes_sent) / 1024 / 1024 as mb_sent
from table(
  tumble(
    table document_downloads,
    descriptor(_time),
    interval '1' hour
  )
)
group by window_start, customer_id
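
The approach of wrapping aggregate functions in an outer query with a where clause can be sketched as follows. This is a minimal sketch rather than a recommended alerting design: the output stream error_rate_alerts and the threshold of 100 are assumptions, and the input fields are the ones used by the other http_events examples on this page.

```sql
-- Sketch only: error_rate_alerts and the threshold are hypothetical.
-- Emit one alert record per host for each minute with too many server errors.
insert into error_rate_alerts
select
  window_start as _time,
  hostname,
  error_count
from (
  -- Inner query: count server failures per host in one-minute windows
  select
    window_start,
    hostname,
    count(1) as error_count
  from table(
    tumble(
      table http_events,
      descriptor(_time),
      interval '1' minute
    )
  )
  where response_code between 500 and 599
  group by window_start, window_end, hostname
)
-- Outer query: the trigger condition over the aggregated result
where error_count > 100
```

The inner where clause selects which records are counted, while the outer where clause decides whether the aggregated result should trigger anything at all.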

Process a change stream as an append stream

Under normal circumstances, pipelines process change streams based on their logical schema.

In some use cases, however, the processing must be done over the physical schema, for example, to output all the users that made location changes in a change stream. In other words, we want to process the change stream as an append stream rather than interpreting the changes.

The table function to_append(<change-stream>) turns a change stream into an append stream. Suppose a change stream user_info has a logical schema with the fields user_id, password, and so on. The stream can then be processed over its physical schema as follows:

insert into password_changed
select
    after.user_id as user_id
from table(
  to_append(user_info) -- Use the table function to_append() here
)
where
    op='u' and
    before.password <> after.password -- Access the fields over the physical schema

Process an append stream as a change stream

When an append stream is partitioned, the stream can be turned into a change stream based on the partition key, which emits the change records per key. In other words, a value of a key that appears later in the stream will be encoded as an update.

The table function to_change(<append_stream>) turns an append stream into a change stream based on the <append_stream>'s partition key. Imagine there are two append streams:

  • user_locations, which has the fields user_id and location, where a new record is created whenever a user’s location is updated.

  • user_info, which has the fields user_id and name, where a new record is created whenever a user’s name is updated.

We can build a pipeline that outputs a change stream from which sink connections can materialize the latest user info together with the latest location. To do so, first turn user_locations and user_info into change streams:

-- user_location_materialized is a change stream that encodes
-- new location per user_id as updates
insert into user_location_materialized
select
    user_id,
    location
from table(
  to_change(user_locations) -- user_locations is partitioned by user_id
)

-- user_info_materialized is a change stream that encodes
-- new name per user_id as updates
insert into user_info_materialized
select
    user_id,
    name
from table(
  to_change(user_info) -- user_info is partitioned by user_id
)

Then, join the user_location_materialized stream with the user_info_materialized stream:

-- user_info_with_location is a change stream that would emit
-- a new update record when a user's name or location changes
insert into user_info_with_location
select
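    UI.user_id  as user_id,   -- qualify: both streams have a user_id field
    UI.name     as user_name, -- user_info_materialized exposes the field as name
    UL.location as location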
from user_info_materialized UI
join user_location_materialized UL
on UI.user_id = UL.user_id