Using the Decodable CLI tutorial

This guide steps you through an end-to-end example of how to use Decodable to parse and structure Envoy logs in real time. It should give you enough exposure to the platform to get rolling with a wide range of use cases in short order. For your convenience, we’ve created some resources in your account for use with this guide.

We’ll use the CLI in this guide, but you can follow along in the app if you prefer.

Connections

Connections are reusable links between Decodable and your data infrastructure. They pipe data from an external system into a Decodable stream, or vice versa.

Let’s list the connections in your account to see what has been pre-created for you:

decodable connection list
# Output:
#
# id               name                      connector  type    state    create time           update time
# <connection id>  datagen_envoy_connection  datagen    source  STOPPED  2024-03-05T13:01:34Z  2024-03-05T14:06:54Z

The connection we see here uses the Datagen connector, which generates test data. Let’s get some more information about it:

decodable connection get <connection id>
# Output:
#
# datagen_envoy_connection
#   id                       <connection id>
#   description              A source connection that generates envoy-style logs
#   connector                datagen
#   type                     source
#   stream id                <stream id>
#   schema
#     0  value                 STRING
#   properties
#     data.type                envoy
#   target state             STOPPED
#   actual state             STOPPED
#   create time              2021-11-03T16:14:06Z
#   update time              2021-11-05T04:08:41Z

Here we see that the type is 'source', meaning this connector feeds new data from an external system into a stream. Let’s try activating it to get some data flowing:

decodable connection activate <connection id>
# Output:
#
# datagen_envoy_connection
#   id                       <connection id>
#   description              A source connection that generates envoy-style logs
#   connector                datagen
#   type                     source
#   stream id                <stream id>
#   schema
#     0  value                 STRING
#   properties
#     data.type                envoy
#   target state             RUNNING
#   actual state             STARTING
#   create time              2021-11-03T16:14:06Z
#   update time              2021-11-05T04:16:36Z

If you wait a moment and run a get on that connection again, you should see the actual state move from 'STARTING' to 'RUNNING'.
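
If you’d rather not re-run the command by hand, a standard shell tool such as watch can poll it for you. This is just a convenience sketch (it assumes a Unix-like shell with watch installed); any way of re-running the get command works:

# Poll the connection details every 5 seconds; press Ctrl-C to stop
# once the actual state reads RUNNING.
watch -n 5 "decodable connection get <connection id>"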

Let’s get some more information about the stream this connection writes to:

decodable stream get <stream id>
# Output:
#
# envoy_raw
#   id                       <stream id>
#   description              A stream of records in envoy format
#   schema
#     0  value                 STRING
#   create time              2021-11-03T16:14:06Z
#   update time              2021-11-03T16:14:06Z

Now that we have some data in this stream, we can write a preview pipeline to take a look. We’ll describe preview pipelines in more detail later, but a simple SELECT query referencing the stream’s name will do the trick:

decodable pipeline preview "SELECT * from envoy_raw"
# Output:
#
# Submitting query... done! (took 8.79s)
# Waiting for results...
# {"value":"[2021-11-05T04:48:03Z] \"GET /products/3 HTTP/1.1\" 500 URX 2001 6345 82 32 \"-\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64)\" \"5c70092a-ed05-4d0c-9d2b-f7bf361ad17e\" \"localhost\" \"192.168.0.11:443\""}
# {"value":"[2021-11-05T04:48:03Z] \"DELETE /users/1 HTTP/2.0\" 200 NC 4044 3860 41 39 \"-\" \"Mozilla/5.0 (Linux; Android 10) \" \"5ca9fd79-afee-44db-9352-2ee9949dc6df\" \"aws.gateway\" \"10.0.0.1\""}
# {"value":"[2021-11-05T04:48:03Z] \"DELETE /products/2 HTTP/2.0\" 500 UH 3826 8831 14 33 \"-\" \"Mozilla/5.0 (Linux; Android 10) \" \"5f0ae73d-c76b-471f-9458-3efc45128509\" \"aws.gateway\" \"10.0.0.1\""}
# {"value":"[2021-11-05T04:48:03Z] \"POST /users/1 HTTP/1.1\" 500 - 8303 6274 0 15 \"-\" \"curl/7.64.1\" \"b45ce679-1cdd-4de8-965f-b9a4d821b2bd\" \"locations\" \"127.0.0.1:8080\""}
# {"value":"[2021-11-05T04:48:03Z] \"PATCH /products/2 HTTP/1.1\" 422 URX 8246 2097 68 84 \"-\" \"Mozilla/5.0 (Linux; Android 10) \" \"2508af73-93c8-4ab8-bac2-8f6aab2b0292\" \"localhost\" \"10.0.0.2\""}
# {"value":"[2021-11-05T04:48:03Z] \"DELETE /products/3 HTTP/2.0\" 400 UF 3386 659 9 83 \"-\" \"Chrome/90.0.4430.212 Safari/537.36\" \"00eeed10-c956-49bf-a56c-a2511cd033fb\" \"auth.default.svc.cluster.local\" \"192.168.0.12\""}
# {"value":"[2021-11-05T04:48:03Z] \"PATCH /users/1 HTTP/2.0\" 422 UO 4730 6425 62 37 \"-\" \"Mozilla/5.0 (Linux; Android 10) \" \"97964510-ef91-40bc-b3c6-d094c10618e4\" \"aws.gateway\" \"192.168.0.11:443\""}
# {"value":"[2021-11-05T04:48:03Z] \"DELETE /products/3 HTTP/1.1\" 422 UO 5857 2137 17 47 \"-\" \"curl/7.64.1\" \"3b539fe4-7b5c-4593-b2d2-13fe621a794d\" \"envoy.app.mesh\" \"10.0.0.2:443\""}
# {"value":"[2021-11-05T04:48:04Z] \"PUT /products/3 HTTP/2.0\" 201 UO 8430 4054 53 51 \"-\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64)\" \"112ea83e-df10-4f13-aae2-f1d651f24221\" \"locations\" \"192.168.0.11:443\""}
# {"value":"[2021-11-05T04:48:04Z] \"DELETE /products/2 HTTP/2.0\" 201 UO 846 3398 98 42 \"-\" \"Mobile Safari/537.36\" \"9bdc6885-628d-4b98-8018-553aa5b12704\" \"locations\" \"10.0.0.1\""}
# Records received:      10
# Time to first record:  26.85s
# Total time:            28.06s

Pipelines

Once we can see our raw data flowing, we can try to apply some structure to it with a pipeline.
A pipeline is a streaming SQL query that processes data from one (or more) input streams and writes the results to an output stream. First, we’ll need to make an output stream for the pipeline:

decodable stream create \
  --name http_events                                                     \
  --description "Parsed Envoy proxy logs from production"                \
  --watermark "timestamp=\`timestamp\` - INTERVAL '0.001' SECOND"        \
  --field timestamp="timestamp(3)"                                       \
  --field method=string                                                  \
  --field original_path=string                                           \
  --field protocol=string                                                \
  --field response_code=int                                              \
  --field response_flags=string                                          \
  --field bytes_recv=int                                                 \
  --field bytes_sent=int                                                 \
  --field duration=int                                                   \
  --field upstream_svc_time=int                                          \
  --field x_forwarded_for=string                                         \
  --field useragent=string                                               \
  --field request_id=string                                              \
  --field authority=string                                               \
  --field upstream_host=string
# Output:
#
# Created stream http_events (06064c97)

The watermark argument specifies that the "timestamp" field represents the event time and that we’ll allow 1 millisecond for late-arriving data.
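
We don’t use the watermark in this tutorial’s pipeline, but defining event time now is what makes event-time windowing possible later. As a rough sketch (assuming Flink SQL’s windowing table-valued functions are available in your account), once http_events has data you could preview a per-minute request count like this:

-- Hypothetical windowed aggregation over http_events (not created in this tutorial).
-- The watermark on `timestamp` is what lets each one-minute window close and emit.
SELECT
  window_start,
  window_end,
  COUNT(*) AS request_count
FROM TABLE(
  TUMBLE(TABLE http_events, DESCRIPTOR(`timestamp`), INTERVAL '1' MINUTE)
)
GROUP BY window_start, window_end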

When we create a pipeline we can supply the SQL inline like we did for our preview earlier, or we can store it in a separate file. Let’s put it in a file called parse_envoy_logs.sql:

-- sql parse_envoy_logs.sql
-- Extract Envoy fields from a map as top level fields and insert them into the
-- http_events stream.
INSERT INTO http_events
SELECT
  TO_TIMESTAMP(CAST(envoy['timestamp'] AS STRING), 'yyyy-MM-dd''T''HH:mm:ss''Z''') AS `timestamp`,
  CAST(envoy['method']            AS STRING) AS `method`,
  CAST(envoy['original_path']     AS STRING) AS original_path,
  CAST(envoy['protocol']          AS STRING) AS protocol,
  CAST(envoy['response_code']     AS INT)    AS response_code,
  CAST(envoy['response_flags']    AS STRING) AS response_flags,
  CAST(envoy['bytes_rcvd']        AS INT)    AS bytes_recv,
  CAST(envoy['bytes_sent']        AS INT)    AS bytes_sent,
  CAST(envoy['duration']          AS INT)    AS duration,
  CAST(envoy['upstream_svc_time'] AS INT)    AS upstream_svc_time,
  CAST(envoy['x_forwarded_for']   AS STRING) AS x_forwarded_for,
  CAST(envoy['useragent']         AS STRING) AS useragent,
  CAST(envoy['request_id']        AS STRING) AS request_id,
  CAST(envoy['authority']         AS STRING) AS authority,
  CAST(envoy['upstream_host']     AS STRING) AS upstream_host
FROM (
    -- Match and parse Envoy records in the value field of the envoy_raw stream.
    -- grok() produces a map<field name, value> we call envoy.
    SELECT
      grok(
        `value`,
        '\[%{TIMESTAMP_ISO8601:timestamp}\] "%{DATA:method} %{DATA:original_path} %{DATA:protocol}" %{DATA:response_code} %{DATA:response_flags} %{NUMBER:bytes_rcvd} %{NUMBER:bytes_sent} %{NUMBER:duration} %{DATA:upstream_svc_time} "%{DATA:x_forwarded_for}" "%{DATA:useragent}" "%{DATA:request_id}" "%{DATA:authority}" "%{DATA:upstream_host}"'
      ) AS envoy
    FROM envoy_raw
)
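
Before wiring this into a pipeline, you can optionally sanity-check the grok() call by previewing just the inner SELECT. The command below uses the same pattern, shell-escaped for the CLI (note the escaped backticks, just like in the stream-creation command above); each record should come back with the parsed fields in the envoy map:

decodable pipeline preview "SELECT grok(\`value\`, '\[%{TIMESTAMP_ISO8601:timestamp}\] \"%{DATA:method} %{DATA:original_path} %{DATA:protocol}\" %{DATA:response_code} %{DATA:response_flags} %{NUMBER:bytes_rcvd} %{NUMBER:bytes_sent} %{NUMBER:duration} %{DATA:upstream_svc_time} \"%{DATA:x_forwarded_for}\" \"%{DATA:useragent}\" \"%{DATA:request_id}\" \"%{DATA:authority}\" \"%{DATA:upstream_host}\"') AS envoy FROM envoy_raw"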

The create command normally takes the SQL statement as an argument, but we can replace it with - (a single dash), which causes the command to read the SQL from standard input. We’ll use this feature to read the SQL from the file we just created. Feel free to use whichever approach works best for you!

decodable pipeline create --name parse_envoy_logs             \
  --description "Parse and structure Envoy logs for analysis" \
  - < parse_envoy_logs.sql
# Output:
#
# Created pipeline parse_envoy_logs (f2de95de)

Using the pipeline id returned, we can get the pipeline definition back from Decodable to make sure it looks right before we activate it.

decodable pipeline get <pipeline id>
# Output:
#
# parse_envoy_logs
#  id                       <pipeline id>
#  version                  1
#  is latest                true
#  target state             STOPPED
#  actual state             STOPPED
#  description              Parse and structure Envoy logs for analysis
#  create time              2021-08-06 20:03:41.42 +0000 +0000
#
# -- Extract Envoy fields from a map as top level fields and insert them into the
# -- http_events stream.
# INSERT INTO http_events
# SELECT
#   TO_TIMESTAMP(CAST(envoy['timestamp'] AS STRING), 'yyyy-MM-dd''T''HH:mm:ss''Z''') AS `timestamp`,
#   CAST(envoy['method']            AS STRING) AS `method`,
#   CAST(envoy['original_path']     AS STRING) AS original_path,
#   CAST(envoy['protocol']          AS STRING) AS `protocol`,
#   CAST(envoy['response_code']     AS INT)    AS response_code,
#   CAST(envoy['response_flags']    AS STRING) AS response_flags,
#   CAST(envoy['bytes_rcvd']        AS INT)    AS bytes_recv,
#   CAST(envoy['bytes_sent']        AS INT)    AS bytes_sent,
#   CAST(envoy['duration']          AS INT)    AS `duration`,
#   CAST(envoy['upstream_svc_time'] AS INT)    AS upstream_svc_time,
#   CAST(envoy['x_forwarded_for']   AS STRING) AS x_forwarded_for,
#   CAST(envoy['useragent']         AS STRING) AS useragent,
#   CAST(envoy['request_id']        AS STRING) AS request_id,
#   CAST(envoy['authority']         AS STRING) AS authority,
#   CAST(envoy['upstream_host']     AS STRING) AS upstream_host
# FROM (
#     -- Match and parse Envoy records in the value field of the envoy_raw stream.
#     -- grok() produces a map<field name, value> we call envoy.
#     SELECT
#       grok(
#         `value`,
#         '\[%{TIMESTAMP_ISO8601:timestamp}\] "%{DATA:method} %{DATA:original_path} %{DATA:protocol}" %{DATA:response_code} %{DATA:response_flags} %{NUMBER:bytes_rcvd} %{NUMBER:bytes_sent} %{NUMBER:duration} %{DATA:upstream_svc_time} "%{DATA:x_forwarded_for}" "%{DATA:useragent}" "%{DATA:request_id}" "%{DATA:authority}" "%{DATA:upstream_host}"'
#       ) AS envoy
#     FROM envoy_raw
# )

Activate the pipeline

Like connections, pipelines must be activated in order to start the flow of data. Let’s activate our pipeline:

decodable pipeline activate <pipeline id>
# Output:
#
# id         version    target state
# <id>       1          RUNNING

Pipeline activation involves provisioning fault-tolerant infrastructure within the Decodable platform and can take up to 30 seconds before data starts being processed. Feel free to run decodable pipeline get a few times until you see the actual state move to 'RUNNING'.
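
If you only care about the state, piping the output through grep keeps the check short (standard shell tools, nothing Decodable-specific):

# Show just the target/actual state lines from the pipeline details.
decodable pipeline get <pipeline id> | grep state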

See your data

Next, let’s preview our pipeline to make sure the SQL is processing the data as expected.

decodable pipeline preview "select * from http_events"

Previews show you how your pipeline changes the incoming data. They run until 30 records have been returned or 2 minutes have passed, whichever comes first.

Submit any SQL using the same grammar as you would for a pipeline; the results are printed to your terminal instead of being written to a stream. For example, here we select the parsed HTTP events from the http_events stream our pipeline writes to.

Previews are short-lived, lighter weight, and more responsive than full pipelines, making them perfect for iterative development on pipelines or just to see the structure of your input and output streams.
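
For example, while iterating you might narrow a preview to just the records you care about. This sketch uses the http_events schema we defined above to pull out only the server errors:

decodable pipeline preview "SELECT * FROM http_events WHERE response_code >= 500"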

Cleanup

Connections and pipelines both consume resources while they’re active. If you’re not using them, it’s best to deactivate them.

Use the decodable connection deactivate command to deactivate the Datagen connection:

decodable connection deactivate <connection id>
# Output:
#
# datagen_envoy_connection
#   id                       <connection id>
#   description              A source connection that generates envoy-style logs
#   connector                datagen
#   type                     source
#   stream id                <stream id>
#   schema
#     0  value                 STRING
#   properties
#     data.type                envoy
#   target state             STOPPED
#   actual state             STOPPED
#   create time              2021-11-03T16:14:06Z
#   update time              2021-11-05T04:16:36Z

Similarly, use decodable pipeline deactivate to deactivate the pipeline:

decodable pipeline deactivate <pipeline id>
# Output:
#
# id         version    target state
# <id>       1          STOPPED
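
Finally, you can confirm nothing is left running by listing your resources again and checking their state (decodable pipeline list is assumed here to mirror decodable connection list):

decodable connection list
decodable pipeline list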