# Apache Pulsar sink connector

## Features

| Feature | Details |
| --- | --- |
| Connector name | `pulsar` |
| Compatibility | Pulsar 2.9.x or above |
| Delivery guarantee | At least once |
| Supported task sizes | S, M, L |
| Multiplex capability | A single instance of this connector can write to a single topic. |
| Supported stream types | Append |

## Configuration properties

| Property | Description | Required | Default |
| --- | --- | --- | --- |
| **Connection** | | | |
| `service-url` | The URL to connect to your Pulsar broker. For example, `pulsar+ssl://broker.example.com:6651`. | Yes | |
| `admin-url` | The URL to connect to your Pulsar admin endpoint. For example, `https://broker.example.com`. | Yes | |
| **Authentication** | | | |
| `token` | The secret associated with your Pulsar client that contains the JWT token for authentication. This must be provided as a secret resource. | Required when performing authentication via JWT. | |
| `client-id` | The ID of the service-account client used for OAuth2 authentication. | Required when performing authentication via OAuth. | |
| `client-secret` | The secret associated with your Pulsar client. This must be provided as a secret resource. | Required when performing authentication via OAuth. | |
| `issuer-url` | The URL of the authentication provider from which Decodable will obtain OAuth access tokens on behalf of the given service-account client. Example: `https://auth.example.cloud/` | Required when performing authentication via OAuth. | |
| `audience` | An identifier of the resource for which a requested OAuth access token may be used. Example: `urn:sn:pulsar:organization:cluster` | Not required by Decodable when performing authentication via OAuth, though Pulsar servers may independently require the property to be defined, depending on their security requirements. | |
| **Data** | | | |
| `topic` | The fully qualified name of the topic. For example, `persistent://stream/namespace/topic-name`. | Yes | |
| `key.fields` | A list of fields, delimited by semicolons, that comprise the partition key. For example: `field1;field2`. | - | |
| `key.format` | The format used to serialize and deserialize the partition key. Must be one of the following: JSON, Avro, Raw. | - | - |
| `format` | The format for data in the topic. Must be one of the following: JSON, Avro, Raw, Debezium (JSON). If you want to send CDC data through this connector, you must select Debezium (JSON). | - | - |
| `value.fields-include` | If set to `ALL`, the partition key columns are included in the payload. Set to `EXCEPT_KEY` if you don’t want the partition key columns to be included in the payload. For an example of how the `key.fields`, `key.format`, and `value.fields-include` arguments work together, see the examples in the Key and Value Formats section of the Apache Flink documentation. | - | `ALL` |

## Working with OAuth: StreamNative example

When using the OAuth authentication method, you must additionally provide the Client ID, Client Secret, Token Issuer URL, and in some cases the Token Audience for the service account that Decodable uses to contact your remote Pulsar service. The following example walks through how to find these values in StreamNative, but you can use a different Apache Pulsar service of your choosing.

Perform the following steps in the StreamNative web console to find the Client ID, Client Secret, and the Token Issuer URL.

1. Select Service Accounts, and download the Key File for the service account that you want to use.
2. Open the key file, and copy the values for the `client_id`, `client_secret`, and `issuer_url` fields.

Perform the following steps in the StreamNative web console to find the OAuth audience token.

1. Select Pulsar Clients.
2. Fill out the Pulsar Client Setup form. It doesn’t matter which programming language you select in the StreamNative web console. The OAuth audience token is returned as a hard-coded variable.

## Connector starting state and offsets

A new sink connection starts reading from the Latest point in the source Decodable streams. This means that only data written to the stream after the connection has started is sent to the external system.
To send all the existing data on the source streams to the target system, along with all new data that arrives on the streams, override this default by setting the starting state to Earliest when you start the connection.

When you restart a sink connection, it continues reading from the position most recently stored in a checkpoint before the connection stopped. You can also discard the connection’s state and restart it afresh from Earliest or Latest, as described above. For more details, see the documentation on starting state.