Windowing Reference
Prior Knowledge
This page assumes you're already familiar with using SQL's `group by` clause to aggregate data in a batch setting.
In batch processing, a SQL query that summarizes a table may require the server to read through every row in the table before returning a result. How is a streaming pipeline able to summarize a series of events that can continue forever? The answer is windowing. Windows create subsets of your streams with a start time and end time. A pipeline can then count or sum the windowed data and output the final totals for each time period.
These windows can be created based on either event time (when the event originally occurred) or processing time (when the pipeline reads the event). Event time is usually preferable. See the time in pipelines section of the pipelines documentation to understand why. To use event time, a watermark must be set on the input stream. See the watermarks section of the streams documentation for more information.
Table-Valued Functions (TVF)
Window aggregations in Decodable SQL look much like standard SQL aggregations. They use functions like `count` and `sum` in the `select` portion of the statement and then define how records should be grouped using the `group by` clause. The difference is that the source stream in the `from` clause is replaced with a table supplied by a Table-Valued Function:
insert into <output-stream>
select window_start, window_end, count(*) as count
from table(<table-valued-function>(...))
group by window_start, window_end
A TVF transforms a stream into time-bounded buckets of events that can be aggregated. It must be wrapped in the `table()` function when used in a pipeline's SQL. While Decodable doesn't support outputting the results of a TVF directly, you can think of them as a version of the input data with the additional fields `window_start` and `window_end`.
Decodable supports Table-Valued Functions that implement a number of windowing strategies including tumble, hop, and cumulate:
Tumble
Tumble creates adjoining windows of equal size. It counts each input event in exactly one window.
tumble(table <data>, descriptor(<timecol>), <size>)
| Argument | Description |
|---|---|
| `data` | your input stream name |
| `timecol` | the name of your watermarked event time field |
| `size` | a duration specifying the width of the windows |
note: durations are specified using interval syntax (e.g. interval '5' seconds)
Use Case
A tumble window can be used to summarize customer orders by hour while preserving the total count of items ordered and the total sum of payments received.
-- Tumble use case SQL
insert into orders_hourly
select window_start, window_end, count(*) as orders, sum(payment) as payment
from table(
tumble(table orders, descriptor(`timestamp`), interval '1' hours))
group by window_start, window_end
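To make the window-assignment arithmetic concrete, here's a small Python sketch (illustrative only, not Decodable's or Flink's implementation) of how a tumbling window maps an event timestamp to exactly one bucket, assuming windows are aligned to the Unix epoch and are half-open `[start, end)`:

```python
from datetime import datetime, timedelta

def tumble_window(event_time: datetime, size: timedelta):
    """Return the single (window_start, window_end) bucket containing event_time."""
    epoch = datetime(1970, 1, 1)
    # Round the event time down to the nearest multiple of `size`.
    offset = (event_time - epoch) % size
    start = event_time - offset
    return start, start + size

# An order placed at 22:24:16 lands in the 22:00-23:00 hourly window.
start, end = tumble_window(datetime(2022, 6, 7, 22, 24, 16), timedelta(hours=1))
print(start, end)  # 2022-06-07 22:00:00 2022-06-07 23:00:00
```

Because the windows adjoin and never overlap, summing the per-window totals reproduces the overall total exactly once per event.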
Hop
Hop creates windows of equal size which may overlap. For this reason, an input event may be present in more than one window. Hopping windows are also known as "sliding windows".
hop(table <data>, descriptor(<timecol>), <slide>, <size>)
| Argument | Description |
|---|---|
| `data` | your input stream name |
| `timecol` | the name of your watermarked event time field |
| `slide` | a duration specifying the time between the start of sequential hopping windows |
| `size` | a duration specifying the width of the windows |
note: durations are specified using interval syntax (e.g. interval '5' seconds)
Use Case
Hopping windows can be used to create a summary of recent web requests from various IP address ranges.
-- Hop use case SQL
insert into recent_requests
select window_start, window_end, ip_range, count(*) as requests, sum(bytes) as bytes
from table(
hop(table web_requests, descriptor(`timestamp`), interval '1' minutes, interval '15' minutes))
group by window_start, window_end, ip_range
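The overlap can be seen in a short Python sketch (illustrative only, not Decodable's or Flink's implementation) that computes every hopping window containing a given event time, assuming windows are aligned to the Unix epoch and are half-open `[start, end)`:

```python
from datetime import datetime, timedelta

def hop_windows(event_time: datetime, slide: timedelta, size: timedelta):
    """Return every (window_start, window_end) hopping window containing event_time."""
    epoch = datetime(1970, 1, 1)
    # Latest window start at or before the event, aligned to the slide.
    start = event_time - (event_time - epoch) % slide
    windows = []
    # Walk backwards through earlier starts while their windows still cover the event.
    while start + size > event_time:
        windows.append((start, start + size))
        start -= slide
    return list(reversed(windows))

# With a 1-minute slide and 15-minute size, each event appears in
# size / slide = 15 overlapping windows.
windows = hop_windows(datetime(2022, 6, 9, 20, 17, 28),
                      slide=timedelta(minutes=1), size=timedelta(minutes=15))
print(len(windows))  # 15
```

This is why a hopping aggregation emits each event's contribution multiple times: once per window that covers it.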
Cumulate
Cumulate creates cumulative windows of increasing size from the same start time until a maximum window size is reached. At that point, the previous window is closed and a new one is opened. Input events may be counted in more than one window.

cumulate(table <data>, descriptor(<timecol>), <step>, <size>)
| Argument | Description |
|---|---|
| `data` | your input stream name |
| `timecol` | the name of your watermarked event time field |
| `step` | a duration specifying the difference in window size between the end of sequential cumulating windows |
| `size` | a duration specifying the maximum size of the cumulating windows (must be an integer multiple of `step`) |
note: durations are specified using interval syntax (e.g. interval '5' seconds)
Use Case
Cumulative windows can be used to provide an updated count of unique daily users every 5 minutes.
-- Cumulate use case SQL
insert into current_daily_active_users
select window_start, window_end, count(distinct user_id) as unique_users
from table(
cumulate(table orders, descriptor(`timestamp`), interval '5' minutes, interval '1' day))
group by window_start, window_end
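A Python sketch (illustrative only, not Decodable's or Flink's implementation) shows the shape of cumulative windows: within one cycle of length `size`, every window shares the same start, and the ends advance by `step` until the cycle rolls over. Windows are assumed epoch-aligned and half-open `[start, end)`:

```python
from datetime import datetime, timedelta

def cumulate_windows(event_time: datetime, step: timedelta, size: timedelta):
    """Return every cumulative (window_start, window_end) containing event_time."""
    epoch = datetime(1970, 1, 1)
    # All windows in a cycle share the start of the current `size`-long cycle.
    start = event_time - (event_time - epoch) % size
    windows = []
    end = start + step
    while end <= start + size:
        if end > event_time:  # the window [start, end) contains the event
            windows.append((start, end))
        end += step
    return windows

# With a 5-minute step and 15-minute max size, an event at 00:07 is counted
# in the windows ending at 00:10 and 00:15 (both starting at 00:00).
ws = cumulate_windows(datetime(2022, 6, 9, 0, 7),
                      step=timedelta(minutes=5), size=timedelta(minutes=15))
print(len(ws))  # 2
```

This is why the daily-active-users example above re-emits an updated count every 5 minutes: each step closes a larger window over the same day-long start.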
Key Takeaways:
- A window is a subset of your stream bounded by a start time and an end time.
- To perform aggregation functions like `count` and `sum`, you must specify a window to perform the aggregation over using a Table-Valued Function (TVF).
- To aggregate using event time, your input stream must have watermarks enabled. See Managing Streams for more on watermarks.
- Decodable currently supports three windowing strategies:
- Tumble
- Hop
- Cumulate
Web UI Example
Here's an example showing how we can use windowing to aggregate logs of http activity using the tumble TVF:
-- Web UI example SQL
insert into http_aggregated
select window_start, window_end, original_path, sum(bytes_rcvd) as bytes_rcvd
from table(
tumble(table http_parsed, descriptor(`timestamp`), interval '5' seconds))
group by window_start, window_end, original_path
CLI Example
Here is the same example using the Decodable CLI:
> decodable stream get cafb4d28
http_parsed
id cafb4d28
description auto created output stream for pipeline [parse_http]
schema
0 timestamp TIMESTAMP(3)
1 method STRING
2 original_path STRING
3 response_code INT
4 bytes_rcvd INT
watermark `timestamp` as `timestamp`
create time 2022-06-08T18:36:20Z
update time 2022-06-08T18:36:53Z
> decodable pipeline preview "select * from http_parsed"
Submitting query... done! (took 4.71s)
Waiting for results...
{"bytes_rcvd":8515,"method":"DELETE","original_path":"/products/2","response_code":201,"timestamp":"2022-06-07 22:24:16"}
{"bytes_rcvd":1399,"method":"GET","original_path":"/products/2","response_code":200,"timestamp":"2022-06-07 22:24:17"}
{"bytes_rcvd":384,"method":"PATCH","original_path":"/products/2","response_code":500,"timestamp":"2022-06-07 22:24:17"}
...
Records received: 10
Time to first record: 21.56s
Total time: 21.56s
> decodable pipeline preview "insert into http_aggregated
select window_start, window_end, original_path, sum(bytes_rcvd) as bytes_rcvd
from table(
tumble(table http_parsed, descriptor(\`timestamp\`), interval '5' seconds))
group by window_start, window_end, original_path"
Submitting query... done! (took 7.51s)
Waiting for results...
{"bytes_rcvd":43233,"original_path":"/","window_end":"2022-06-09 20:17:30","window_start":"2022-06-09 20:17:25"}
{"bytes_rcvd":14714,"original_path":"/products/3","window_end":"2022-06-09 20:17:30","window_start":"2022-06-09 20:17:25"}
{"bytes_rcvd":15108,"original_path":"/users/1","window_end":"2022-06-09 20:17:30","window_start":"2022-06-09 20:17:25"}
{"bytes_rcvd":15938,"original_path":"/products/2","window_end":"2022-06-09 20:17:30","window_start":"2022-06-09 20:17:25"}
{"bytes_rcvd":56841,"original_path":"/products/3","window_end":"2022-06-09 20:17:35","window_start":"2022-06-09 20:17:30"}
{"bytes_rcvd":49969,"original_path":"/products/2","window_end":"2022-06-09 20:17:35","window_start":"2022-06-09 20:17:30"}
{"bytes_rcvd":48778,"original_path":"/","window_end":"2022-06-09 20:17:35","window_start":"2022-06-09 20:17:30"}
{"bytes_rcvd":54100,"original_path":"/users/1","window_end":"2022-06-09 20:17:35","window_start":"2022-06-09 20:17:30"}
{"bytes_rcvd":71614,"original_path":"/products/2","window_end":"2022-06-09 20:17:40","window_start":"2022-06-09 20:17:35"}
{"bytes_rcvd":53903,"original_path":"/","window_end":"2022-06-09 20:17:40","window_start":"2022-06-09 20:17:35"}
Records received: 10
Time to first record: 28.74s
Total time: 36.87s
This page contains copied contents from the Apache Flink® documentation.
See Credits page for details.