# Windowing in SQL

In batch processing, a SQL query that summarizes a table might require the server to read through every row in the table before returning a result. How can a streaming pipeline summarize a series of events that can continue forever? The answer is windowing. Windows create subsets of your stream bounded by a start time and an end time. A pipeline can then count or sum the windowed data and output the final totals for each time period.

These windows can be created based on either event time (when the event originally occurred) or processing time (when the pipeline reads the event). Event time is usually preferable; see the time in pipelines section of the pipelines documentation to understand why. To use event time, a watermark must be set on the input stream. See the watermarks section of the streams documentation for more information.

This page assumes that you are already familiar with how to use the `GROUP BY` clause in SQL to aggregate data in a batch setting.

## Table-Valued Functions

Window aggregations in Decodable SQL look much like standard SQL aggregations. They use functions like `count` and `sum` in the `select` portion of the statement and define how records should be grouped using the `group by` clause. The difference is that the source stream in the `from` clause is replaced with a table supplied by a Table-Valued Function:

```sql
insert into <output-stream>
select window_start, window_end, count(*) as count
from table(<table-valued-function>(...))
group by window_start, window_end
```

A Table-Valued Function (TVF) transforms a stream into time-bounded buckets of events that can be aggregated. It must be wrapped in the `table()` function when used in a pipeline's SQL. While Decodable doesn't support outputting the results of a TVF directly, you can think of them as a version of the input data with the additional fields `window_start` and `window_end`.
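To make the TVF idea concrete, here is a small Python sketch of the mental model (an illustration only, not Decodable or Flink code; the sample events, the one-hour window size, and the `tag_with_tumble_window` helper are all assumptions for this example): each event is tagged with the `window_start` and `window_end` of the bucket it falls into, after which aggregation is an ordinary group-by on those two fields.

```python
from collections import defaultdict
from datetime import datetime, timedelta

# Hypothetical input events: (event_time, payment)
events = [
    (datetime(2024, 1, 1, 10, 15), 20.0),
    (datetime(2024, 1, 1, 10, 45), 35.0),
    (datetime(2024, 1, 1, 11, 5), 10.0),
]

def tag_with_tumble_window(ts, size):
    """Return (window_start, window_end) of the tumbling bucket containing ts."""
    epoch = datetime(1970, 1, 1)
    bucket = (ts - epoch) // size          # whole windows elapsed since the epoch
    start = epoch + bucket * size
    return start, start + size

# Conceptually this is what the TVF enables: group by (window_start, window_end)
totals = defaultdict(float)
for ts, payment in events:
    window = tag_with_tumble_window(ts, timedelta(hours=1))
    totals[window] += payment              # sum(payment) per window

for (start, end), payment in sorted(totals.items()):
    print(start, end, payment)             # two windows: 10:00-11:00 and 11:00-12:00
```

The key difference from this batch-style sketch is that a streaming pipeline emits each window's totals once the watermark passes `window_end`, rather than after reading all input.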
Decodable supports Table-Valued Functions that implement a number of windowing strategies, including tumble, hop, and cumulate.

## Tumble

Tumble creates adjoining windows of equal size. It counts each input event in exactly one window. Tumbling windows are useful for reporting where you want events to belong to a single window, like taking the aggregate of customer orders in the last hour.

```sql
tumble(table <data>, descriptor(<timecol>), <size>)
```

| Argument | Description |
| --- | --- |
| `data` | your input stream name |
| `timecol` | the name of your watermarked event time field |
| `size` | a duration specifying the width of the windows |

The duration that a window should be open is specified using interval syntax. For example, `interval '5' seconds`.

```sql
-- Tumble use case
insert into orders_hourly
select window_start, window_end, count(*) as orders, sum(payment) as payment
from table(
  tumble(table orders, descriptor(`timestamp`), interval '1' hours))
group by window_start, window_end
```

## Hop

Hop creates windows of equal size which can overlap. For this reason, an input event can be present in more than one window. Hopping windows are also known as "sliding windows." A classic example of a hopping window implementation is to summarize the last 15 minutes of web requests from various IP address ranges, recalculated every minute.

```sql
hop(table <data>, descriptor(<timecol>), <slide>, <size>)
```

| Argument | Description |
| --- | --- |
| `data` | your input stream name |
| `timecol` | the name of your watermarked event time field |
| `slide` | a duration specifying the time between the starts of sequential hopping windows |
| `size` | a duration specifying the width of the windows |

The duration that a window should be open is specified using interval syntax. For example, `interval '5' seconds`.
```sql
-- Hop use case
insert into recent_requests
select window_start, window_end, ip_range, count(*) as requests, sum(bytes) as bytes
from table(
  hop(table requests, descriptor(`timestamp`), interval '1' minutes, interval '15' minutes))
group by window_start, window_end, ip_range
```

## Cumulate

Cumulate creates cumulative windows of increasing size from the same start time until a maximum window size is reached. At that point, the previous window is closed and a new one is opened. Input events can be counted in more than one window. A classic example of a cumulate window implementation is to provide an updated count of unique daily users every 5 minutes.

```sql
cumulate(table <data>, descriptor(<timecol>), <step>, <size>)
```

| Argument | Description |
| --- | --- |
| `data` | your input stream name |
| `timecol` | the name of your watermarked event time field |
| `step` | a duration specifying the difference in size between the ends of sequential cumulating windows |
| `size` | a duration specifying the maximum size of the cumulating windows (must be an integer multiple of `step`) |

The duration that a window should be open is specified using interval syntax. For example, `interval '5' seconds`.

```sql
-- Cumulate use case
insert into current_daily_active_users
select window_start, window_end, count(distinct user_id) as unique_users
from table(
  cumulate(table orders, descriptor(`timestamp`), interval '5' minutes, interval '1' day))
group by window_start, window_end
```

## Key takeaways

- A window is a subset of your stream bounded by a start time and an end time.
- To perform aggregation functions like `count` and `sum`, you must specify a window to perform the aggregation over using a Table-Valued Function (TVF).
- To aggregate using event time, your input stream must have watermarks enabled. See here for more on watermarks.
- Decodable currently supports three windowing strategies: tumble, hop, and cumulate.

For an example of building an aggregation pipeline, see Step 5: Build an aggregation pipeline in the Quickstart.
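The three strategies differ only in which windows a given event is assigned to. The following Python sketch (an illustration of the semantics described above, not Decodable code; timestamps are plain integers and the window parameters are arbitrary, chosen for brevity) computes the window assignments for a single event under each strategy:

```python
def tumble_windows(ts, size):
    """Tumble: an event falls in exactly one window of width `size`."""
    start = (ts // size) * size
    return [(start, start + size)]

def hop_windows(ts, slide, size):
    """Hop: an event falls in every window of width `size` whose start is a
    multiple of `slide` and that contains ts -- up to size/slide windows."""
    first = ((ts - size) // slide + 1) * slide  # earliest window start covering ts
    return [(s, s + size) for s in range(first, ts + 1, slide)]

def cumulate_windows(ts, step, size):
    """Cumulate: an event falls in every growing window that begins at the
    current cycle start and whose end (a multiple of `step` past that start)
    is after ts, up to the maximum window `size`."""
    cycle_start = (ts // size) * size
    first_k = (ts - cycle_start) // step + 1    # smallest window end covering ts
    return [(cycle_start, cycle_start + k * step)
            for k in range(first_k, size // step + 1)]

# One event at t=7 under each strategy:
print(tumble_windows(7, 5))        # a single 5-unit window: [(5, 10)]
print(hop_windows(7, 2, 6))        # three overlapping 6-unit windows
print(cumulate_windows(7, 5, 20))  # growing windows ending at 10, 15, and 20
```

Note that with hop, the event lands in `size / slide` windows, and with cumulate it lands in every not-yet-closed window of the current cycle, which matches the "counted in more than one window" behavior described above.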
📘 This page contains content copied from the Apache Flink® documentation. See the Credits page for details.