Step 5: Build an aggregation pipeline

In the previous step we used SQL to transform a single field that held an HTTP log message into a set of fields. Let’s now take the output of that pipeline and build another one.

In this next pipeline, we’re going to create a new stream that shows how many log events occurred for each http_method value in each 30-second window. This kind of aggregation has real-world uses, such as spotting anomalies in application behavior.

Select a field to use as a watermark

To start with, we’ll configure some essential metadata. Watermarks are a core component of the processing logic in time-based aggregations. Although we won’t delve into the details here, you can read Windowing in SQL to learn more about watermarks and windowing at your convenience.

  1. From the Streams page, select the http_events stream.

  2. Select the Schema tab.

  3. Next to the event_ts field, select the Watermark icon. Ignore the Max Lateness field; we won’t need it for this tutorial.

    [Image: the icon for selecting a field as a watermark]

  4. Select Save.
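
For readers who want an intuition for what the watermark does, the following is a minimal Python sketch of the general idea, not a description of how the platform implements it. It assumes a simplified model in which the watermark is the largest event_ts seen so far minus a maximum lateness, and a window is treated as complete once the watermark reaches the window’s end. The WatermarkTracker class and window_is_closed function are hypothetical names introduced only for this illustration.

```python
from datetime import datetime, timedelta


class WatermarkTracker:
    """Hypothetical helper: tracks a watermark as (max event time seen - max lateness)."""

    def __init__(self, max_lateness=timedelta(0)):
        # With no lateness allowance, the watermark equals the newest event time seen.
        self.max_lateness = max_lateness
        self.max_event_ts = None

    def observe(self, event_ts):
        """Record an event timestamp and return the updated watermark."""
        if self.max_event_ts is None or event_ts > self.max_event_ts:
            self.max_event_ts = event_ts
        return self.current()

    def current(self):
        """Return the current watermark, or None if no events have been seen."""
        if self.max_event_ts is None:
            return None
        return self.max_event_ts - self.max_lateness


def window_is_closed(window_end, watermark):
    """Simplified rule: a window is complete once the watermark reaches its end."""
    return watermark is not None and watermark >= window_end


# Example: one 30-second window ending at 19:21:00 (timestamps are made up).
tracker = WatermarkTracker()
window_end = datetime(2023, 11, 13, 19, 21, 0)

for ts in (
    datetime(2023, 11, 13, 19, 20, 45),
    datetime(2023, 11, 13, 19, 20, 40),  # out of order: the watermark does not move back
    datetime(2023, 11, 13, 19, 21, 2),   # passes the window end: the window can be emitted
):
    wm = tracker.observe(ts)
    print(ts, "watermark:", wm, "closed:", window_is_closed(window_end, wm))
```

In this model, a larger maximum lateness delays when a window is considered complete, which gives late-arriving events more time to be counted.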

Create a Pipeline

With the watermark configured, we’ll build the pipeline itself now.

  1. Navigate to the Pipelines page, and select New Pipeline.

  2. In the "Choose an input stream" window, select the http_events stream and then select Next.

  3. In the Pipeline Editor, paste the following SQL statement.

    INSERT INTO http_events_count
    SELECT window_start, window_end, http_method, COUNT(*) AS `count`
    FROM TABLE(
        TUMBLE(TABLE http_events, descriptor(`event_ts`), INTERVAL '30' SECONDS))
    GROUP BY window_start, window_end, http_method

    This does the following:

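    • Divides the http_events stream into fixed, non-overlapping 30-second windows based on event_ts, and counts the records in each window.

    • Groups the results by the window start time, window end time, and the http_method used, so that each output row holds the count for one method in one window.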

  4. Select Run Preview. You should see a count of how many times each method was invoked within each 30-second time window.

    window_start          window_end            http_method                 count
    2023-11-13 19:20:30   2023-11-13 19:21:00   DELETE /users/1 HTTP/1.1    1
    2023-11-13 19:20:30   2023-11-13 19:21:00   PUT /products/2 HTTP/2.0    2
    2023-11-13 19:20:30   2023-11-13 19:21:00   GET / HTTP/1.1              1

  5. Select Next, and give the pipeline a name and description.

  6. Select Create Pipeline. The Overview page for the pipeline opens.

  7. Select Start to enable the pipeline. The Start Options menu opens. For this tutorial, you can ignore these settings and simply select Start.

Once your pipeline is running and processing data from the http_events input stream, you should see the output record count and byte metrics begin to increase. Note that metrics are reset on each activation: stopping and restarting a pipeline sets them back to zero.
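
To make the windowing in the SQL statement more concrete, here is a minimal Python sketch of the same aggregation applied to a small in-memory list of events. It is only an illustration of tumbling-window counting, not how the pipeline is executed. The window_bounds and count_by_window functions, the sample timestamps, and the shortened method values ("GET", "PUT") are all assumptions made for this example. Windows are aligned to the epoch and include their start time but exclude their end time.

```python
from collections import Counter
from datetime import datetime, timedelta

WINDOW = timedelta(seconds=30)
EPOCH = datetime(1970, 1, 1)


def window_bounds(event_ts, size=WINDOW):
    """Return (window_start, window_end) of the tumbling window containing event_ts."""
    offset = (event_ts - EPOCH) % size  # how far event_ts lies past the window start
    start = event_ts - offset
    return start, start + size


def count_by_window(events):
    """Count events per (window_start, window_end, http_method).

    `events` is an iterable of (event_ts, http_method) pairs.
    """
    counts = Counter()
    for event_ts, http_method in events:
        start, end = window_bounds(event_ts)
        counts[(start, end, http_method)] += 1
    return counts


# Hypothetical sample data; the last event falls exactly on a window boundary,
# so it belongs to the next window.
events = [
    (datetime(2023, 11, 13, 19, 20, 31), "GET"),
    (datetime(2023, 11, 13, 19, 20, 45), "GET"),
    (datetime(2023, 11, 13, 19, 20, 59), "PUT"),
    (datetime(2023, 11, 13, 19, 21, 0), "GET"),
]

for (start, end, method), n in sorted(count_by_window(events).items()):
    print(start, end, method, n)
```

Each key in the result corresponds to one row of the pipeline output: a window start, a window end, a method, and the number of events observed for that combination.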