Create a curated stream of data for other teams

This guide walks you through creating curated streams of data for different teams within your organization to use. Decodable includes Role-Based Access Control (RBAC) capabilities that allow you to control access to Decodable resources based on predefined roles. As an account administrator, you have control over who has access to particular types of data within Decodable and the level of functionality that an individual Decodable user has.

This guide uses Apache Kafka as the data source and Amazon S3 as the data sink, but you can substitute your platforms of choice. For example, you can use a different event streaming platform such as Redpanda, Apache Pulsar, AWS Kinesis, Confluent Cloud, or others. Similarly, you can use a different sink such as Delta Lake, ClickHouse, or Apache Pinot.

You can find a full list of available connectors here, or in the navigation menu on the left.

We’ll ingest data from Apache Kafka, sanitize it, and send it to Amazon S3. We’ll use role-based access control to create one group with the roles and permissions to modify the flow of data from Apache Kafka to Amazon S3, and another group that has only the role and permissions to view it.

Let’s go! 🚀

Prerequisites

Before you get started, here are a few pieces of information you’ll need to have handy.

  • Information about the Kafka brokers that you want to receive data from. This includes the bootstrap brokers, authentication method supported by your deployment, and the authentication credentials required to connect.

  • The names of the topics that you want to ingest data from.

  • Information about the Amazon S3 bucket that you want to send data to. This includes the path to the S3 bucket, the ARN of the IAM role that has the policy necessary for accessing the bucket, and the region that the bucket is located in.

You will also need to have the following configured.

  • You must have allowed connections from Decodable’s IPs so that Decodable can connect to your sources and sinks.

  • You must have an IAM user or role with an identity-based policy that allows Decodable to access the S3 bucket. See the Amazon S3 connector documentation for more information.
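
For reference, here’s a minimal sketch of what such an identity-based policy might look like. The bucket name is a placeholder, and the authoritative list of required actions is in the Amazon S3 connector documentation, so treat this as an illustration only:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": [
            "s3:PutObject",
            "s3:GetBucketLocation",
            "s3:ListBucket",
            "s3:AbortMultipartUpload"
          ],
          "Resource": [
            "arn:aws:s3:::my-example-bucket",
            "arn:aws:s3:::my-example-bucket/*"
          ]
        }
      ]
    }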

Guide

Step 1: Create and start a source connection

Go to Connections and select New Connection. Connections allow data to flow between external data sources or sinks and Decodable streams.

  1. Configure the connection

    Select Apache Kafka (or your system of choice). Then, choose Source as the connection type and provide the required connection information such as the list of bootstrap brokers, authentication method, credentials, and any other optional connection parameters.

    Depending on the security protocol that you want to use to connect to Apache Kafka, you may need to provide additional information such as your certificates and keys or your SASL username and password.

    The Value Format parameter controls how Decodable deserializes data from the Kafka topic. Decodable supports popular serialization formats like JSON, Avro, and Debezium (JSON) schemas. Make sure to select the format that matches your data.

    If Decodable encounters data in a format other than what’s configured, it follows the Parse Error Policy you select. Selecting Fail causes Decodable to fail the connection and stop processing until the issue is corrected, while Ignore causes the system to skip and discard the malformed record.

    By default, Kafka connections operate in exactly once delivery mode in Decodable. For more information about the other Apache Kafka options, see Apache Kafka.
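
    To illustrate, here’s the kind of information you might gather before configuring the connection. All of the values below are hypothetical, and the field labels in the connection form may differ slightly:

    Bootstrap servers:  broker-1.example.com:9092,broker-2.example.com:9092
    Security protocol:  SASL_SSL
    SASL mechanism:     SCRAM-SHA-256
    SASL username:      decodable-ingest
    SASL password:      (reference a Decodable secret rather than pasting the value)
    Topic:              orders
    Value format:       JSON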

    Select Next.

  2. Choose or create a stream

    Streams transport records within Decodable. You can select a preexisting stream to which data should be written, or you can create a new stream.

    If you’re creating a new stream, give it a name. Names must be unique within an account and can contain letters, numbers, and underscores ([a-z0-9_]+). This is how you’ll refer to your data in SQL queries and custom pipelines written in Java. Best practice is to give streams a description that helps you and your team remember what data they contain and how they should be used.

    Next, you must define the schema of the stream. The stream’s schema must match the schema of the data in the Kafka topic. This is how Decodable validates and enables processing of data. The schema can be as simple as a single text field or can fully describe a complex nested record. All records in a stream must have the same schema. If you already have a schema defined in JSON Schema or Apache Avro format, you can upload it to Decodable.
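
    For example, if you plan to follow along with the order events used later in this guide, an Apache Avro schema for the stream might look like the following. The field names and types are hypothetical; adjust them to match your actual Kafka records:

    {
      "type": "record",
      "name": "order",
      "fields": [
        { "name": "order_id",    "type": "string" },
        { "name": "status",      "type": "string" },
        { "name": "total",       "type": "double" },
        { "name": "line_items",  "type": { "type": "array", "items": "string" } },
        { "name": "address",     "type": "string" },
        { "name": "postal_code", "type": "string" },
        { "name": "country",     "type": "string" },
        { "name": "cc_number",   "type": "string" }
      ]
    }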

    To learn more about streams, see About Streams for an overview on the types of streams available, About Change Data Capture and Change Streams for information about change streams, and Create a Stream for information about creating streams, including how to specify event time and watermark information as well as how to use computed or metadata fields.

  3. Name your connection

    Give your connection a meaningful name and description for future reference.

    Selecting Create Connection performs a connectivity test. If Decodable successfully connects to your Kafka brokers, a connection is created and the Connection Overview page opens.

    If Decodable is unable to validate the connection, an error is shown with more information. The most common type of error is a network connectivity error, which occurs when Decodable can’t reach your Kafka brokers because a firewall or security group prevents access. This often results in a timeout while trying to connect. Make sure you’ve allowed connections from Decodable’s IPs.

  4. Start your new connection

    Once your connection is created, you can start it by selecting Start. You choose the maximum task size and the maximum number of tasks when activating a connector or pipeline. A task is a compute unit that includes CPU, memory, network bandwidth, and local storage. Most connectors and all pipelines support scale-out parallel execution. The number of tasks controls the maximum parallelism of a connection or pipeline. A medium task is sufficient for most connections and should provide between 1,000 and 30,000 records per second depending on record size, serialization format, and similar factors.

    Activation can take about a minute while Decodable allocates resources. Your connection should display Starting, and eventually transition to Running. If there’s data on the topic, you should see output record counts and bytes metrics begin to increase. Note that metrics are reset upon each activation. Stopping and restarting a connection resets metrics back to zero.

    If you don’t see metrics increasing, make sure that the connection has the status Running.

    You can follow the lineage trail by selecting Outbound to 1 stream - your stream name to preview data in the source connection’s output stream. This is the stream you selected or created in step 1.2. Stream preview is updated whenever you navigate to a stream, but it can be manually refreshed by selecting Run Preview.

    If you don’t see data in your output stream:

    • Check that your connection has the status Running.

    • Try selecting Run Preview.

Step 2: Process your data (Optional)

Now that your data is flowing into a stream, you can optionally add processing by creating a pipeline, either in SQL or in Java. For this guide, we’ll filter our data using a standard where clause in SQL. The exact fields you can reference are governed by the schema of the stream you created earlier.

  1. Create a pipeline

    Select Pipelines, and then New Pipeline. If you are participating in the public tech preview for custom pipelines, which allows you to write pipelines in Java and other JVM-based languages using the full power of the Apache Flink APIs, you can select between SQL and a custom pipeline. For now, select SQL.

    Choose your Input Stream. This should be the stream you created or selected in step 1.2. You can always change your mind later while editing the pipeline SQL.

  2. Write your SQL

    In the SQL editor you can browse all of your existing streams in the Data Catalog panel, edit SQL, and preview the results of your SQL query.

    SQL pipelines are always insert into … select … from … queries. The insert into clause specifies the pipeline’s output stream, while the select … from … specifies the input streams and any transformations. Decodable automatically generates an output stream name for you (for example stream_<digits>), but you’re encouraged to change it to something meaningful to you. Decodable supports standard SQL including joins, windowed and global aggregation, Common Table Expressions (CTE), and even pattern detection with match_recognize.

    For example, let’s assume that our data contained order events. We can filter our data by configuring the following pipeline.

    insert into failed_orders -- Our output stream will be failed_orders.
    select *                  -- Select all fields, no transformation.
    from orders               -- Our input stream.
    where status = 'failed'   -- Filter events for failed orders.

    If needed, we can also keep only selected fields and mask sensitive data.

    insert into masked_failed_orders
    select
       order_id,
       total,
       line_items,
       -- address,   Omit the address field.
       postal_code,
       country,
    
       -- Mask card digits except the first 1 and the last 4.
       overlay(cc_number placing '***********' from 2 for 11) as card_masked
    from orders
    where status = 'failed'

    See the Function Reference to learn more about supported SQL functions. If you’re interested in windowed aggregations, those are supported too. There are also a number of SQL pipeline examples to help get you started with common patterns.
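
    As a sketch of what a windowed aggregation could look like, the following query counts failed orders per minute. It assumes the orders stream has an event-time timestamp field named order_time with a watermark defined, which isn’t part of the example above:

    insert into failed_orders_per_minute
    select
       window_start,
       window_end,
       count(*) as failed_count   -- Number of failed orders in each one-minute window.
    from table(
       tumble(table orders, descriptor(order_time), interval '1' minute)
    )
    where status = 'failed'
    group by window_start, window_end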

    Next, let’s preview our pipeline to make sure the SQL is processing the data as expected. Previews display up to 30 records from the data stream and can be used to see how your pipeline changes the incoming data. Previews run until 30 records have been seen or until 2 minutes have passed, whichever comes first.

    Start the preview by selecting Run Preview. You can optionally run the preview from the earliest position in the input streams or the latest position. By default, preview runs from the latest position.

    Not seeing preview output? Here are a few things to check:

    • Make sure your input stream contains data.

    • If the input stream isn’t currently receiving data, select the Earliest position to run a preview over data that’s already in the stream.

    Once your data looks correct, select Next to continue.

  3. Create an output stream or validate an existing output stream’s schema

    If you’ve specified an output stream that doesn’t exist, Decodable prompts you to create a new stream. The stream’s schema is automatically inferred from the SQL query, including field names and their types. The stream’s name is defined by the SQL query, so it isn’t editable here. As always, you should give your stream a meaningful description.

    Once everything looks correct, select Create Stream.

    Alternatively, if you specified a stream that already exists as an output stream in your SQL query, the stream’s schema must match the output of the select … from … portion of the query. You’ll get an error if this isn’t the case.

  4. Name your pipeline

    Finally, just like our connection, give the new pipeline a name and description.

    Select Create Pipeline to complete the process.

  5. Start your pipeline

    Like connections, pipelines must be started in order to begin processing data.

    Select Start to activate your pipeline. Select the appropriate maximum task size and maximum task count just as we did for our connection. Like preview, select whether the pipeline should start from the earliest or latest position in the input stream.

    Pipeline metrics work identically to connection metrics with a few noteworthy exceptions:

    • Both input and output metrics exist for pipelines. This can help you understand whether a pipeline is accidentally filtering out all records, whether it’s keeping up with the rate of input data, and other common operational concerns.

    • An additional metric displays lag: the number of records that are ready for processing but haven’t yet been processed. In most cases, this metric should be zero, which means the pipeline is keeping up with the rate of new data on its input stream. Since input streams are partitioned, this metric reflects the maximum lag across all partitions, not the sum of lag over all partitions. For example, if three partitions have lags of 0, 5, and 120 records, the reported lag is 120.

    Select Output to 1 stream in the lineage bar to navigate to the pipeline’s output stream and confirm that your pipeline is processing and sending data.

Step 3: Create a sink connection

The final step is to create a connection to the data sink that you’d like to send data to. Connections made to sinks are also called sink connections. Here, we’ll use Amazon S3, but the process is the same for other systems.

At this point, make sure that you’ve completed the prerequisites for using the Amazon S3 Connector as described in the Prerequisites section.

We can create our sink connection the same way we did with our source: by selecting Connections and New Connection. But, we can also use the lineage bar while viewing a stream.

Select the ellipsis (…) on the right-hand (outbound) side of the lineage bar and select Create a Sink Connection. This creates a connection that uses the selected stream as its input.

Find the Amazon S3 connector and select Connect.

  1. Configure required connection information

    Complete the required fields for the connection to Amazon S3. This includes the path to the S3 bucket where data should be written, the AWS ARN of the IAM role with sufficient permissions to access the bucket, and the region that your bucket is in.
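
    For example, the values might look something like the following. These are placeholders only; use your own bucket path, role ARN, and region:

    S3 path:       s3://my-example-bucket/failed-orders/
    IAM role ARN:  arn:aws:iam::123456789012:role/decodable-s3-access
    Region:        us-east-1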

    For more information about the Amazon S3 Connector, including how Decodable constructs object key names in Amazon S3, see the Amazon S3 connector documentation.

    Select Next to validate the connection parameters.

  2. Start the connection

    Select Start to activate the sink connection. Choose the appropriate maximum task size and maximum number of tasks for your data volume.

  3. Create a group with a read-only role

    Go to Access Control and select Roles. Roles define the permissions that the users of a specific group have. As a Decodable administrator, you can assign roles to any group in your account.

  4. Create a new role with read-only permissions

    Select New Role, and give your role a descriptive name like read-only-role. Then, select Create role.

    Next, you must assign some permissions to the role. Permissions are represented as permission strings, which have the following pattern: <type>:<action>:<qualified-name>. These permission placeholders are defined as follows:

    • Type: The resource type that you want to set permissions for. Must be one of the following: account, connection, secret, stream, pipeline, group, role, user, or *.

    • Action: The operation that you want to grant permissions for. Must be one of the following: read, write, delete, activate, or *.

    • Qualified name: The name of the specific object that you want to set permissions for. The qualified name for the connection, stream, or pipeline resource types must be prefixed with default. For example, if you want to grant write permissions to any stream whose name begins with marketing, then the permission string looks like: stream:write:default.marketing.

    Select Permissions and enter *:read:* for the permission string, which grants read access to every resource type. Then, select the plus (+) icon to add the permission to the role.
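
    For comparison, here are a few more hypothetical permission strings; the text after each string is a description, not part of the string. The first is the read-only permission used above, and the rest sketch what a role for the team managing this Kafka-to-S3 flow might contain, using illustrative resource names:

    *:read:*                                read access to every resource in the account
    connection:*:default.kafka_orders       full control of the Kafka source connection
    stream:*:default.orders                 full control of the orders stream
    pipeline:*:default.failed_orders        full control of the filtering pipeline
    connection:*:default.s3_failed_orders   full control of the S3 sink connection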

    Now, we’ll need to create a group to assign this role to. A group is a collection of users who have been grouped together for administrative or security purposes. We’ll create a group and assign it the read-only role that you created earlier. Then, you can add to this group any users who need read-only access to Decodable but don’t need to make any updates themselves. For example, this can be a group of users who need access to Decodable to monitor that their data is making it into the sink.

  5. Assign the role to the group

    Select New Group, and give your group a descriptive name like read-only-analysts. Then, select Create group.

    After you create the group, you can add users to it and assign roles to it. If you haven’t already invited the users you want to add to the account, select Account Management > Invite User to send an invitation to join your account. Once a user has accepted, you can add them to the group. When you invite users to join the account, they’re automatically added to the new-users group, which has full read and write permissions to the account. Make sure that any users who only need read access are removed from this group.

    When you are done adding users to the group, select Roles and assign the role that you created previously to the group.

Step 4: Celebrate! 🥳 🥳

You’re now ingesting sanitized data into Amazon S3 (or your sink of choice) in real time. You’ve also set up a new group with read-only access, so you can manage levels of access to Decodable across your organization.

Here are a few things to do next: