How To: Set up TLS encryption with Apache Kafka

To secure the connections between Decodable services and your Apache Kafka brokers, you can configure Transport Layer Security (TLS) to encrypt the connection. This ensures that the data between the Decodable services and Kafka brokers is encrypted. In this guide, we’ll walk you through how.

You can also configure TLS for client authentication, which is usually known as 2-way authentication or mutual TLS. This will be covered in the next topic.

As a prerequisite, your Kafka brokers need to be configured to accept TLS connections (ref). In the context of Kafka TLS connections, Decodable services are the TLS client and the Kafka brokers are the server.

To use TLS encryption, the client (Decodable services) holds the server’s (Kafka brokers) public certificate. Data from the server is encrypted using the server’s private key and the client can decrypt it using the public certificate. Data from the client is encrypted using the public certificate and can only be decrypted using the server’s private key.

Setup Decodable Kafka connection

We’ll assume here that you already have a Decodable account and have gotten started with the Decodable CLI. If you haven’t done that yet, see The Decodable CLI to learn how to install and setup the Decodable CLI.

Create a Stream

decodable stream create --name kafka_tls_in            \
  --description "input stream"                         \
  --field value=string

The stream is used to hold the output of the Kafka source connection in the next step.

Create a Kafka source TLS connection

decodable connection create --connector kafka --type source          \
  --name kafka-tls-source                                            \
  --description "Kafka source connection with TLS encryption"        \
  --stream-id=<stream-id>                                            \
  --field value=STRING                                               \
  --prop bootstrap.servers=<broker_list>                  \
  --prop value.format=raw                                            \
  --prop topic=source_raw                                            \
  --prop security.protocol=TLS                                       \
  --prop tls.broker.certificate=@<path_to_cert_file>

To make sure the connection is configured correctly:

  • broker_list must use the port that accepts TLS connections

  • path_to_cert_file must be an X.509 certificate in PEM format

  • Your Kafka brokers have a topic source_raw that has data with raw value format

If your certificate is in DER format, you can use this command to transform it to PEM format: openssl x509 -in certfile -inform der -outform pem -out cert.pem

Test the Kafka TLS connection

The quickest way to test the connection is to activate it and run a preview job. After activation, we can verify that the connection is activated successfully by checking the actual state.

Note that it can take up to 1 minute for the state to update.

Activate the connection

decodable connection activate <connection_id>

decodable connection get <connection_id>
#sample output
kafka-tls-source
  id                       <connection_id>
  description              Kafka source connection with TLS encryption
  connector                kafka
  type                     source
  stream id                <stream_id>
  schema
    0  body                  STRING
  properties
    bootstrap.servers <broker_list>
    topic                    source_raw
    value.format             raw
    security.protocol        TLS
    tls.broker.certificate   <content_of_cert_file>
  target state             RUNNING
  actual state             RUNNING
  create time              2021-10-13T21:23:33Z
  update time              2021-10-13T21:23:33Z

Create a preview job

Run a preview to read from the stream the source Kafka TLS connection writes into. If you produce raw strings to the source_raw topic, you should see sample data coming out from the preview command output.

Note that it may take up to 1 minute for the data to appear.

decodable pipeline preview "SELECT * FROM kafka_tls_in"