How To: Set up mutual TLS authentication with Apache Kafka

In the previous post, we’ve learned how to set up 1-way TLS for data encryption. TLS can also be used for authentication, which is usually known as 2-way authentication or mutual TLS(mTLS). This allows Apache Kafka brokers to only allow trusted clients to connect. In this guide, we’ll walk through how to make that happen.

As a prerequisite, your Kafka brokers need to be configured to use TLS authentication (ref).

ssl.client.auth=required should exist in the broker properties.

To use mTLS, you need all the properties of 1-way TLS. In addition, you need to specify the required properties to enable authentication. Decodable supports two methods of enabling TLS authentication:

  • Self-signed Certificate. Decodable generates a self-signed certificate. The certificate is then added to the brokers' trust store.

  • Certificate sign request (CSR). Decodable generates a CSR. Kafka brokers' CA signs the CSR. Decodable uses the signed certificate for authentication.

We will go through both methods below.

Setup Decodable Kafka mTLS connection

We’ll assume here that you already have a Decodable account and have gotten started with the Decodable CLI. If you haven’t done that yet, see The Decodable CLI to learn how to install and setup the Decodable CLI.

Create a stream

decodable stream create --name kafka_mtls_in           \
  --description "input stream"                         \
  --field value=string

The stream is used to hold the output of the Kafka source connection in the next step.

Create a Kafka source mTLS connection

Option 1: Using a self-signed certificate

Specify tls.client.certificate.type=SELF_SIGNED

decodable connection create --connector kafka --type source          \
  --name kafka-mtls-self-signed                                      \
  --description "Kafka source mTLS connection with Self Signed Cert" \
  --stream-id=<stream-id>                                            \
  --field value=STRING                                               \
  --prop bootstrap.servers=<broker_list>                  \
  --prop value.format=raw                                            \
  --prop topic=source_raw                                            \
  --prop security.protocol=TLS                                       \
  --prop tls.client.certificate.type=SELF_SIGNED                     \
  --prop tls.broker.certificate=@<broker_public_cert>

If the connection is created successfully, connection id will be in the output. Then using the id and run

decodable connection get <id_of_kafka-mtls-self-signed>

You should see tls.client.certificate property in the response. This is the generated self-signed certificate from Decodable. The certificate is an X509 certificate in PEM format. It looks like

-----BEGIN CERTIFICATE-----
...
-----END CERTIFICATE-----

First, copy the content of the certificate and store it in a file, for example client_cert.pem. Then, add this certificate to each Kafka broker’s truststore. A common way to do it is using the following command:

keytool -keystore kafka.server.truststore.jks -alias decodable-client -import -file client_cert.pem

Note that Kafka broker configuration changes require a rolling restart. Now the Kafka connection can connect to the brokers using mTLS.

Option 2: Using a Certificate Sign Request

Specify tls.client.certificate.type=CSR

decodable connection create --connector kafka --type source          \
  --name kafka-mtls-csr.                                             \
  --description "Kafka source mTLS connection with CSR"              \
  --field value=STRING                                               \
  --prop bootstrap.servers=<broker_list>                  \
  --prop value.format=raw                                            \
  --prop topic=source_raw                                            \
  --prop security.protocol=TLS                                       \
  --prop tls.client.certificate.type=CSR                             \
  --prop tls.broker.certificate=@<broker_public_cert>

If the connection is created successfully, a connection id will be in the output. Using this id, run

decodable connection get <id_of_kafka-mtls-csr>

You should see tls.client.certificate property in the response. This is the generated CSR from Decodable. The certificate is an X509 certificate in PEM format. It looks like

-----BEGIN CERTIFICATE REQUEST-----
...
-----END CERTIFICATE REQUEST-----

Copy the contents of the certificate and store it in a file, for example client_csr.pem. Then, use the Kafka brokers' CA (Certificate Authority) to sign this CSR. A common way to do it is using the following command:

openssl x509 -req -days 360 -in client_csr.pem -CA ca.crt -CAkey ca.key -CAcreateserial -out signed_cert.pem -sha256
CA key (ca.key) and certificate (ca.crt) can usually be obtained from Kafka broker’s keystore.

Once the request is signed successfully, signed_cert.pem should be created. Then you need to update the kafka-mtls-csr connection with the signed certificate.

decodable connection update <id_of_kafka-mtls-csr>                          \
  --prop tls.broker.signed_client_certificate=<content of signed_cert.pem>

Now the Kafka connection can connect to the brokers using mTLS. To test the connection, you can follow the test-the-Kafka-tls-connection with the mTLS connection and stream above.