Regular joins

You can use a pipeline to join data from two or more streams together, regardless of when the data arrives. This is called a regular join. Because regular joins don’t have defined time boundaries for matching records, they’re sometimes referred to as unbounded joins.

Regular joins are the most generic and flexible join type, but they can come at a high resource cost. In a regular join, every time a new record is received from one stream, it must be compared against every previous record from the streams that it’s joined against. Depending on how large the tables or streams are, regular joins can therefore be costly and resource-intensive.

That being said, the cost of a regular join is controllable if you are joining two change streams and the number of unique primary key values in each change stream is limited. In this scenario, Decodable only needs to store the latest value for each key in each change stream. In contrast, if you join an append stream, Decodable needs to store all incoming records. See About Change Data Capture and Change Streams for more information about change data capture records and change streams.

Regular joins always output a change stream. The exception to this rule is an inner join of two or more append streams.

To illustrate how to perform regular joins in Decodable, let’s assume that you have the following data.

Transactions (left table)

| order_id | customer_id | transaction_date |
|---|---|---|
| 73929 | 1 | 02-19-2022 |
| 93916 | 2 | 02-20-2022 |
| 82730 | 39 | 02-20-2022 |
| 36473 | 47 | 02-21-2022 |

Users (right table)

| customer_id | name | email | country |
|---|---|---|---|
| 2 | Joe Smith | joesmith@example.com | Canada |
| 47 | Alice Cooper | alicecooper@example.com | USA |
| 58 | Alex Kim | akim@example.com | UK |

You can join these two streams together to produce one enriched stream. The following sections describe the results that you would see with different types of regular joins.
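The earlier point about resource cost can be made concrete with a small Python sketch. It is a simplified, hypothetical model, not Decodable's implementation. The class names `ChangeStreamState` and `AppendStreamState` are illustrative only. The join state for a change stream holds only the latest row per primary key, while the join state for an append stream holds every record it has ever received.

```python
from typing import Any, Dict, List, Optional

Row = Dict[str, Any]


class ChangeStreamState:
    """Hypothetical join state for a change stream: the latest row per primary key.

    Its size is bounded by the number of unique key values.
    """

    def __init__(self, key: str) -> None:
        self.key = key
        self.rows: Dict[Any, Row] = {}

    def apply(self, op: str, before: Optional[Row], after: Optional[Row]) -> None:
        if op == "d":
            # A delete removes the key from state entirely.
            self.rows.pop(before[self.key], None)
        else:
            # "c", "r", and "u" all replace the stored row with the newest version.
            self.rows[after[self.key]] = after

    def size(self) -> int:
        return len(self.rows)


class AppendStreamState:
    """Hypothetical join state for an append stream: every record, forever."""

    def __init__(self) -> None:
        self.rows: List[Row] = []

    def apply(self, row: Row) -> None:
        self.rows.append(row)

    def size(self) -> int:
        return len(self.rows)


# Ten thousand records that touch only three distinct customer_id values.
changes = ChangeStreamState(key="customer_id")
appends = AppendStreamState()
for i in range(10_000):
    key = i % 3
    row = {"customer_id": key, "update_seq": i}
    previous = changes.rows.get(key)
    changes.apply("u" if previous is not None else "c", previous, row)
    appends.apply(row)

print(changes.size(), appends.size())
```

With only three distinct keys, the change-stream state stays at three rows, while the append-stream state grows to all 10,000 records. This is why joining change streams with a limited key space keeps the cost of a regular join controllable.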
Inner join

An inner join combines records from two or more streams whenever there are matching values in a field common to the streams. When you perform an inner join on two or more change streams, the output stream contains a change data capture record that updates the downstream sink table whenever there is a match.

Pipeline SQL example

INSERT INTO Final_Stream
SELECT *
FROM Transactions AS L
INNER JOIN Users AS R ON L.customer_id = R.customer_id

Results

Using the data in the previous tables, the resulting stream would look something like this:

| order_id | transaction_date | customer_id | name | email | country |
|---|---|---|---|---|---|
| 93916 | 02-20-2022 | 2 | Joe Smith | joesmith@example.com | Canada |
| 36473 | 02-21-2022 | 47 | Alice Cooper | alicecooper@example.com | USA |

Note: A record is emitted only when there is a match.

Left join

A left join returns all records from the left stream and the matching records from the right stream. If there is no match in the right stream, the fields from the right stream contain null.

Pipeline SQL example

INSERT INTO Final_Stream
SELECT *
FROM Transactions AS L
LEFT JOIN Users AS R ON L.customer_id = R.customer_id

Results

Using the data in the previous tables, the results would look something like this:

| order_id | transaction_date | customer_id | name | email | country |
|---|---|---|---|---|---|
| 73929 | 02-19-2022 | 1 | null | null | null |
| 93916 | 02-20-2022 | 2 | Joe Smith | joesmith@example.com | Canada |
| 82730 | 02-20-2022 | 39 | null | null | null |
| 36473 | 02-21-2022 | 47 | Alice Cooper | alicecooper@example.com | USA |

Note: A record from the right stream is emitted only when there is a match in the left stream.

Right join

A right join returns all records from the right stream and the matching records from the left stream. If there is no match in the left stream, the fields from the left stream contain null.

Pipeline SQL example

INSERT INTO Final_Stream
SELECT *
FROM Transactions AS L
RIGHT JOIN Users AS R ON L.customer_id = R.customer_id

Results

Using the data in the previous tables, the results would look something like this:
| order_id | transaction_date | customer_id | name | email | country |
|---|---|---|---|---|---|
| 93916 | 02-20-2022 | 2 | Joe Smith | joesmith@example.com | Canada |
| 36473 | 02-21-2022 | 47 | Alice Cooper | alicecooper@example.com | USA |
| null | null | 58 | Alex Kim | akim@example.com | UK |

Note: A record from the left stream is emitted only when there is a match in the right stream.

Full outer join

A full outer join returns all records from both the left and right streams. If a record has no match in the other stream, the fields from the other stream contain null.

Pipeline SQL example

INSERT INTO Final_Stream
SELECT *
FROM Transactions AS L
FULL OUTER JOIN Users AS R ON L.customer_id = R.customer_id

Results

Using the data in the previous tables, the results would look something like this:

| order_id | transaction_date | customer_id | name | email | country |
|---|---|---|---|---|---|
| 73929 | 02-19-2022 | 1 | null | null | null |
| 93916 | 02-20-2022 | 2 | Joe Smith | joesmith@example.com | Canada |
| 82730 | 02-20-2022 | 39 | null | null | null |
| 36473 | 02-21-2022 | 47 | Alice Cooper | alicecooper@example.com | USA |
| null | null | 58 | Alex Kim | akim@example.com | UK |

Joins that produce nulls

Regular joins provide eventually consistent results. A regular join has no time limit for how long to wait for corresponding data to arrive from the other stream. As a result, records from one stream can be processed before the matching records from the other stream. You might therefore temporarily see out-of-date results, but the output eventually converges to the correct result.

For example, suppose the Transactions records in the "Left join" example are processed before the matching Users records. You might then temporarily see null in the name, email, and country fields for order_id 93916 and 36473, even though matches exist for those orders. When the corresponding record arrives from the other stream, the join outputs another row containing the match.

Because of this caveat, the output of a left, right, or full outer join is always a change stream. A change stream can represent updates and deletions, so earlier incomplete rows can be replaced and the stream eventually represents the final result of the join.
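The "Joins that produce nulls" behavior can be sketched in Python as a streaming left join that processes records in arrival order and emits change records. This is a hypothetical, simplified model: the class `StreamingLeftJoin` and its methods are illustrative and not Decodable APIs. It uses the op codes "c" (create) and "d" (delete) in the style of the change records in this document. Both sides stay in state indefinitely, as in a regular join. When a matching right-side record arrives for a left row that was already emitted with nulls, the sketch first retracts the null-padded row and then emits the matched row.

```python
from typing import Any, Dict, List, Optional, Tuple

Row = Dict[str, Any]
Change = Tuple[str, Row]  # ("c", row) adds a row; ("d", row) retracts it

RIGHT_FIELDS = ("name", "email", "country")


class StreamingLeftJoin:
    """Hypothetical streaming LEFT JOIN of Transactions and Users on customer_id.

    Like a regular join, it has no time bound: every row from both sides stays in state.
    """

    def __init__(self) -> None:
        self.left: List[Row] = []
        self.right: List[Row] = []

    @staticmethod
    def _combine(left_row: Row, right_row: Optional[Row]) -> Row:
        out = dict(left_row)
        for field in RIGHT_FIELDS:
            # Right-side fields are null (None) when there is no match yet.
            out[field] = right_row[field] if right_row is not None else None
        return out

    def _matches(self, left_row: Row) -> List[Row]:
        return [r for r in self.right if r["customer_id"] == left_row["customer_id"]]

    def on_left(self, left_row: Row) -> List[Change]:
        self.left.append(left_row)
        matches = self._matches(left_row)
        if not matches:
            # No match has arrived yet, so emit a null-padded row.
            return [("c", self._combine(left_row, None))]
        return [("c", self._combine(left_row, r)) for r in matches]

    def on_right(self, right_row: Row) -> List[Change]:
        changes: List[Change] = []
        for left_row in self.left:
            if left_row["customer_id"] != right_row["customer_id"]:
                continue
            if not self._matches(left_row):
                # This left row was emitted null-padded earlier: retract it first.
                changes.append(("d", self._combine(left_row, None)))
            changes.append(("c", self._combine(left_row, right_row)))
        self.right.append(right_row)  # store after checking for earlier matches
        return changes


join = StreamingLeftJoin()
out: List[Change] = []
# The transaction for customer 2 is processed before the matching Users record.
out += join.on_left({"order_id": 93916, "customer_id": 2, "transaction_date": "02-20-2022"})
out += join.on_right({"customer_id": 2, "name": "Joe Smith",
                      "email": "joesmith@example.com", "country": "Canada"})

for op, row in out:
    print(op, row["order_id"], row["name"])
```

Running the sketch prints three changes for order 93916. The first is a "c" record with a null name. The second is a "d" record retracting that row. The third is a "c" record carrying Joe Smith. A consumer that applies the changes in order ends up with the single correct row, which is why outer joins must output a change stream.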
Example: Join change streams together to produce one enriched stream

You can join two or more change streams together to combine the changes from multiple streams into a single stream of records. See About Change Data Capture and Change Streams for more information about change streams.

Let’s assume that you have the following three change streams. You want to join them into one enriched stream that contains which product was ordered from your store, who ordered that product, and what the order status is.

Orders

Let’s assume that you have a change stream called orders that contains records that resemble the following.

{
  "before":{ "id":2, "order_date":"2022-11-16 22:11:09", "user_id":2, "price":19.95, "product_id":3, "order_status":"SHIPPED", "update_ts":"2022-11-16 23:25:37" },
  "after":{ "id":2, "order_date":"2022-11-16 22:11:09", "user_id":2, "price":19.95, "product_id":3, "order_status":"DELIVERED", "update_ts":"2022-11-17 19:27:54" },
  "op":"u",
  "ts_ms":1668713274615
}
{
  "before":null,
  "after":{ "id":4, "order_date":"2022-11-17 08:15:21", "user_id":1, "price":49.95, "product_id":2, "order_status":"ACCEPTED", "update_ts":"2022-11-15 22:45:43" },
  "op":"r",
  "ts_ms":1668705684227
}
{
  "before":null,
  "after":{ "id":3, "order_date":"2022-11-17 08:12:11", "user_id":1, "price":19.99, "product_id":1, "order_status":"ACCEPTED", "update_ts":"2022-11-15 22:45:29" },
  "op":"r",
  "ts_ms":1668705684227
}
{
  "before":null,
  "after":{ "id":1, "order_date":"2022-11-15 10:08:27", "user_id":3, "price":150, "product_id":4, "order_status":"DELIVERED", "update_ts":"2022-11-15 23:13:48" },
  "op":"r",
  "ts_ms":1668705684225
}
{
  "before":null,
  "after":{ "id":2, "order_date":"2022-11-16 22:11:09", "user_id":2, "price":19.95, "product_id":3, "order_status":"SHIPPED", "update_ts":"2022-11-16 23:25:37" },
  "op":"r",
  "ts_ms":1668705684226
}

Products

Let’s assume that you have a change stream called products that contains records that resemble the following.
{
  "before":{ "id":1, "name":"Tabletop lamp", "description":"An adorable option for brightening a bedside table." },
  "after":{ "id":1, "name":"Orb tabletop lamp", "description":"An adorable option for brightening a bedside table." },
  "op":"u",
  "ts_ms":1668713425104
}
{
  "before":null,
  "after":{ "id":5, "name":"Checkerboard vase", "description":"Your favorite picnic print now on a vase." },
  "op":"r",
  "ts_ms":1668705685614
}
{
  "before":null,
  "after":{ "id":1, "name":"Tabletop lamp", "description":"An adorable option for brightening a bedside table." },
  "op":"r",
  "ts_ms":1668705685612
}
{
  "before":null,
  "after":{ "id":2, "name":"New York City candle", "description":"The distinctive smell of NYC in a candle." },
  "op":"r",
  "ts_ms":1668705685614
}
{
  "before":null,
  "after":{ "id":3, "name":"Forest incense", "description":"Assortment of three fragrances: maple, fir, cypress." },
  "op":"r",
  "ts_ms":1668705685614
}
{
  "before":null,
  "after":{ "id":4, "name":"Sheepskin rug", "description":"Winter ready room accessory" },
  "op":"r",
  "ts_ms":1668705685614
}

Users

Let’s assume that you have a change stream called users that contains records that resemble the following.
{
  "before":{ "id":2, "name":"Samantha Garcia", "zip_code":"94105" },
  "after":{ "id":2, "name":"Samantha Garcia", "zip_code":"94107" },
  "op":"u",
  "ts_ms":1668713723815
}
{
  "before":{ "id":1, "name":"Roger Collins", "zip_code":"10002" },
  "after":{ "id":1, "name":"Roger Collins", "zip_code":"10013" },
  "op":"u",
  "ts_ms":1668711427697
}
{
  "before":null,
  "after":{ "id":2, "name":"Samantha Garcia", "zip_code":"94105" },
  "op":"r",
  "ts_ms":1668705685039
}
{
  "before":null,
  "after":{ "id":1, "name":"Roger Collins", "zip_code":"10002" },
  "op":"r",
  "ts_ms":1668705685038
}
{
  "before":null,
  "after":{ "id":3, "name":"David Lee", "zip_code":"28358" },
  "op":"r",
  "ts_ms":1668705685034
}

Pipeline SQL

The following SQL joins all three of these streams together and emits a single output stream named enriched_orders.

INSERT INTO enriched_orders
SELECT
  O.id AS order_id,
  O.order_status AS order_status,
  O.update_ts AS order_update_ts,
  O.product_id AS product_id,
  P.name AS product_name,
  O.price AS price,
  U.id AS user_id,
  U.name AS user_name,
  U.zip_code AS zip_code
FROM `orders` O
JOIN `products` P ON O.product_id = P.id
JOIN `users` U ON O.user_id = U.id

Results

The resulting enriched_orders stream would look something like the following.
{
  "before":null,
  "after":{ "order_id":2, "order_status":"DELIVERED", "order_update_ts":"2022-11-17 19:27:54", "product_id":3, "product_name":"Forest incense", "price":19.95, "user_id":2, "user_name":"Samantha Garcia", "zip_code":"94107" },
  "op":"c"
}
{
  "before":{ "order_id":2, "order_status":"DELIVERED", "order_update_ts":"2022-11-17 19:27:54", "product_id":3, "product_name":"Forest incense", "price":19.95, "user_id":2, "user_name":"Samantha Garcia", "zip_code":"94105" },
  "after":null,
  "op":"d"
}
{
  "before":null,
  "after":{ "order_id":3, "order_status":"ACCEPTED", "order_update_ts":"2022-11-15 22:45:29", "product_id":1, "product_name":"Orb tabletop lamp", "price":19.99, "user_id":1, "user_name":"Roger Collins", "zip_code":"10013" },
  "op":"c"
}
{
  "before":{ "order_id":3, "order_status":"ACCEPTED", "order_update_ts":"2022-11-15 22:45:29", "product_id":1, "product_name":"Tabletop lamp", "price":19.99, "user_id":1, "user_name":"Roger Collins", "zip_code":"10013" },
  "after":null,
  "op":"d"
}
{
  "before":null,
  "after":{ "order_id":2, "order_status":"DELIVERED", "order_update_ts":"2022-11-17 19:27:54", "product_id":3, "product_name":"Forest incense", "price":19.95, "user_id":2, "user_name":"Samantha Garcia", "zip_code":"94105" },
  "op":"c"
}
{
  "before":{ "order_id":2, "order_status":"SHIPPED", "order_update_ts":"2022-11-16 23:25:37", "product_id":3, "product_name":"Forest incense", "price":19.95, "user_id":2, "user_name":"Samantha Garcia", "zip_code":"94105" },
  "after":null,
  "op":"d"
}
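Because enriched_orders is a change stream, a consumer that applies its records in order should end up with the same table you would get by inner-joining the latest version of each order, product, and user. The following Python sketch computes that final state from a subset of the example change records. Fields not needed for the join are trimmed for brevity. It is a hypothetical illustration, not Decodable's implementation. The helpers `rec`, `materialize`, and `enrich` are invented for this example. It also assumes that replaying records in `ts_ms` order yields each key's latest value.

```python
from typing import Any, Dict, List, Optional

Row = Dict[str, Any]
Record = Dict[str, Any]  # a change record with before, after, op, and ts_ms


def rec(op: str, ts_ms: int, after: Optional[Row], before: Optional[Row] = None) -> Record:
    """Hypothetical helper: build a change record shaped like the examples above."""
    return {"before": before, "after": after, "op": op, "ts_ms": ts_ms}


def materialize(records: List[Record]) -> Dict[Any, Row]:
    """Replay change records in ts_ms order, keeping only the latest row per id."""
    table: Dict[Any, Row] = {}
    for record in sorted(records, key=lambda x: x["ts_ms"]):
        if record["op"] == "d":
            table.pop(record["before"]["id"], None)
        else:  # "r" (snapshot), "c" (create), and "u" (update) carry the new row
            table[record["after"]["id"]] = record["after"]
    return table


def enrich(orders: Dict[Any, Row], products: Dict[Any, Row], users: Dict[Any, Row]) -> List[Row]:
    """Inner-join the latest orders, products, and users, mirroring the pipeline SQL."""
    result: List[Row] = []
    for o in orders.values():
        p = products.get(o["product_id"])
        u = users.get(o["user_id"])
        if p is None or u is None:
            continue  # inner join: skip orders without both a product and a user match
        result.append({
            "order_id": o["id"], "order_status": o["order_status"],
            "order_update_ts": o["update_ts"], "product_id": o["product_id"],
            "product_name": p["name"], "price": o["price"],
            "user_id": u["id"], "user_name": u["name"], "zip_code": u["zip_code"],
        })
    return sorted(result, key=lambda row: row["order_id"])


order_2_shipped = {"id": 2, "user_id": 2, "price": 19.95, "product_id": 3,
                   "order_status": "SHIPPED", "update_ts": "2022-11-16 23:25:37"}
order_2_delivered = {**order_2_shipped, "order_status": "DELIVERED",
                     "update_ts": "2022-11-17 19:27:54"}
orders = [
    rec("r", 1668705684226, order_2_shipped),
    rec("r", 1668705684227, {"id": 3, "user_id": 1, "price": 19.99, "product_id": 1,
                             "order_status": "ACCEPTED", "update_ts": "2022-11-15 22:45:29"}),
    rec("u", 1668713274615, order_2_delivered, before=order_2_shipped),
]
products = [
    rec("r", 1668705685612, {"id": 1, "name": "Tabletop lamp"}),
    rec("r", 1668705685614, {"id": 3, "name": "Forest incense"}),
    rec("u", 1668713425104, {"id": 1, "name": "Orb tabletop lamp"},
        before={"id": 1, "name": "Tabletop lamp"}),
]
users = [
    rec("r", 1668705685038, {"id": 1, "name": "Roger Collins", "zip_code": "10002"}),
    rec("r", 1668705685039, {"id": 2, "name": "Samantha Garcia", "zip_code": "94105"}),
    rec("u", 1668711427697, {"id": 1, "name": "Roger Collins", "zip_code": "10013"},
        before={"id": 1, "name": "Roger Collins", "zip_code": "10002"}),
    rec("u", 1668713723815, {"id": 2, "name": "Samantha Garcia", "zip_code": "94107"},
        before={"id": 2, "name": "Samantha Garcia", "zip_code": "94105"}),
]

result = enrich(materialize(orders), materialize(products), materialize(users))
for row in result:
    print(row)
```

The two resulting rows correspond to the "c" records for order_id 2 and order_id 3 in the enriched_orders example. Each "d" record in that example retracts an older version of one of these rows.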