Time-based lookup joins

A time-based lookup join, sometimes called a temporal join, joins two data streams based on the temporal relationship between their records. For each record in one stream, it looks up the record from the other stream that was valid at a particular point in time. That point in time is either when the first record's event occurred or when the record is processed.

In Decodable, time-based lookup joins can be performed based on either event time (historical lookup join) or processing time (latest value lookup join).

  • Historical lookup join: A historical lookup join is based on the timestamp in each record that represents when the record’s event took place. In other words, historical lookup joins are joins on event time. They let you look up historical values: each record in one stream is matched with the version of the record in the other stream that was valid at that record’s timestamp.

  • Latest value lookup join: For each incoming record from an append stream, Decodable looks up the latest value from a change stream. The join is therefore based on the system time at which each record is processed, which can differ from the time at which the record’s event actually occurred.
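The difference between the two kinds of lookup can be sketched in plain Python. This is a minimal illustration, not Decodable's implementation. It uses the USD rates from the exchange_rates example below and represents times as minutes since midnight. For the latest value lookup, it assumes the 9:50 fare is processed after the 10:45 rate has already arrived. The function names historical_lookup and latest_value_lookup are hypothetical.

```python
from bisect import bisect_right

# USD rate versions from the exchange_rates example: (rate_time in minutes
# since midnight, rate). Assumed to be sorted by rate_time.
USD_VERSIONS = [(9 * 60 + 30, 136), (10 * 60 + 45, 140)]


def historical_lookup(versions, event_time):
    """Event-time lookup (hypothetical helper): return the rate whose
    rate_time is the latest one at or before event_time, or None if no
    version existed yet at that time."""
    times = [t for t, _ in versions]
    i = bisect_right(times, event_time)
    return versions[i - 1][1] if i > 0 else None


def latest_value_lookup(versions):
    """Processing-time lookup (hypothetical helper): return the newest version
    ingested so far, ignoring the incoming record's own timestamp.
    Assumes versions holds every version received so far, in arrival order."""
    return versions[-1][1] if versions else None


fare_time = 9 * 60 + 50  # a USD fare at 9:50
print(historical_lookup(USD_VERSIONS, fare_time))  # 136
print(latest_value_lookup(USD_VERSIONS))           # 140
```

With event time, the 9:50 fare uses 136, the rate set at 9:30. With processing time, it picks up 140 if the 10:45 rate has already been ingested when the fare is processed.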

Example

In this example, we’ll use a historical lookup join to join two streams of data. The first, an append stream called taxi_fares, contains transaction events in various currencies. The second, a change stream called exchange_rates, contains exchange rates that fluctuate over time. You want to join these two streams so that each transaction is paired with the exchange rate that was in effect at the time of the transaction.

Table 1. taxi_fares
fare_time  price  currency
9:50       2      USD
10:15      1      EUR
10:30      100    YEN
11:15      3      USD
11:50      5      EUR

Table 2. exchange_rates
rate_time  currency  rate
9:00       YEN       1
9:15       EUR       141
9:30       USD       136
10:45      USD       140
11:30      EUR       143

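Joining these two streams produces a single stream of converted fares. Because the YEN rate is 1, each converted_price is expressed in yen. For example, the 2 USD fare at 9:50 uses the USD rate set at 9:30 (136), giving 2 × 136 = 272.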

Table 3. converted_fares
fare_time  converted_price
9:50       272
10:15      141
10:30      100
11:15      420
11:50      715
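Each row of converted_fares follows one rule. For a fare f with currency c_f and timestamp t_f, the join uses the exchange rate for c_f whose rate_time is the latest one not after t_f:

```latex
\text{converted\_price}(f) = \text{price}(f) \cdot \text{rate}(c_f, t^{*}),
\qquad
t^{*} = \max\{\, t_r \le t_f \;:\; \text{currency}(r) = c_f \,\}
```

For the 11:15 USD fare, the candidate USD rates were set at 9:30 and 10:45, so t* = 10:45 and the converted price is 3 × 140 = 420.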

An example of what the pipeline looks like:

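INSERT INTO converted_fares
SELECT
    taxi_fares.fare_time,
    -- Convert using the rate that was in effect at fare_time
    (taxi_fares.price * exchange_rates.rate) AS converted_price
FROM taxi_fares
-- Look up the version of exchange_rates valid as of each fare's event time
JOIN exchange_rates FOR SYSTEM_TIME AS OF taxi_fares.fare_time
ON taxi_fares.currency = exchange_rates.currency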
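To make the matching rule in this pipeline concrete, here is a plain-Python sketch that replays Table 1 and Table 2 through the same as-of logic and reproduces Table 3. It is only an illustration under stated assumptions, not how Decodable executes the join. It assumes all rows are already available, and it ignores watermarks and late-arriving data. Fares with no rate in effect yet are dropped, which assumes inner-join semantics. The helpers hhmm and temporal_join are hypothetical.

```python
from bisect import bisect_right
from collections import defaultdict


def hhmm(s):
    """Convert an 'H:MM' string to minutes since midnight."""
    h, m = s.split(":")
    return int(h) * 60 + int(m)


# Rows from Table 1 (taxi_fares): (fare_time, price, currency)
taxi_fares = [
    ("9:50", 2, "USD"),
    ("10:15", 1, "EUR"),
    ("10:30", 100, "YEN"),
    ("11:15", 3, "USD"),
    ("11:50", 5, "EUR"),
]
# Rows from Table 2 (exchange_rates): (rate_time, currency, rate)
exchange_rates = [
    ("9:00", "YEN", 1),
    ("9:15", "EUR", 141),
    ("9:30", "USD", 136),
    ("10:45", "USD", 140),
    ("11:30", "EUR", 143),
]


def temporal_join(fares, rates):
    """Hypothetical stand-in for
    JOIN exchange_rates FOR SYSTEM_TIME AS OF taxi_fares.fare_time:
    each fare uses the rate version with the latest rate_time <= fare_time."""
    # Build a per-currency version history sorted by rate_time.
    history = defaultdict(list)
    for rate_time, currency, rate in rates:
        history[currency].append((hhmm(rate_time), rate))
    for versions in history.values():
        versions.sort()

    out = []
    for fare_time, price, currency in fares:
        versions = history.get(currency, [])
        times = [t for t, _ in versions]
        i = bisect_right(times, hhmm(fare_time))
        if i == 0:
            continue  # assumed inner join: no rate in effect yet, no output row
        out.append((fare_time, price * versions[i - 1][1]))
    return out


for row in temporal_join(taxi_fares, exchange_rates):
    print(row)
```

Sorting each currency's history once and using a binary search keeps every lookup logarithmic in the number of rate versions for that currency.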

Diagram: each incoming transaction is looked up against the exchange_rates stream and matched with the exchange rate in effect at the time of the transaction. The result is a single stream of converted fares that uses the correct rate for each transaction time.