Apache Pulsar

Use the Apache Pulsar connector to get data from or send data to a Pulsar topic.

Overview

Connector name: pulsar
Type: source, sink
Delivery guarantee: at least once
Compatibility: Pulsar 2.9.x or above

Create a connection to an Apache Pulsar topic with the Apache Pulsar Connector

Follow these steps to connect Decodable to an Apache Pulsar topic as either a source or a sink.

  1. From the Connections page, select the Apache Pulsar Connector and complete the following fields.
Field | Description
Connection Type | Select whether you want to connect to a Pulsar topic as a source or a sink. If you select source, Decodable reads data from the Pulsar topic provided. If you select sink, Decodable sends data to the Pulsar topic provided.
Broker Service URL | The URL to connect to your Pulsar broker. For example, pulsar+ssl://broker.example.com:6651.
Admin Web Service URL | The URL to connect to your Pulsar admin endpoint. For example, https://broker.example.com.
Topic | The name of the Pulsar topic.
Value Format | The format for data in the Pulsar topic. Must be one of the following: JSON, AVRO, Debezium (JSON). If you want to send CDC data through this connector, you must select Debezium (JSON).
  2. Select an Authentication Method, which can be one of the following: JWT, OAuth, or No Authentication.

    • When using JWT, you must also provide:

      • A JSON Web Token (JWT) for authentication to your remote Pulsar server.
    • When using OAuth, you must also provide:

      • The ID of the service-account client that Decodable will use to contact your remote Pulsar service.
      • The client secret for the service account that Decodable will use to contact your remote Pulsar service.
      • The token-issuer URL, which Decodable will use to request an access token for the given service account.
      • Optionally, the audience for your access token, if your remote Pulsar service requires it.

      Where you find these values depends on the remote Pulsar service that you are using. For an example of how to find these values in StreamNative, see Working with OAuth: StreamNative example.
  3. Select the stream that you’d like to connect to this connector. Then, select Next.

  4. Define the connection’s schema. Decodable can auto-populate the connection’s schema using Pulsar's schema registry. In most cases, you’ll want to select Schema Registry to automatically populate the connection’s schema. However, if you would like to manually enter the schema, select New Schema or Structured Schema Definition.

    1. If you want to auto-populate the connection schema using Pulsar's schema registry, you’ll also need to provide the name of your Pulsar tenant, the namespace where your topic is, and the version of the schema that you’d like to use.
  5. Select Next when you are finished defining the connection’s schema.

  6. Give the newly created connection a Name and Description and select Save.
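The tenant and namespace requested in the schema step are the same components that appear in a fully qualified Pulsar topic name, which has the form {persistent|non-persistent}://tenant/namespace/topic. The following Python sketch splits such a name into its parts. The names TopicParts and split_topic are hypothetical helpers for illustration and are not part of Decodable or Pulsar; the sketch assumes the three-part form only.

```python
from typing import NamedTuple


class TopicParts(NamedTuple):
    domain: str      # "persistent" or "non-persistent"
    tenant: str
    namespace: str
    topic: str


def split_topic(fully_qualified: str) -> TopicParts:
    """Split 'persistent://tenant/namespace/topic' into its components."""
    domain, sep, rest = fully_qualified.partition("://")
    if not sep or domain not in ("persistent", "non-persistent"):
        raise ValueError(f"not a fully qualified topic name: {fully_qualified!r}")
    parts = rest.split("/")
    # Expect exactly tenant, namespace, and topic, none of them empty.
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected tenant/namespace/topic, got: {rest!r}")
    return TopicParts(domain, *parts)


print(split_topic("persistent://decodable/default/test"))
```

For the topic persistent://decodable/default/test, the tenant is decodable and the namespace is default.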

Working with OAuth: StreamNative example

When using the OAuth authentication method, you need to additionally provide the Client ID, Client Secret, Token Issuer URL, and in some cases, the Token Audience for the service account that Decodable uses to contact your remote Pulsar service. The following example walks through how to find these values in StreamNative, but you can use a different Apache Pulsar service of your choosing.

Perform the following steps in the StreamNative web console to find the Client ID, Client Secret, and the Token Issuer URL.

  1. Select Service Accounts, and download the Key File for the service account that you want to use.
  2. Open the key file, and copy the values for the client_id, client_secret, and issuer_url fields.
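If the downloaded key file is JSON (an assumption about the file format; only the field names client_id, client_secret, and issuer_url come from the step above), the copied values map onto the connector's OAuth properties roughly as follows. The helper name and the sample values are hypothetical.

```python
import json


def oauth_props_from_key_file(key_file_text: str) -> dict:
    """Map key-file fields to the connector's OAuth property names."""
    key = json.loads(key_file_text)
    return {
        "client-id": key["client_id"],
        "client-secret": key["client_secret"],
        "issuer-url": key["issuer_url"],
    }


# Placeholder contents; a real key file holds your service account's values.
sample = """{
  "client_id": "example-client-id",
  "client_secret": "example-client-secret",
  "issuer_url": "https://auth.example.cloud/"
}"""

print(oauth_props_from_key_file(sample))
```

Treat the key file like any other credential: it contains the client secret in plain text.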

Perform the following steps in the StreamNative web console to find the Token Audience.

  1. Select Pulsar Clients.
  2. Fill out the Pulsar Client Setup form. It does not matter which programming language you select in Step 1. The audience value appears as a hardcoded variable in Step 5.
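To show what these four values are for, here is a minimal sketch of the form body of a standard OAuth 2.0 client-credentials token request. It illustrates the general flow only and is not a description of how Decodable issues the request. The function name is hypothetical, the sample values are placeholders, and audience is an extension parameter that only some providers require.

```python
from urllib.parse import urlencode


def client_credentials_form(client_id, client_secret, audience=None):
    """Build the form body of an OAuth 2.0 client-credentials token request."""
    fields = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
    }
    if audience:
        # Included only when the provider requires an audience.
        fields["audience"] = audience
    return urlencode(fields)


print(client_credentials_form(
    "example-client-id",
    "example-client-secret",
    audience="urn:sn:pulsar:organization:cluster",
))
```

The provider answers such a request with an access token, which the client then presents to the Pulsar service.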

Properties

The following properties are supported by the Apache Pulsar connector.

Property | Required? | Description
topic | required | Fully qualified Pulsar topic name. Example: persistent://decodable/default/test
service-url | required | The broker service URL. Example: pulsar+ssl://broker.example.com:6651
admin-url | required | The admin web service URL. Example: https://broker.example.com
format | required | The format for data in the Pulsar topic. Must be one of the following: JSON, AVRO, Debezium (JSON). If you want to send CDC data through this connector, you must select Debezium (JSON).
token | optional | JWT used for authentication. Required when authenticating via JWT. Not compatible with any OAuth properties. Example: eyJhbGciOiJIUzI1NiJ9...
client-id | optional | ID of the service-account client used for OAuth authentication. Required when authenticating via OAuth. Not compatible with JWT properties.
client-secret | optional | Secret for the service-account client used for OAuth authentication. Required when authenticating via OAuth. Not compatible with JWT properties.
issuer-url | optional | URL of the authentication provider from which Decodable will request OAuth access tokens on behalf of the given service-account client. Required when authenticating via OAuth. Not compatible with JWT properties. Example: https://auth.example.cloud/
audience | optional | Identifier of the resource for which a requested OAuth access token may be used. Decodable does not strictly require it for OAuth authentication, but Pulsar servers may independently require it, depending on their security requirements. Example: urn:sn:pulsar:organization:cluster
scan.startup.mode | optional | Applies to source connections only. Specifies whether to start reading from the earliest or latest offset. Defaults to latest.
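The rules in this table can be checked before a connection is created. The following Python sketch is one way to do that; check_pulsar_props is a hypothetical helper, not a Decodable API, and the scheme check assumes the broker URL uses Pulsar's pulsar:// or pulsar+ssl:// scheme.

```python
from urllib.parse import urlsplit

REQUIRED = ("topic", "service-url", "admin-url", "format")
OAUTH_REQUIRED = ("client-id", "client-secret", "issuer-url")
OAUTH_ALL = OAUTH_REQUIRED + ("audience",)


def check_pulsar_props(props: dict, connection_type: str) -> list:
    """Return a list of problems found; an empty list means none were found."""
    problems = [f"missing required property: {name}"
                for name in REQUIRED if name not in props]

    # JWT and OAuth properties are mutually exclusive.
    oauth_present = [name for name in OAUTH_ALL if name in props]
    if "token" in props and oauth_present:
        problems.append("token cannot be combined with OAuth properties")
    elif oauth_present:
        problems += [f"OAuth also requires: {name}"
                     for name in OAUTH_REQUIRED if name not in props]

    if "scan.startup.mode" in props and connection_type != "source":
        problems.append("scan.startup.mode applies to source connections only")

    # Assumption: the broker URL uses the pulsar:// or pulsar+ssl:// scheme.
    if "service-url" in props:
        scheme = urlsplit(props["service-url"]).scheme
        if scheme not in ("pulsar", "pulsar+ssl"):
            problems.append(f"unexpected service-url scheme: {scheme!r}")
    return problems


props = {
    "topic": "persistent://decodable/default/test",
    "service-url": "pulsar+ssl://broker.example.com:6651",
    "admin-url": "https://broker.example.com",
    "format": "json",
    "client-id": "example-client-id",
    "client-secret": "example-client-secret",
    "issuer-url": "https://auth.example.cloud/",
}
print(check_pulsar_props(props, "sink"))
print(check_pulsar_props({**props, "token": "eyJ..."}, "sink"))
```

The first call reports no problems. The second reports the conflict between token and the OAuth properties.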

JSON format properties

When using format=json, the following properties are optionally allowed:

Property | Description
json.timestamp-format.standard | Specify the timestamp format for TIMESTAMP and TIMESTAMP_LTZ types. SQL uses the "yyyy-MM-dd HH:mm:ss.SSS" format, for example "2020-12-30 12:13:14.123". ISO-8601 uses the "yyyy-MM-ddTHH:mm:ss.SSS" format, for example "2020-12-30T12:13:14.123".
json.encode.decimal-as-plain-number | Must be true or false. Defaults to false. When true, numbers are always encoded without scientific notation. For example, a number encoded as 2.7E-8 by default would be encoded as 0.000000027.
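The effect of these two properties can be reproduced with Python's standard library. This sketch only mirrors the formats described above and is not the connector's own serializer.

```python
from datetime import datetime
from decimal import Decimal

ts = datetime(2020, 12, 30, 12, 13, 14, 123000)

# SQL standard: a space between date and time, millisecond precision.
sql_ts = ts.strftime("%Y-%m-%d %H:%M:%S.") + f"{ts.microsecond // 1000:03d}"
# ISO-8601 standard: a "T" between date and time, millisecond precision.
iso_ts = ts.isoformat(timespec="milliseconds")

value = Decimal("2.7E-8")
scientific = str(value)     # analogous to decimal-as-plain-number=false
plain = format(value, "f")  # analogous to decimal-as-plain-number=true

print(sql_ts)      # 2020-12-30 12:13:14.123
print(iso_ts)      # 2020-12-30T12:13:14.123
print(scientific)  # 2.7E-8
print(plain)       # 0.000000027
```

Both decimal encodings represent the same value; only the textual form in the JSON output differs.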
Apache® Pulsar™, Apache, Pulsar, and associated open source project names are either registered trademarks or trademarks of The Apache Software Foundation.