Using the Decodable CLI tutorial This guide steps you through an end-to-end example of how to use Decodable to parse and structure Envoy logs in real-time. It should give you enough exposure to the platform that you can get rolling with a wide array of use cases in short order. For your convenience, we’ve created some resources in your account for use with this guide. We’ll use the CLI in this guide, but you can follow along in the app if you prefer. Connections Connections are reusable connections to your data infrastructure. They pipe data from an external system to a Decodable stream or vice versa. Let’s list the connections in your account to see what has been pre-created for you: decodable connection list # Output: # # id name connector type state create time update time # <connection id> datagen_envoy_connection datagen source STOPPED 2024-03-05T13:01:34Z 2024-03-05T14:06:54Z The connection we see here uses the Datagen connector, which generates test data. Let’s get some more information about it: decodable connection get <connection id> # Output: # # datagen_envoy_connection # id <connection id> # description A source connection that generates envoy-style logs # connector datagen # type source # stream id <stream id> # fields # 0 value STRING # primary key fields - # properties # data.type envoy # target state STOPPED # actual state STOPPED # requested tasks 1 # actual tasks 0 # requested task size M # actual task size M # create time 2021-11-03T16:14:06Z # update time 2021-11-05T04:08:41Z # last runtime error - # metrics - Here we see that the type is source, meaning this connector feeds new data from an external system into a stream. Let’s try activating it to get some data flowing: decodable connection activate <connection id> # Output: # # datagen_envoy_connection # id <connection id> # description A source connection that generates envoy-style logs # connector datagen # type source # stream id <stream id> # fields # 0 value STRING # primary key fields - # properties # data.type envoy # target state RUNNING # actual state STARTING # requested tasks 1 # actual tasks 1 # requested task size M # actual task size M # create time 2021-11-03T16:14:06Z # update time 2021-11-05T04:16:36Z # last runtime error - # metrics - If you wait a moment and run a decodable connection get <connection id> on that connection, you should see the actual state move from STARTING to RUNNING. Let’s get some more information about the stream that we just output to: decodable stream get <stream id> # Output: # # envoy_raw # id <stream id> # description A stream of records in envoy format # type APPEND # fields # 0 value STRING # partition key fields - # primary key fields - # watermarks - # properties # compaction.enable false # partition.count 1 # properties.compression.type zstd # create time 2021-11-03T16:14:06Z # update time 2021-11-03T16:14:06Z Now that we’ve some data in this stream, we can write a preview pipeline to take a look. We’ll describe preview pipelines in more detail later, but a simple SELECT query referencing the stream’s name envoy_raw will do the trick for now. By default, it will retrieve up to 30 records from the stream. decodable pipeline preview "SELECT * from envoy_raw" # Output: # Submitting query... done! (took 3.74s) # Waiting for results... # {"value":"[2025-08-13T12:47:31Z] \"GET / HTTP/2.0\" 200 NC 4570 1499 95 38 \"-\" \"AppleWebKit/537.36 (KHTML, like Gecko)\" \"8434e166-4448-408d-b9e8-6967ce25ec92\" \"envoy.app.mesh\" \"192.168.1.12:3000\""} # {"value":"[2025-08-13T12:47:31Z] \"DELETE / HTTP/2.0\" 400 NC 5325 4807 53 64 \"-\" \"AppleWebKit/537.36 (KHTML, like Gecko)\" \"dc1acb2c-e79e-4be1-b407-b3e750a85e7a\" \"localhost\" \"10.0.0.2\""} # {"value":"[2025-08-13T12:47:32Z] \"POST /products/3 HTTP/1.1\" 400 NR 5170 1098 17 92 \"-\" \"Mobile Safari/537.36\" \"344d2413-5e9c-4e01-ba78-bdf3cf7dc16a\" \"auth.default.svc.cluster.local\" \"10.0.0.2\""} # [...] # {"value":"[2025-08-13T12:47:44Z] \"GET / HTTP/1.1\" 200 URX 4487 6780 48 0 \"-\" \"AppleWebKit/537.36 (KHTML, like Gecko)\" \"804805eb-36e2-4470-93b6-e1f8d86a2860\" \"envoy.app.mesh\" \"192.168.1.12:3000\""} # {"value":"[2025-08-13T12:47:45Z] \"POST /products/3 HTTP/2.0\" 422 UH 6614 584 64 17 \"-\" \"Mobile Safari/537.36\" \"8ab10b43-cd7b-45f9-a4d5-637901427e6b\" \"aws.gateway\" \"192.168.0.11:443\""} # {"value":"[2025-08-13T12:47:45Z] \"POST /products/2 HTTP/1.1\" 422 NC 2682 8774 56 45 \"-\" \"AppleWebKit/537.36 (KHTML, like Gecko)\" \"4ce96390-c018-40ad-aec6-82eca8888e18\" \"envoy.app.mesh\" \"192.168.0.11:443\""} # Records received: 30 # Time to first record: 6.40s # Total time: 20.52s Pipelines Once we can see our raw data flowing, we can try to apply some structure to it with a pipeline. A pipeline is a streaming SQL query that processes data from one (or more) input streams and writes the results to an output stream. First, we’ll need to make an output stream for the pipeline: decodable stream create \ --name http_events \ --description "Parsed Envoy proxy logs from production" \ --watermark "timestamp=\`timestamp\` - INTERVAL '0.001' SECOND" \ --field timestamp="timestamp(3)" \ --field method=string \ --field original_path=string \ --field protocol=string \ --field response_code=int \ --field response_flags=string \ --field bytes_recv=int \ --field bytes_sent=int \ --field duration=int \ --field upstream_svc_time=int \ --field x_forwarded_for=string \ --field useragent=string \ --field request_id=string \ --field authority=string \ --field upstream_host=string # Output: # # Created stream http_events (<stream id>) The watermark argument specifies that the "timestamp" field represents the event time and we’ll allow 1 millisecond for late arriving data. When we create a pipeline we can supply the SQL inline like we did for our preview earlier, or we can store it in a separate file. Let’s put it in a file called parse_envoy_logs.sql: -- sql parse_envoy_logs.sql -- Extract Envoy fields from a map as top level fields and insert them into the -- http_events stream. INSERT INTO http_events SELECT TO_TIMESTAMP(CAST(envoy['timestamp'] AS STRING), 'yyyy-MM-dd''T''HH:mm:ss''Z''') AS `timestamp`, CAST(envoy['method'] AS STRING) AS `method`, CAST(envoy['original_path'] AS STRING) AS original_path, CAST(envoy['protocol'] AS STRING) AS protocol, CAST(envoy['response_code'] AS INT) AS response_code, CAST(envoy['response_flags'] AS STRING) AS response_flags, CAST(envoy['bytes_rcvd'] AS INT) AS bytes_rcvd, CAST(envoy['bytes_sent'] AS INT) AS bytes_sent, CAST(envoy['duration'] AS INT) AS duration, CAST(envoy['upstream_svc_time'] AS INT) AS upstream_svc_time, CAST(envoy['x_forwarded_for'] AS STRING) AS x_forwarded_for, CAST(envoy['useragent'] AS STRING) AS useragent, CAST(envoy['request_id'] AS STRING) AS request_id, CAST(envoy['authority'] AS STRING) AS authority, CAST(envoy['upstream_host'] AS STRING) AS upstream_host FROM ( -- Match and parse Envoy records in the value field of the envoy_raw stream. -- grok() produces a map<field name, value> we call envoy. SELECT grok( `value`, '\[%{TIMESTAMP_ISO8601:timestamp}\] "%{DATA:method} %{DATA:original_path} %{DATA:protocol}" %{DATA:response_code} %{DATA:response_flags} %{NUMBER:bytes_rcvd} %{NUMBER:bytes_sent} %{NUMBER:duration} %{DATA:upstream_svc_time} "%{DATA:x_forwarded_for}" "%{DATA:useragent}" "%{DATA:request_id}" "%{DATA:authority}" "%{DATA:upstream_host}"' ) AS envoy FROM envoy_raw ) The create command normally takes the SQL statement as an argument, but we can replace it with - (a single dash) which causes the command to read from standard input. We use this feature to read SQL from the file we just created. Feel free to use whatever works best for you! decodable pipeline create --name parse_envoy_logs \ --description "Parse and structure Envoy logs for analysis" \ - < parse_envoy_logs.sql # Output: # # Created pipeline parse_envoy_logs (<pipeline id>) Using the pipeline id returned, we can get the pipeline definition back from Decodable to make sure it looks right before we activate it. decodable pipeline get <pipeline id> # Output: # # parse_envoy_logs # id <pipeline id> # type SQL # version 1 # active version - # latest version 1 # is latest true # target state STOPPED # actual state STOPPED # requested tasks 1 # actual tasks 0 # requested task size - # actual task size - # description Parse and structure Envoy logs for analysis # create time 2025-08-13T13:08:50Z # update time 2025-08-13T13:08:50Z # last activated time - # last runtime error - # input metrics - # output metrics - # properties - # scheduled snapshots # enabled false # # -- sql parse_envoy_logs.sql # -- Extract Envoy fields from a map as top level fields and insert them into the # -- http_events stream. # INSERT INTO http_events # SELECT # TO_TIMESTAMP(CAST(envoy['timestamp'] AS STRING), 'yyyy-MM-dd''T''HH:mm:ss''Z''') AS `timestamp`, # CAST(envoy['method'] AS STRING) AS `method`, # CAST(envoy['original_path'] AS STRING) AS original_path, # CAST(envoy['protocol'] AS STRING) AS protocol, # CAST(envoy['response_code'] AS INT) AS response_code, # CAST(envoy['response_flags'] AS STRING) AS response_flags, # CAST(envoy['bytes_rcvd'] AS INT) AS bytes_rcvd, # CAST(envoy['bytes_sent'] AS INT) AS bytes_sent, # CAST(envoy['duration'] AS INT) AS duration, # CAST(envoy['upstream_svc_time'] AS INT) AS upstream_svc_time, # CAST(envoy['x_forwarded_for'] AS STRING) AS x_forwarded_for, # CAST(envoy['useragent'] AS STRING) AS useragent, # CAST(envoy['request_id'] AS STRING) AS request_id, # CAST(envoy['authority'] AS STRING) AS authority, # CAST(envoy['upstream_host'] AS STRING) AS upstream_host # FROM ( # -- Match and parse Envoy records in the value field of the envoy_raw stream. # -- grok() produces a map<field name, value> we call envoy. # SELECT # grok( # `value`, # '\[%{TIMESTAMP_ISO8601:timestamp}\] "%{DATA:method} %{DATA:original_path} %{DATA:protocol}" %{DATA:response_code} %{DATA:response_flags} %{NUMBER:bytes_rcvd} %{NUMBER:bytes_sent} %{NUMBER:duration} %{DATA:upstream_svc_time} "%{DATA:x_forwarded_for}" "%{DATA:useragent}" "%{DATA:request_id}" "%{DATA:authority}" "%{DATA:upstream_host}"' # ) AS envoy # FROM envoy_raw # ) Activate the Pipeline Like connections, pipelines must be activated in order to start the flow of data. Let’s activate our pipeline: decodable pipeline activate <pipeline id> # Output: # # id version target state # <pipeline id> 1 RUNNING Pipeline activation involves provisioning fault-tolerant infrastructure within the Decodable platform and can take up to 1 or 2 minutes to begin processing data. Feel free to make a few decodable pipeline get <pipeline id> calls until you see the actual state move to RUNNING. See your data Next, let’s preview our pipeline to make sure the SQL is processing the data as expected. decodable pipeline preview "select * from http_events" # Output: # Submitting query... done! (took 2.76s) # Waiting for results... # {"authority":"envoy.app.mesh","bytes_recv":7013,"bytes_sent":8291,"duration":2,"method":"PUT","original_path":"/","protocol":"HTTP/2.0","request_id":"2a5ba6a8-2ca0-4a2f-9f18-d68f34d25528","response_code":400,"response_flags":"URX","timestamp":"2025-08-13 13:27:48","upstream_host":"10.0.0.2","upstream_svc_time":44,"useragent":"AppleWebKit/537.36 (KHTML, like Gecko)","x_forwarded_for":"-"} # {"authority":"envoy.app.mesh","bytes_recv":7617,"bytes_sent":9192,"duration":62,"method":"PATCH","original_path":"/products/3","protocol":"HTTP/2.0","request_id":"a3dcdb49-342c-4fe0-8e92-899e015c9431","response_code":201,"response_flags":"UO","timestamp":"2025-08-13 13:27:49","upstream_host":"10.0.0.1","upstream_svc_time":76,"useragent":"Mobile Safari/537.36","x_forwarded_for":"-"} # {"authority":"aws.gateway","bytes_recv":1927,"bytes_sent":9348,"duration":66,"method":"PATCH","original_path":"/users/1","protocol":"HTTP/1.1","request_id":"5ac98a08-2408-4f18-ae63-f04ff050c3b1","response_code":422,"response_flags":"-","timestamp":"2025-08-13 13:27:49","upstream_host":"127.0.0.1:8080","upstream_svc_time":64,"useragent":"Mobile Safari/537.36","x_forwarded_for":"-"} # [...] # {"authority":"envoy.app.mesh","bytes_recv":2055,"bytes_sent":8447,"duration":82,"method":"PUT","original_path":"/products/3","protocol":"HTTP/2.0","request_id":"f4f73823-f2d6-4c6f-b69d-825c73b56297","response_code":422,"response_flags":"UO","timestamp":"2025-08-13 13:28:02","upstream_host":"192.168.0.11:443","upstream_svc_time":26,"useragent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64)","x_forwarded_for":"-"} # {"authority":"localhost","bytes_recv":7648,"bytes_sent":3261,"duration":74,"method":"DELETE","original_path":"/","protocol":"HTTP/1.1","request_id":"dadad70b-3442-49b2-9547-137888467115","response_code":201,"response_flags":"URX","timestamp":"2025-08-13 13:28:02","upstream_host":"10.0.0.2:443","upstream_svc_time":62,"useragent":"Chrome/90.0.4430.212 Safari/537.36","x_forwarded_for":"-"} # {"authority":"localhost","bytes_recv":6114,"bytes_sent":4555,"duration":4,"method":"PUT","original_path":"/products/2","protocol":"HTTP/2.0","request_id":"1dc37492-14de-4128-8fe4-aaff0f900e32","response_code":201,"response_flags":"NR","timestamp":"2025-08-13 13:28:03","upstream_host":"192.168.0.11:443","upstream_svc_time":27,"useragent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64)","x_forwarded_for":"-"} # Records received: 30 # Time to first record: 11.74s # Total time: 22.46s Previews display up to 30 records from the data stream and can be used to see how your pipeline changes the incoming data. Previews run until 30 records are seen or until 2 minutes have passed, whichever comes first. Submit any SQL using the same grammar as you would for a pipeline, but the output will be sent to your command line output instead. For example, here we select the HTTP events from the stream our connection pointed at before. Previews are short-lived, lighter weight, and more responsive than full pipelines, making them a better fit for iterative development on pipelines or just to see sample records showing the structure of your input and output streams' data. Cleanup Active connections and pipelines both consume resources while they’re running. If you’re not using them, it’s best to deactivate them. Use the decodable connection deactivate command to deactivate the Datagen connection: decodable connection deactivate <connection id> # Output: # # datagen_envoy_connection # id <connection id> # description A source connection that generates envoy-style logs # connector datagen # type source # stream id <stream id> # fields # 0 value STRING # primary key fields - # properties # data.type envoy # target state STOPPED # actual state STOPPING # requested tasks 1 # actual tasks 1 # requested task size M # actual task size M # create time 2024-01-11T07:21:30Z # update time 2024-01-11T07:21:30Z # last runtime error - # metrics - Similarly, use decodable pipeline deactivate to deactivate the pipeline: decodable pipeline deactivate <pipeline id> # Output: # # id version target state # <pipeline id> 1 RUNNING