Apache Pulsar source connector

Features

Connector name

pulsar

Compatibility

Pulsar 2.9.x or above

Delivery guarantee

At least once

Supported task sizes

S, M, L

Supported stream types

Append

Configuration properties

Property Description Required Default

Connection

service-url

The URL to connect to your Pulsar broker.

For example, pulsar+ssl://broker.example.com:6651.

Required: Yes

admin-url

The URL to connect to your Pulsar admin endpoint.

For example, https://broker.example.com.

Required: Yes

Authentication

token

The secret associated with your Pulsar client that contains the JWT token for authentication. This must be provided as a secret resource.

Required when performing authentication via JWT.

client-id

The ID of the service-account client used for OAuth2 authentication.

Required when performing authentication via OAuth.

client-secret

The secret associated with your Pulsar client. This must be provided as a secret resource.

issuer-url

URL of the authentication provider from which Decodable will request OAuth access tokens on behalf of the given service-account client.

Example: https://auth.example.cloud/

audience

An identifier of the resource for which a requested OAuth access token may be used.

Example: urn:sn:pulsar:organization:cluster

Not required by Decodable when performing authentication via OAuth, though Pulsar servers may independently require the property be defined depending on their security requirements.

Data

topic

The fully qualified name of the Pulsar topic. For example, persistent://stream/namespace/topic-name.

Required: Yes

key.fields

A list of fields, delimited by semicolons, that comprise the partition key.

For example: field1;field2.

key.format

The format used to serialize and deserialize the partition key. Must be one of the following:

  • JSON

  • Avro

  • Raw

format

The format for data in the Pulsar topic. Must be one of the following:

  • JSON

  • Avro

  • Raw

  • Debezium (JSON)

If you want to send CDC data through this connector, then you must select Debezium (JSON).

value.fields-include

If set to ALL then the partition key columns will be included in the payload values sent to Decodable.

Set to EXCEPT_KEY if you don’t want the partition key columns to be included in the payload.

For an example of how the key.fields, key.format, and value.fields-include arguments work together, see the examples in the Key and Value Formats section in the Apache Flink documentation.

Default: ALL

scan.startup.mode

Specifies where in the topic to start reading data when the connection is first started, or when it’s restarted with the state discarded.

  • latest: Start reading data from the latest available point in the stream.

  • earliest: Start reading data from the earliest available point in the stream.

Default: latest

Advanced

pulsar.reader.receiverQueueSize

Size of the Pulsar reader’s receiver queue. A value higher than the default value increases consumer throughput, though at the expense of more memory utilization.

Default: 1000
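As a rough illustration of how the properties above fit together, the following Python sketch assembles a property map, applies the documented defaults, and checks the required and enumerated values. The validate and key_fields helpers are hypothetical and not part of any Decodable tooling; only the property names, allowed values, and defaults are taken from the table.

```python
# Property names, allowed values, and defaults come from the table above.
REQUIRED = ("service-url", "admin-url", "topic")
DEFAULTS = {
    "value.fields-include": "ALL",
    "scan.startup.mode": "latest",
    "pulsar.reader.receiverQueueSize": "1000",
}
ALLOWED = {
    "value.fields-include": {"ALL", "EXCEPT_KEY"},
    "scan.startup.mode": {"latest", "earliest"},
}


def validate(props):
    """Hypothetical helper: return props with defaults applied, or raise ValueError."""
    missing = [name for name in REQUIRED if not props.get(name)]
    if missing:
        raise ValueError("missing required properties: " + ", ".join(missing))
    merged = {**DEFAULTS, **props}
    for name, allowed in ALLOWED.items():
        if merged[name] not in allowed:
            raise ValueError(f"{name} must be one of {sorted(allowed)}")
    return merged


def key_fields(props):
    """Hypothetical helper: split the semicolon-delimited key.fields value."""
    raw = props.get("key.fields", "")
    return [field for field in raw.split(";") if field]


# Example values are the placeholders used in the table above.
example = validate({
    "service-url": "pulsar+ssl://broker.example.com:6651",
    "admin-url": "https://broker.example.com",
    "topic": "persistent://stream/namespace/topic-name",
    "key.fields": "field1;field2",
})
print(example["scan.startup.mode"])  # latest
print(key_fields(example))  # ['field1', 'field2']
```

The sketch deliberately leaves out the authentication properties and the format values, since their exact accepted spellings are not given in the table.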

Schema Registry

Decodable can automatically populate the connection’s schema using Pulsar’s schema registry. To do this you need to create the connection through the Decodable Web UI.

If you would like to manually enter the schema in the Decodable Web UI, select New Schema or Import Schema.

  1. If you want to automatically populate the connection schema using Pulsar’s schema registry, you’ll also need to provide the name of your Pulsar tenant, the namespace where your topic is, and the version of the schema that you’d like to use.

  2. Select Next when you are finished defining the connection’s schema.

  3. Give the newly created connection a Name and Description and select Save.
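The tenant and namespace requested in step 1 can usually be read off the fully qualified topic name, which in Pulsar takes the form domain://tenant/namespace/topic. The following Python sketch shows one way to split it. The split_topic helper is hypothetical, and it assumes the three-part form shown in the topic property's example.

```python
def split_topic(fully_qualified):
    """Hypothetical helper: split domain://tenant/namespace/topic into its parts."""
    domain, sep, rest = fully_qualified.partition("://")
    parts = rest.split("/")
    # Assumes exactly three non-empty segments after the domain.
    if not sep or len(parts) != 3 or not all(parts):
        raise ValueError(f"not a fully qualified topic name: {fully_qualified!r}")
    tenant, namespace, topic = parts
    return {"domain": domain, "tenant": tenant, "namespace": namespace, "topic": topic}


parts = split_topic("persistent://stream/namespace/topic-name")
print(parts["tenant"], parts["namespace"])  # stream namespace
```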

Working with OAuth: StreamNative example

When using the OAuth authentication method, you need to additionally provide the Client ID, Client Secret, Token Issuer URL, and in some cases, the Token Audience for the service account that Decodable uses to contact your remote Pulsar service. The following example walks through how to find these values in StreamNative, but you can use a different Apache Pulsar service of your choosing.

Perform the following steps in the StreamNative web console to find the Client ID, Client Secret, and the Token Issuer URL.

  1. Select Service Accounts, and download the Key File for the service account that you want to use.

  2. Open the key file, and copy the values for the client_id, client_secret, and issuer_url fields.
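Step 2 can also be scripted. The sketch below assumes the downloaded key file is a JSON document containing the client_id, client_secret, and issuer_url fields named above. The oauth_properties helper, the sample values, and the mapping onto the connector's client-id, client-secret, and issuer-url properties are illustrative assumptions.

```python
import json


def oauth_properties(key_file_text):
    """Hypothetical helper: pull the OAuth values out of a key file's text.

    Assumes the key file is JSON with client_id, client_secret, and issuer_url.
    """
    key = json.loads(key_file_text)
    return {
        "client-id": key["client_id"],
        "client-secret": key["client_secret"],
        "issuer-url": key["issuer_url"],
    }


# Placeholder contents standing in for a real key file.
sample = json.dumps({
    "client_id": "example-client-id",
    "client_secret": "example-client-secret",
    "issuer_url": "https://auth.example.cloud/",
})
props = oauth_properties(sample)
print(props["issuer-url"])  # https://auth.example.cloud/
```

For a real file, pass the file's contents to the helper instead of the sample string. The client secret must still be provided to the connection as a secret resource rather than as a plain value.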

Perform the following steps in the StreamNative web console to find the OAuth audience token.

  1. Select Pulsar Clients.

  2. Fill out the Pulsar Client Setup form. It doesn’t matter which programming language you select in the StreamNative web console. The OAuth audience token will be returned as a hard-coded variable.

Connector starting state and offsets

When you create a connection, or restart it and discard its state, it reads from the position in the topic defined by the scan.startup.mode property. By default, this is latest, so the connection reads from the end of the topic.
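The rule above can be sketched as a small decision function. This is only an illustration in Python, not Decodable's implementation: the start_position function and the "saved-state" label are invented for the sketch, and it assumes that a restart which keeps its state resumes from that state rather than consulting the startup mode.

```python
def start_position(scan_startup_mode="latest", has_saved_state=False, discard_state=False):
    """Illustrative only: where a connection begins reading.

    "saved-state" is a label invented for this sketch; it stands for
    resuming from previously stored state (an assumption).
    """
    if scan_startup_mode not in ("latest", "earliest"):
        raise ValueError("scan.startup.mode must be latest or earliest")
    if has_saved_state and not discard_state:
        return "saved-state"
    # First start, or restart with state discarded: use the startup mode.
    return scan_startup_mode


print(start_position())  # latest
print(start_position("earliest", has_saved_state=True, discard_state=True))  # earliest
print(start_position("earliest", has_saved_state=True))  # saved-state
```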

Learn more about starting state here.