Step 3: Create a pipeline to apply a schema to the data

Currently, the HTTP events are contained within a single field called value. Our next step is to construct a pipeline that applies a structured schema to the data, making it more amenable to further processing.

A pipeline is a set of data processing instructions written in SQL or Java. In this tutorial we are going to create a SQL-based pipeline. When you create a pipeline, you write a streaming SQL query that specifies what stream(s) of data to process, how to process it, and what stream to send that data to.

  1. From the envoy_raw Overview page, select the … icon and then Create a pipeline.

    An image showing where to find the Create a pipeline option.
  2. In the Pipeline Editor, remove the insert into <stream_name> clause so that you’re left with just select * from envoy_raw.

    An image showing the next step in modifying a pipeline’s SQL statement.

    Note: When you create a pipeline, Decodable constructs a simple INSERT query and automatically generates an output stream name for your pipeline in the format stream_<digits>. SQL pipelines are always insert into ... select ... from ... queries. The insert into clause specifies the pipeline’s output stream, while the select ... from ... specifies the input stream(s) and any transformations.
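
    For example, before you edit it, the generated query might look something like the following. The digits in the stream name are auto-generated, so yours will differ.

      -- Auto-generated starting point (the stream name stream_84125 is hypothetical;
      -- Decodable generates its own stream_<digits> name for your pipeline).
      insert into stream_84125
      select * from envoy_raw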

  3. Select Run Preview to preview the data flowing through the pipeline. Since we have not applied any transformations to this data yet, it will look similar to the raw data you previewed earlier in this Quick Start.

    [2023-11-02T16:37:15Z] "PUT /products/2 HTTP/2.0" 400 URX 3065 9077 36 72 "-" "Mozilla/5.0 (Linux; Android 10) " "5f84cc54-fa0c-4d18-8133-6d53df4393e9" "auth.default.svc.cluster.local" "10.0.0.2"
    [2023-11-02T16:37:14Z] "PUT /products/2 HTTP/1.1" 400 NC 7196 492 50 87 "-" "Chrome/90.0.4430.212 Safari/537.36" "414dd05d-a5cf-4e7e-bc4d-6ecd0628dc59" "localhost" "10.0.0.2:443"
    [2023-11-02T16:37:14Z] "PUT /users/1 HTTP/2.0" 422 NR 2762 9035 42 98 "-" "curl/7.64.1" "4b08d25b-74a7-4385-b654-21de6928b8dd" "auth.default.svc.cluster.local" "192.168.1.12:3000"
    [2023-11-02T16:37:13Z] "DELETE /products/2 HTTP/1.1" 500 URX 1021 7889 39 56 "-" "curl/7.64.1" "0cacd066-dd24-4df4-8559-74afa0498c07" "locations" "192.168.1.12:3000"

    You’ll notice several interesting bits of information in the value field, including the HTTP method used, the HTTP response status, and a timestamp.

    In the next few steps, we’ll build up a SQL statement in stages to extract these fields of interest into a proper schema. The resulting new stream can be used in subsequent pipelines, or written to a data store using a connector.

  4. We’ll start by using the grok() function to extract fields based on a data pattern. The resulting fields (timestamp, method, and http_status) will be stored in a top-level field named envoy_fields. Additionally, the original value field is retained for reference and for visual verification of the grok results.

    1. Paste the following SQL statement into the SQL pane.

      SELECT grok(`value`,'\[%{TIMESTAMP_ISO8601:timestamp}\] "%{DATA:method}" %{INT:http_status}') AS envoy_fields
           , `value`
      FROM   envoy_raw

      You can learn more about the syntax of grok in the Decodable function documentation and in general grok pattern references.
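
      To make the pattern easier to read, here is an informal, annotated breakdown of what each token matches in a sample log line (this is not additional SQL to run):

      -- \[%{TIMESTAMP_ISO8601:timestamp}\]  matches the bracketed ISO-8601 timestamp, for example
      --                                     [2023-11-02T16:37:15Z], and captures it as "timestamp"
      -- "%{DATA:method}"                    matches the quoted request line, for example
      --                                     "PUT /products/2 HTTP/2.0", and captures it as "method"
      -- %{INT:http_status}                  matches the numeric response code, for example 400,
      --                                     and captures it as "http_status"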

    2. Select Run Preview and view the results in the Preview pane. They should look something like the following:

      envoy_fields:  {"method":"POST /users/1 HTTP/1.1","http_status":"400","timestamp":"2023-11-02T18:52:45Z"}
      value:         [2023-11-02T18:52:45Z] "POST /users/1 HTTP/1.1" 400 UH 8843 5634 47 73 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64)" "a9bb2512-2678-40e6-aea1-c7736982719f" "localhost" "10.0.0.2:443"

      envoy_fields:  {"method":"DELETE /products/3 HTTP/2.0","http_status":"422","timestamp":"2023-11-02T18:52:45Z"}
      value:         [2023-11-02T18:52:45Z] "DELETE /products/3 HTTP/2.0" 422 UH 2855 1977 41 61 "-" "Chrome/90.0.4430.212 Safari/537.36" "ac3c184e-69be-4412-9749-324271c35f17" "localhost" "10.0.0.2:443"

      envoy_fields:  {"method":"PUT /users/1 HTTP/2.0","http_status":"200","timestamp":"2023-11-02T18:52:44Z"}
      value:         [2023-11-02T18:52:44Z] "PUT /users/1 HTTP/2.0" 200 URX 6984 5074 3 25 "-" "AppleWebKit/537.36 (KHTML, like Gecko)" "7da835d5-18dd-4be8-9275-3bacbd389ed7" "localhost" "10.0.0.1"

  5. Now let’s break each of these elements out into its own field.

    1. Paste the following SQL statement into the SQL pane.

      SELECT  envoy_fields['timestamp']   AS event_ts,
              envoy_fields['method']      AS http_method,
              envoy_fields['http_status'] AS http_status
      FROM (SELECT grok(`value`,'\[%{TIMESTAMP_ISO8601:timestamp}\] "%{DATA:method}" %{INT:http_status}') AS envoy_fields
              FROM envoy_raw)
    2. Select Run Preview and view the results in the Preview pane.

      event_ts               http_method                  http_status
      2023-11-02T18:52:45Z   POST /products/2 HTTP/2.0    400
      2023-11-02T18:52:45Z   DELETE /users/1 HTTP/1.1     422
      2023-11-02T18:52:44Z   PUT /products/3 HTTP/1.1     200

  6. Finally, we’ll make sure that our new fields have the correct data types. Currently, the timestamp field is a string, but for better data management, we’ll store it as a TIMESTAMP type. Additionally, we’ll use an INT data type for the http_status field.
    See Decodable data types for more information.

    1. Paste the following SQL statement in the SQL pane.

      SELECT  TO_TIMESTAMP(envoy_fields['timestamp'], 'yyyy-MM-dd''T''HH:mm:ss''Z''') AS event_ts,
              envoy_fields['method']                                                  AS http_method,
              CAST(envoy_fields['http_status'] AS INT)                                AS http_status
      FROM (SELECT grok(`value`,'\[%{TIMESTAMP_ISO8601:timestamp}\] "%{DATA:method}" %{INT:http_status}') AS envoy_fields
              FROM envoy_raw)
    2. Select Run Preview and view the results in the Preview pane. You’ll notice that it looks basically the same as in the previous step; the difference is the underlying data type of the event_ts field.
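
      If the format string passed to TO_TIMESTAMP looks cryptic, the following annotated sketch shows how it maps onto one of the timestamp values. The doubled single quotes are how a literal quote is escaped inside a SQL string.

      -- Format string: 'yyyy-MM-dd''T''HH:mm:ss''Z'''
      --   yyyy-MM-dd  -> 2023-11-02   (year, month, day)
      --   'T'         -> the literal character T (written as ''T'' inside the SQL string)
      --   HH:mm:ss    -> 18:52:45     (hours, minutes, seconds)
      --   'Z'         -> the literal character Z (written as ''Z'')
      --
      -- So TO_TIMESTAMP('2023-11-02T18:52:45Z', 'yyyy-MM-dd''T''HH:mm:ss''Z''')
      -- produces the TIMESTAMP value 2023-11-02 18:52:45.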

  7. Now that we’ve defined the transformations that we want to apply, let’s make it official! By adding INSERT INTO at the beginning of the query, we define the stream to which the output of the SELECT operation is written.

    1. Paste the following SQL statement into the SQL pane. Here we are writing the output to a stream called http_events.

      INSERT INTO http_events
      
      SELECT  TO_TIMESTAMP(envoy_fields['timestamp'], 'yyyy-MM-dd''T''HH:mm:ss''Z''') AS event_ts,
              envoy_fields['method']                                                  AS http_method,
              CAST(envoy_fields['http_status'] AS INT)                                AS http_status
      FROM (SELECT grok(`value`,'\[%{TIMESTAMP_ISO8601:timestamp}\] "%{DATA:method}" %{INT:http_status}') AS envoy_fields
              FROM envoy_raw)
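
      As noted earlier, the resulting http_events stream can itself be used by subsequent pipelines. For example, a follow-on pipeline (purely hypothetical and not part of this Quick Start) could filter the new stream down to error responses:

      -- Hypothetical downstream pipeline: keep only HTTP error responses.
      -- The output stream name http_errors is an example, not something created in this tutorial.
      INSERT INTO http_errors
      SELECT  event_ts, http_method, http_status
      FROM    http_events
      WHERE   http_status >= 400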
  8. Select Next, and the Configure the Output Stream dialog opens. Notice that the output schema shows the correct fields and data types, along with the name of the stream that you defined in your SQL statement!

    If you want to, give the stream a description of your choosing.

    An image showing the next step in creating a pipeline.
  9. Select Create Stream.

  10. Select Next, and give the pipeline a name and description.

  11. Select Create Pipeline.

Congratulations on creating your first pipeline! Continue to the next part of the Quick Start to preview and activate your pipeline.