Regular joins

You can use a pipeline to join data from two or more streams, regardless of when the data arrives. This is called a regular join. Because regular joins don’t have defined time boundaries for matching records, they are sometimes referred to as unbounded joins.

Regular joins are the most generic and flexible join type, but they can come at a very high resource cost. Every time a new record is received from one stream, it must be compared against every previous record from the streams that it is joined against. Depending on how large the tables or streams are, regular joins can therefore be extremely costly and resource-intensive.

That being said, the cost of a regular join is controllable if you are joining two change streams and the number of unique values in each change stream’s primary key fields is limited. In this scenario, Decodable only needs to store the latest value for each key in each change stream. In contrast, if you join an append stream, Decodable needs to store all incoming records. See About change data capture and change streams for more information about change data capture records and change streams.
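The difference in stored state can be illustrated with a small Python sketch. This is only a toy model of the idea described above, not how Decodable is implemented; the function names and the generated events are hypothetical.

```python
def append_state_size(records):
    """An append stream has no updates, so every record must be retained."""
    state = []
    for record in records:
        state.append(record)
    return len(state)


def change_state_size(records, key_field):
    """A change stream only needs the latest value for each primary key."""
    latest = {}
    for record in records:
        latest[record[key_field]] = record  # a newer record replaces the older one
    return len(latest)


# 1,000 hypothetical events spread over only 3 distinct customer_id values.
events = [{"customer_id": i % 3, "seq": i} for i in range(1000)]

append_size = append_state_size(events)
change_size = change_state_size(events, "customer_id")
print(append_size, change_size)  # 1000 3
```

With a bounded set of keys, the change stream state stays constant no matter how many records arrive, while the append stream state grows with every record.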

Regular joins output a change stream, with one exception: an inner join of two or more append streams, which outputs an append stream.

To illustrate how to perform regular joins in Decodable, let’s assume that you have the following data.

Transactions (Left table)

| order_id | customer_id | transaction_date |
|----------|-------------|------------------|
| 73929 | 1 | 02-19-2022 |
| 93916 | 2 | 02-20-2022 |
| 82730 | 39 | 02-20-2022 |
| 36473 | 47 | 02-21-2022 |

Users (Right table)

| customer_id | name | email | country |
|-------------|------|-------|---------|
| 2 | Joe Smith | joesmith@example.com | Canada |
| 47 | Alice Cooper | alicecooper@example.com | USA |
| 58 | Alex Kim | akim@example.com | UK |

You can join these two streams together to produce one enhanced stream. The following sections describe the results that you would see with different types of regular joins.

Inner join

An inner join combines records from two or more streams whenever there are matching values in a field common to the streams. When you perform an inner join on two or more change streams, the output stream contains a change data capture record that updates the downstream destination table whenever there is a match.

Pipeline SQL example

INSERT INTO Final_Stream SELECT * FROM Transactions AS L
INNER JOIN Users AS R ON L.customer_id = R.customer_id

Results

Using the data in the previous tables, the resulting stream would look something like this:

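| order_id | transaction_date | customer_id | name | email | country |
|----------|------------------|-------------|------|-------|---------|
| 93916 | 02-20-2022 | 2 | Joe Smith | joesmith@example.com | Canada |
| 36473 | 02-21-2022 | 47 | Alice Cooper | alicecooper@example.com | USA |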

Note: A record is emitted only when there is a match.
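As a rough illustration of the matching rule, the following Python sketch applies inner join semantics to the sample Transactions and Users data. It runs over complete in-memory lists rather than unbounded streams, and the `inner_join` helper is a hypothetical name used only for this example.

```python
# Sample data copied from the Transactions and Users tables.
transactions = [
    {"order_id": 73929, "customer_id": 1, "transaction_date": "02-19-2022"},
    {"order_id": 93916, "customer_id": 2, "transaction_date": "02-20-2022"},
    {"order_id": 82730, "customer_id": 39, "transaction_date": "02-20-2022"},
    {"order_id": 36473, "customer_id": 47, "transaction_date": "02-21-2022"},
]
users = [
    {"customer_id": 2, "name": "Joe Smith", "email": "joesmith@example.com", "country": "Canada"},
    {"customer_id": 47, "name": "Alice Cooper", "email": "alicecooper@example.com", "country": "USA"},
    {"customer_id": 58, "name": "Alex Kim", "email": "akim@example.com", "country": "UK"},
]


def inner_join(left, right, key):
    """Emit a combined record only when both sides share the same key value."""
    index = {}
    for r in right:
        index.setdefault(r[key], []).append(r)
    joined = []
    for l in left:
        for r in index.get(l[key], []):
            joined.append({**l, **r})
    return joined


joined = inner_join(transactions, users, "customer_id")
for row in joined:
    print(row["order_id"], row["customer_id"], row["name"])
```

Only the orders for customer_id 2 and 47 appear, because those are the only keys present in both inputs.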

Left join

A left join returns all records from the left stream and the matching records from the right stream. If there is no match in the right stream, the fields from the right stream are null.

Pipeline SQL example

INSERT INTO Final_Stream SELECT * FROM Transactions AS L
LEFT JOIN Users AS R ON L.customer_id = R.customer_id

Results

Using the data in the previous tables, the results would look something like this.

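| order_id | transaction_date | customer_id | name | email | country |
|----------|------------------|-------------|------|-------|---------|
| 73929 | 02-19-2022 | 1 | null | null | null |
| 93916 | 02-20-2022 | 2 | Joe Smith | joesmith@example.com | Canada |
| 82730 | 02-20-2022 | 39 | null | null | null |
| 36473 | 02-21-2022 | 47 | Alice Cooper | alicecooper@example.com | USA |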

A record from the right stream is emitted only when there is a match in the left stream.
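The null-filling behavior of a left join can be sketched in Python as follows. This is a batch-style approximation over the sample data with only the `name` field kept from Users for brevity; `left_join` is a hypothetical helper, and Python's `None` stands in for SQL null.

```python
transactions = [
    {"order_id": 73929, "customer_id": 1},
    {"order_id": 93916, "customer_id": 2},
    {"order_id": 82730, "customer_id": 39},
    {"order_id": 36473, "customer_id": 47},
]
users = [
    {"customer_id": 2, "name": "Joe Smith"},
    {"customer_id": 47, "name": "Alice Cooper"},
    {"customer_id": 58, "name": "Alex Kim"},
]


def left_join(left, right, key, right_fields):
    """Keep every left record; fill the right-side fields with None when unmatched."""
    index = {}
    for r in right:
        index.setdefault(r[key], []).append(r)
    joined = []
    for l in left:
        matches = index.get(l[key])
        if matches:
            joined.extend({**l, **r} for r in matches)
        else:
            joined.append({**l, **dict.fromkeys(right_fields)})
    return joined


left_joined = left_join(transactions, users, "customer_id", ["name"])
for row in left_joined:
    print(row["order_id"], row["customer_id"], row["name"])
```

All four transactions are kept, and the user with customer_id 58 never appears because no transaction refers to it.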

Right join

A right join returns all records from the right stream and the matching records from the left stream. If there is no match in the left stream, the fields from the left stream are null.

Pipeline SQL example

INSERT INTO Final_Stream SELECT * FROM Transactions AS L
RIGHT JOIN Users AS R ON L.customer_id = R.customer_id

Results

Using the data in the previous tables, the results would look something like this.

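| order_id | transaction_date | customer_id | name | email | country |
|----------|------------------|-------------|------|-------|---------|
| 93916 | 02-20-2022 | 2 | Joe Smith | joesmith@example.com | Canada |
| 36473 | 02-21-2022 | 47 | Alice Cooper | alicecooper@example.com | USA |
| null | null | 58 | Alex Kim | akim@example.com | UK |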

Note: A record from the left stream is emitted only when there is a match in the right stream.

Full outer join

A full outer join returns all records from both the left and right streams. Where a record has no match in the other stream, the fields from that stream are null.

Pipeline SQL example

INSERT INTO Final_Stream SELECT * FROM Transactions AS L
FULL OUTER JOIN Users AS R ON L.customer_id = R.customer_id

Results

Using the data in the previous tables, the results would look something like this.

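| order_id | transaction_date | customer_id | name | email | country |
|----------|------------------|-------------|------|-------|---------|
| 73929 | 02-19-2022 | 1 | null | null | null |
| 93916 | 02-20-2022 | 2 | Joe Smith | joesmith@example.com | Canada |
| 82730 | 02-20-2022 | 39 | null | null | null |
| 36473 | 02-21-2022 | 47 | Alice Cooper | alicecooper@example.com | USA |
| null | null | 58 | Alex Kim | akim@example.com | UK |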
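A full outer join can be thought of as a left join plus the right-side records that never matched. The Python sketch below shows that idea on the sample data, trimmed to a few fields; `full_outer_join` is a hypothetical helper and `None` stands in for SQL null.

```python
transactions = [
    {"order_id": 73929, "customer_id": 1},
    {"order_id": 93916, "customer_id": 2},
    {"order_id": 82730, "customer_id": 39},
    {"order_id": 36473, "customer_id": 47},
]
users = [
    {"customer_id": 2, "name": "Joe Smith"},
    {"customer_id": 47, "name": "Alice Cooper"},
    {"customer_id": 58, "name": "Alex Kim"},
]


def full_outer_join(left, right, key, left_fields, right_fields):
    """Keep every record from both sides, padding the missing side with None."""
    joined = []
    matched_keys = set()
    for l in left:
        matches = [r for r in right if r[key] == l[key]]
        if matches:
            matched_keys.add(l[key])
            joined.extend({**l, **r} for r in matches)
        else:
            joined.append({**l, **dict.fromkeys(right_fields)})
    # Right-side records that never found a partner are emitted with null left fields.
    for r in right:
        if r[key] not in matched_keys:
            joined.append({**dict.fromkeys(left_fields), **r})
    return joined


full_joined = full_outer_join(transactions, users, "customer_id", ["order_id"], ["name"])
for row in full_joined:
    print(row["order_id"], row["customer_id"], row["name"])
```

The output has five rows: the four transactions, two of them without user details, followed by the unmatched user with customer_id 58.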

Why do I sometimes see nulls even when there should be a match?

Regular joins provide "eventually consistent" guarantees. Because regular joins do not set a time limit on how long to wait for corresponding data to arrive from the other stream, records from one stream can be processed before the matching records from the other stream. This means that you might temporarily see out-of-date results, but the output eventually arrives at the correct result.

In the "Left join" example, if this situation occurs, you might temporarily see null in the name, email, and country fields for order_id 93916 and 36473, even though there is a match for those order_id values. However, the join eventually outputs another row with the match when the corresponding record arrives from the other stream. Because of this caveat, the output of a left, right, or full outer join is always a change stream, which can represent updates and deletions so that the output eventually reflects the final result of the join.
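The following Python sketch simulates this behavior for a left join when a transaction arrives before its matching user. It is a simplified model under stated assumptions: each customer_id has at most one user record, the `"c"` and `"d"` markers stand for create and delete change records, and `streaming_left_join` is a hypothetical helper rather than anything Decodable exposes.

```python
def streaming_left_join(events):
    """Process ("L", record) and ("R", record) events in arrival order.

    Returns the change records that would be emitted: ("c", row) to create
    a row and ("d", row) to delete a previously emitted row.
    """
    left_state, right_state, changes = {}, {}, []
    for side, rec in events:
        key = rec["customer_id"]
        if side == "L":
            left_state.setdefault(key, []).append(rec)
            user = right_state.get(key)
            name = user["name"] if user else None  # no match yet: emit null
            changes.append(("c", {"order_id": rec["order_id"], "name": name}))
        else:
            previous = right_state.get(key)
            right_state[key] = rec
            old_name = previous["name"] if previous else None
            for l in left_state.get(key, []):
                # Retract the out-of-date row, then emit the corrected one.
                changes.append(("d", {"order_id": l["order_id"], "name": old_name}))
                changes.append(("c", {"order_id": l["order_id"], "name": rec["name"]}))
    return changes


events = [
    ("L", {"order_id": 73929, "customer_id": 1}),
    ("L", {"order_id": 93916, "customer_id": 2}),
    ("R", {"customer_id": 2, "name": "Joe Smith"}),  # arrives after its order
]
changes = streaming_left_join(events)

# Apply the change records to a downstream table keyed by order_id.
table = {}
for op, row in changes:
    if op == "c":
        table[row["order_id"]] = row
    else:
        table.pop(row["order_id"], None)

print(changes)
print(table)
```

Order 93916 is first emitted with a null name, then deleted and re-created with "Joe Smith" once the user record arrives, so the downstream table ends up with the correct row.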

Example: Join change streams together to produce one enriched stream

You can join two or more change streams together to combine the changes from multiple different streams into a single stream of records. See About change data capture and change streams for more information about change streams.

Let’s assume that you have the following three change streams, and you want to join them into one enriched stream that shows which product was ordered from your store, who ordered it, and what the order status is.

Orders

Let’s assume that you have a change stream called orders that contains records that resemble the following.

{
   "before":{
      "id":2,
      "order_date":"2022-11-16 22:11:09",
      "user_id":2,
      "price":19.95,
      "product_id":3,
      "order_status":"SHIPPED",
      "update_ts":"2022-11-16 23:25:37"
   },
   "after":{
      "id":2,
      "order_date":"2022-11-16 22:11:09",
      "user_id":2,
      "price":19.95,
      "product_id":3,
      "order_status":"DELIVERED",
      "update_ts":"2022-11-17 19:27:54"
   },
   "op":"u",
   "ts_ms":1668713274615
}

{
   "before":null,
   "after":{
      "id":4,
      "order_date":"2022-11-17 08:15:21",
      "user_id":1,
      "price":49.95,
      "product_id":2,
      "order_status":"ACCEPTED",
      "update_ts":"2022-11-15 22:45:43"
   },
   "op":"r",
   "ts_ms":1668705684227
}

{
   "before":null,
   "after":{
      "id":3,
      "order_date":"2022-11-17 08:12:11",
      "user_id":1,
      "price":19.99,
      "product_id":1,
      "order_status":"ACCEPTED",
      "update_ts":"2022-11-15 22:45:29"
   },
   "op":"r",
   "ts_ms":1668705684227
}

{
   "before":null,
   "after":{
      "id":1,
      "order_date":"2022-11-15 10:08:27",
      "user_id":3,
      "price":150,
      "product_id":4,
      "order_status":"DELIVERED",
      "update_ts":"2022-11-15 23:13:48"
   },
   "op":"r",
   "ts_ms":1668705684225
}

{
   "before":null,
   "after":{
      "id":2,
      "order_date":"2022-11-16 22:11:09",
      "user_id":2,
      "price":19.95,
      "product_id":3,
      "order_status":"SHIPPED",
      "update_ts":"2022-11-16 23:25:37"
   },
   "op":"r",
   "ts_ms":1668705684226
}

Products

Let’s assume that you have a change stream called products that contains records that resemble the following.

{
   "before":{
      "id":1,
      "name":"Tabletop lamp",
      "description":"An adorable option for brightening a bedside table."
   },
   "after":{
      "id":1,
      "name":"Orb tabletop lamp",
      "description":"An adorable option for brightening a bedside table."
   },
   "op":"u",
   "ts_ms":1668713425104
}

{
   "before":null,
   "after":{
      "id":5,
      "name":"Checkerboard vase",
      "description":"Your favorite picnic print now on a vase."
   },
   "op":"r",
   "ts_ms":1668705685614
}

{
   "before":null,
   "after":{
      "id":1,
      "name":"Tabletop lamp",
      "description":"An adorable option for brightening a bedside table."
   },
   "op":"r",
   "ts_ms":1668705685612
}

{
   "before":null,
   "after":{
      "id":2,
      "name":"New York City candle",
      "description":"The distinctive smell of NYC in a candle."
   },
   "op":"r",
   "ts_ms":1668705685614
}

{
   "before":null,
   "after":{
      "id":3,
      "name":"Forest incense",
      "description":"Assortment of three fragrances: maple, fir, cypress."
   },
   "op":"r",
   "ts_ms":1668705685614
}

{
   "before":null,
   "after":{
      "id":4,
      "name":"Sheepskin rug",
      "description":"Winter ready room accessory"
   },
   "op":"r",
   "ts_ms":1668705685614
}

Users

Let’s assume that you have a change stream called users that contains records that resemble the following.

{
   "before":{
      "id":2,
      "name":"Samantha Garcia",
      "zip_code":"94105"
   },
   "after":{
      "id":2,
      "name":"Samantha Garcia",
      "zip_code":"94107"
   },
   "op":"u",
   "ts_ms":1668713723815
}

{
   "before":{
      "id":1,
      "name":"Roger Collins",
      "zip_code":"10002"
   },
   "after":{
      "id":1,
      "name":"Roger Collins",
      "zip_code":"10013"
   },
   "op":"u",
   "ts_ms":1668711427697
}

{
   "before":null,
   "after":{
      "id":2,
      "name":"Samantha Garcia",
      "zip_code":"94105"
   },
   "op":"r",
   "ts_ms":1668705685039
}

{
   "before":null,
   "after":{
      "id":1,
      "name":"Roger Collins",
      "zip_code":"10002"
   },
   "op":"r",
   "ts_ms":1668705685038
}

{
   "before":null,
   "after":{
      "id":3,
      "name":"David Lee",
      "zip_code":"28358"
   },
   "op":"r",
   "ts_ms":1668705685034
}

Pipeline SQL

The following SQL joins all three of these streams together and emits a single output stream named enriched_orders.

insert into enriched_orders
select
  O.id as order_id,
  O.order_status as order_status,
  O.update_ts as order_update_ts,
  O.product_id as product_id,
  P.name as product_name,
  O.price as price,
  U.id as user_id,
  U.name as user_name,
  U.zip_code as zip_code
from `orders` O
join `products` P on
  O.product_id = P.id
join `users` U on
  O.user_id = U.id
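To make the effect of this query more concrete, the Python sketch below folds a few of the sample change records into the latest row per id and then joins those rows the way the SQL does. It only computes the final state, not the intermediate change records that the pipeline emits. The records are trimmed to the fields used here, `latest_by_id` is a hypothetical helper, and ordering by `ts_ms` is an assumption made for this illustration.

```python
def latest_by_id(change_records):
    """Fold change records into the latest row per primary key (the id field)."""
    table = {}
    for rec in sorted(change_records, key=lambda r: r["ts_ms"]):
        if rec["op"] == "d":
            table.pop(rec["before"]["id"], None)
        else:  # "r" (snapshot read), "c" (create), or "u" (update)
            table[rec["after"]["id"]] = rec["after"]
    return table


orders = latest_by_id([
    {"op": "u", "ts_ms": 1668713274615,
     "before": {"id": 2, "user_id": 2, "product_id": 3, "order_status": "SHIPPED"},
     "after": {"id": 2, "user_id": 2, "product_id": 3, "order_status": "DELIVERED"}},
    {"op": "r", "ts_ms": 1668705684226, "before": None,
     "after": {"id": 2, "user_id": 2, "product_id": 3, "order_status": "SHIPPED"}},
])
products = latest_by_id([
    {"op": "r", "ts_ms": 1668705685614, "before": None,
     "after": {"id": 3, "name": "Forest incense"}},
])
users = latest_by_id([
    {"op": "u", "ts_ms": 1668713723815,
     "before": {"id": 2, "name": "Samantha Garcia", "zip_code": "94105"},
     "after": {"id": 2, "name": "Samantha Garcia", "zip_code": "94107"}},
    {"op": "r", "ts_ms": 1668705685039, "before": None,
     "after": {"id": 2, "name": "Samantha Garcia", "zip_code": "94105"}},
])

# Inner join: keep an order only if both its product and its user are known.
enriched = [
    {
        "order_id": o["id"],
        "order_status": o["order_status"],
        "product_name": products[o["product_id"]]["name"],
        "user_name": users[o["user_id"]]["name"],
        "zip_code": users[o["user_id"]]["zip_code"],
    }
    for o in orders.values()
    if o["product_id"] in products and o["user_id"] in users
]
print(enriched)
```

Because each input keeps only one row per id, the state needed for this join is bounded by the number of distinct orders, products, and users.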

Results

The resultant enriched_orders stream would look something like the following.

{
   "before":null,
   "after":{
      "order_id":2,
      "order_status":"DELIVERED",
      "order_update_ts":"2022-11-17 19:27:54",
      "product_id":3,
      "product_name":"Forest incense",
      "price":19.95,
      "user_id":2,
      "user_name":"Samantha Garcia",
      "zip_code":"94107"
   },
   "op":"c"
}

{
   "before":{
      "order_id":2,
      "order_status":"DELIVERED",
      "order_update_ts":"2022-11-17 19:27:54",
      "product_id":3,
      "product_name":"Forest incense",
      "price":19.95,
      "user_id":2,
      "user_name":"Samantha Garcia",
      "zip_code":"94105"
   },
   "after":null,
   "op":"d"
}

{
   "before":null,
   "after":{
      "order_id":3,
      "order_status":"ACCEPTED",
      "order_update_ts":"2022-11-15 22:45:29",
      "product_id":1,
      "product_name":"Orb tabletop lamp",
      "price":19.99,
      "user_id":1,
      "user_name":"Roger Collins",
      "zip_code":"10013"
   },
   "op":"c"
}

{
   "before":{
      "order_id":3,
      "order_status":"ACCEPTED",
      "order_update_ts":"2022-11-15 22:45:29",
      "product_id":1,
      "product_name":"Tabletop lamp",
      "price":19.99,
      "user_id":1,
      "user_name":"Roger Collins",
      "zip_code":"10013"
   },
   "after":null,
   "op":"d"
}

{
   "before":null,
   "after":{
      "order_id":2,
      "order_status":"DELIVERED",
      "order_update_ts":"2022-11-17 19:27:54",
      "product_id":3,
      "product_name":"Forest incense",
      "price":19.95,
      "user_id":2,
      "user_name":"Samantha Garcia",
      "zip_code":"94105"
   },
   "op":"c"
}

{
   "before":{
      "order_id":2,
      "order_status":"SHIPPED",
      "order_update_ts":"2022-11-16 23:25:37",
      "product_id":3,
      "product_name":"Forest incense",
      "price":19.95,
      "user_id":2,
      "user_name":"Samantha Garcia",
      "zip_code":"94105"
   },
   "after":null,
   "op":"d"
}