How To: Set up mutual TLS (mTLS) authentication with Apache Kafka

As well as encrypting data in transit, TLS can be used with Apache Kafka for authentication. This is usually known as two-way authentication or mutual TLS (mTLS), and it lets Apache Kafka brokers accept connections only from trusted clients. Consult the Apache Kafka source connector page for full details and configuration reference.

As a prerequisite, your Kafka brokers need to be configured to use TLS authentication: ssl.client.auth=required should exist in the broker properties.

Decodable supports two methods of enabling TLS authentication:

Self-signed certificate. Decodable generates a self-signed certificate, which is then added to the brokers' truststore.

Certificate Signing Request (CSR). Decodable generates a CSR, the Kafka brokers' CA signs it, and Decodable uses the signed certificate for authentication.

We will go through both methods below. We’ll assume here that you already have a Decodable account and have gotten started with the Decodable CLI. If you haven’t done that yet, see The Decodable CLI to learn how to install and set up the Decodable CLI.

Create a Stream

    decodable stream create --name kafka_mtls_in \
      --description "input stream" \
      --field value=string

The stream holds the output of the Kafka source connection created in the next step.

Create a Kafka source mTLS connection

Option 1: Using a self-signed certificate

Specify tls.client.certificate.type=SELF_SIGNED, with <stream-id> set to the id of the stream created above:

    decodable connection create --connector kafka --type source \
      --name kafka-mtls-self-signed \
      --description "Kafka source mTLS connection with Self Signed Cert" \
      --stream-id=<stream-id> \
      --field value=STRING \
      --prop bootstrap.servers=<broker_list> \
      --prop value.format=raw \
      --prop topic=source_raw \
      --prop security.protocol=TLS \
      --prop tls.client.certificate.type=SELF_SIGNED \
      --prop tls.broker.certificate=@<broker_public_cert>

If the connection is created successfully, the connection id will be in the output.
Using the id, run:

    decodable connection get <id_of_kafka-mtls-self-signed>

You should see the tls.client.certificate property in the response. This is the self-signed certificate generated by Decodable, an X.509 certificate in PEM format. It looks like:

    -----BEGIN CERTIFICATE-----
    ...
    -----END CERTIFICATE-----

First, copy the content of the certificate and store it in a file, for example client_cert.pem. Then, add this certificate to each Kafka broker’s truststore. A common way to do this is with the following command:

    keytool -keystore kafka.server.truststore.jks -alias decodable-client -import -file client_cert.pem

Note that Kafka broker configuration changes require a rolling restart. Now the Kafka connection can connect to the brokers using mTLS.

Option 2: Using a Certificate Signing Request (CSR)

Specify tls.client.certificate.type=CSR:

    decodable connection create --connector kafka --type source \
      --name kafka-mtls-csr \
      --description "Kafka source mTLS connection with CSR" \
      --stream-id=<stream-id> \
      --field value=STRING \
      --prop bootstrap.servers=<broker_list> \
      --prop value.format=raw \
      --prop topic=source_raw \
      --prop security.protocol=TLS \
      --prop tls.client.certificate.type=CSR \
      --prop tls.broker.certificate=@<broker_public_cert>

If the connection is created successfully, a connection id will be in the output. Using this id, run:

    decodable connection get <id_of_kafka-mtls-csr>

You should see the tls.client.certificate property in the response. This is the CSR generated by Decodable, in PEM format. It looks like:

    -----BEGIN CERTIFICATE REQUEST-----
    ...
    -----END CERTIFICATE REQUEST-----

Copy the contents of the CSR and store it in a file, for example client_csr.pem. Then, use the Kafka brokers' CA (Certificate Authority) to sign this CSR.
A common way to do this is with the following command:

    openssl x509 -req -days 360 -in client_csr.pem -CA ca.crt -CAkey ca.key -CAcreateserial -out signed_cert.pem -sha256

The CA key (ca.key) and certificate (ca.crt) can usually be obtained from the Kafka broker’s keystore. Once the request is signed successfully, signed_cert.pem should be created.

Then you need to update the kafka-mtls-csr connection with the signed certificate:

    decodable connection update <id_of_kafka-mtls-csr> \
      --prop tls.broker.signed_client_certificate=<content of signed_cert.pem>

Now the Kafka connection can connect to the brokers using mTLS.

To test the connection, follow the steps shown here, with the mTLS connection and stream above.
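Both options involve copying PEM text out of the CLI response into files, and it is easy to mix up a certificate with a CSR. The following is a minimal sketch of a sanity check you could run on client_cert.pem, client_csr.pem, or signed_cert.pem before passing them to keytool, openssl, or the Decodable CLI. The helper names pem_kind and require_pem_kind are hypothetical and not part of any Decodable or Kafka tooling; the check only looks at the PEM header line and does not validate the contents.

```shell
#!/bin/sh
# Sketch only: pem_kind and require_pem_kind are hypothetical helper names.
# They inspect the PEM header text and do NOT validate the certificate itself.

# Print "csr", "certificate", or "unknown" for the PEM file given as $1.
pem_kind() {
  if grep -q -- '-----BEGIN CERTIFICATE REQUEST-----' "$1"; then
    echo "csr"
  elif grep -q -- '-----BEGIN CERTIFICATE-----' "$1"; then
    echo "certificate"
  else
    echo "unknown"
  fi
}

# Return 0 if file $1 is of kind $2, otherwise print a message and return 1.
require_pem_kind() {
  actual=$(pem_kind "$1")
  if [ "$actual" != "$2" ]; then
    echo "expected $2 in $1 but found $actual" >&2
    return 1
  fi
  return 0
}
```

For example, `require_pem_kind client_csr.pem csr` could be run before the signing step, and `require_pem_kind signed_cert.pem certificate` before updating the connection. If you also want to confirm that the signed certificate chains to your CA, `openssl verify -CAfile ca.crt signed_cert.pem` is one way to check this.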