Stream conversion functions

SQL Function Description

TO_APPEND(change_stream_name)

Convert the change stream change_stream_name into an append stream.

Imagine a change stream user_info has the logical schema with fields: user_id, password, etc, the stream can be processed over the physical schema like below:

INSERT INTO password_changed
SELECT
    after.user_id AS user_id
FROM TABLE(
  TO_APPEND(user_info) -- Use the table function to_append() here
)
WHERE
    op='u' AND
    before.password <> after.password -- Access the fields over the physical schema

See more details here.

TO_CHANGE(append_stream_name)

Convert the append stream append_stream_name into a change stream.

You must have a partition key in the append stream in order to convert it into a change stream. When converted, the partition key becomes the change stream’s primary key.

Imagine there are two append streams:

  • user_locations that has fields user_id, and location where a new record is created whenever there is an update in a user’s location.

  • user_info that has fields user_id, and name where a new record is created whenever there is an update in a user’s name.

We can build a pipeline to output a change stream that allows sink connections to materialize the latest user info with the latest location. To do it, first turn user_locations and user_info into change streams.

-- user_location_materialized is a change stream that encodes
-- new location per user_id as updates
INSERT INTO user_location_materialized
SELECT
    user_id,
    location
FROM TABLE(
  TO_CHANGE(user_locations) -- user_locations is partitioned by user_id
)

-- user_info_materialized is a change stream that encodes
-- new name per user_id as updates
INSERT INTO user_info_materialized
SELECT
    user_id,
    name
FROM TABLE(
  TO_CHANGE(user_info) -- user_info is partitioned by user_id
)

Then, join user_location_materialized stream with the stream user_info_materialized

-- user_info_with_location is a change stream that would emit
-- a new update record when a user's name or location changes
INSERT INTO user_info_with_location
SELECT
	user_id,
  user_name,
  location
FROM user_info_materialized ui
JOIN user_location_materialized ul
ON ui.user_id = ul.user_id

See more details here.