Apache Pulsar source connector

Use the Apache Pulsar connector to read data from a Pulsar topic. To connect to Apache Pulsar as a sink instead, see Apache Pulsar sink connector.

Features

Delivery guarantee

At least once

Compatibility

Pulsar 2.9.x or above

Prerequisites

If you want to connect to Pulsar using JWT or OAuth2 authentication, you will need the following information before creating the connection.

For JWT Authentication:

  • A JSON web token to use for authentication between Decodable and the Pulsar client.

For OAuth2 authentication:

  • The Client ID, Client Secret, Token Issuer URL, and in some cases, the Token Audience for the service account that Decodable uses to contact your remote Pulsar service. Where you obtain these values depends on the Apache Pulsar service you are using. See Working with OAuth: StreamNative example for an example of where to find these values in StreamNative.

Steps

If you use the Decodable CLI or API to create the connection, refer to the Property Name column for the underlying property names. The connector name is pulsar.
  1. From the Connections page, select the Pulsar Connector and complete the following fields.

    UI Field Property Name Description

    Connection Type

    N/A

    Select Source for Decodable to read data from a Pulsar topic.

    Broker Service URL

    service-url

    The URL to connect to your Pulsar broker.

    For example, pulsar+ssl://broker.example.com:6651.

    Admin Web Service URL

    admin-url

    The admin web service URL.

    For example, https://broker.example.com.

    Topic

    topic

    The name of the Pulsar topic.

    Authentication Method

    N/A

    The authentication method to use when connecting to the Pulsar client. Must be one of the following:

    • JWT

    • OAuth

    • No Authentication

    Authentication Token

    token

    Conditional: Required when performing authentication via JWT.

    The secret associated with your Pulsar client that contains the JWT for authentication.

    If you are using the Decodable CLI, this is the ID of a secret resource in your account. Run decodable secret list to view available secrets or decodable secret --help for help with creating a new secret.

    Note: For security purposes, Decodable will never display secret values in plaintext. You can manage which users have permissions to create, delete, or modify secrets in the Access Control management view. See Roles, groups, and permissions for more information.

    OAuth Client ID

    client-id

    Conditional: Required when performing authentication via OAuth.

    The ID of the service-account client used for OAuth2 authentication.

    OAuth Client Secret

    client-secret

    Conditional: Required when performing authentication via OAuth.

    The secret associated with your Pulsar client. If you are using the Decodable CLI, this is the ID of a secret resource in your account. Run decodable secret list to view available secrets or decodable secret --help for help with creating a new secret.

    Note: For security purposes, Decodable will never display secret values in plaintext. You can manage which users have permissions to create, delete, or modify secrets in the Access Control management view. See Roles, groups, and permissions for more information.

    OAuth Token Issuer URL

    issuer-url

    Conditional: Required when performing authentication via OAuth.

    URL of the authentication provider from which Decodable will request OAuth access tokens on behalf of the given service-account client.

    Example: https://auth.example.cloud/

    OAuth Token Audience

    audience

    An identifier of the resource for which a requested OAuth access token may be used.

    Example: urn:sn:pulsar:organization:cluster

    Conditional: Not required by Decodable when performing authentication via OAuth, though Pulsar servers may independently require the property be defined depending on their security requirements.

    Value Format

    format

    The format for data in the Pulsar topic. Must be one of the following:

    • JSON

    • AVRO

    • Debezium (JSON)

    If you want to send CDC data through this connector, then you must select Debezium (JSON).

    Partition Key Columns

    key.fields

    A list of fields, delimited by semicolons, that comprise the partition key.

    For example: field1;field2.

    Partition Key Format

    key.format

    Optional. The format used to serialize and deserialize the partition key. Must be one of the following:

    • No Format

    • Optional Raw

    • Avro

    • JSON

    Partition Key Columns Inclusion Policy

    value.fields-include

    Specifies whether the partition key columns should also be present in the payload values read from Pulsar.

    For an example of how the key.fields, key.format, and value.fields-include arguments work together, see the examples in the Key and Value Formats section in the Apache Flink documentation.

    Scan Startup Mode

    scan.startup.mode

    Specifies where in the topic to start reading data when the connection is first started, or when it’s restarted with the state discarded.

    • latest: Start reading data from the latest available point in the stream.

    • earliest: Start reading data from the earliest available point in the stream.

    pulsar.reader.receiverQueueSize

    Size of the Pulsar reader’s receiver queue. A value higher than the default value increases consumer throughput, though at the expense of more memory utilization.

    Defaults to 1000.

    This configuration is currently available only through the CLI.
  2. Select the stream that you’d like to connect to this connector. Then, select Next.

  3. Define the connection’s schema. Decodable can automatically populate the connection’s schema using Pulsar’s schema registry. In most cases, you’ll want to select Schema Registry to automatically populate the connection’s schema. However, if you would like to manually enter the schema, select New Schema or Structured Schema Definition.

    1. If you want to automatically populate the connection schema using Pulsar’s schema registry, you’ll also need to provide the name of your Pulsar tenant, the namespace where your topic is, and the version of the schema that you’d like to use.

  4. Select Next when you are finished defining the connection’s schema.

  5. Give the newly created connection a Name and Description and select Save.
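
For CLI or API users, the conditional rules in the table above can be checked before the connection is created. The following is a minimal sketch in Python, not part of Decodable's tooling: the helper name build_pulsar_properties, the topic name, and the placeholder secret ID are assumptions made for illustration. Only the property names and the JWT and OAuth requirements come from the table.

```python
# Hypothetical helper: collects Pulsar connection properties and checks the
# "Conditional: Required when ..." rules from the table. It does not call
# Decodable; it only builds and validates a dictionary.
REQUIRED_BY_AUTH = {
    "JWT": ["token"],
    "OAuth": ["client-id", "client-secret", "issuer-url"],
    "No Authentication": [],
}


def build_pulsar_properties(auth_method, base, auth=None, key_fields=None):
    """Merge properties and verify the fields required by the auth method."""
    if auth_method not in REQUIRED_BY_AUTH:
        raise ValueError(f"unknown authentication method: {auth_method}")
    props = dict(base)
    props.update(auth or {})
    missing = [name for name in REQUIRED_BY_AUTH[auth_method] if not props.get(name)]
    if missing:
        raise ValueError(f"{auth_method} authentication requires: {', '.join(missing)}")
    if key_fields:
        # key.fields is a semicolon-delimited list, for example field1;field2.
        props["key.fields"] = ";".join(key_fields)
    return props


props = build_pulsar_properties(
    "OAuth",
    base={
        "service-url": "pulsar+ssl://broker.example.com:6651",
        "admin-url": "https://broker.example.com",
        "topic": "example-topic",  # assumed topic name
    },
    auth={
        "client-id": "example-client-id",  # assumed value
        "client-secret": "<secret-id>",  # ID of a Decodable secret, not the secret itself
        "issuer-url": "https://auth.example.cloud/",
        "audience": "urn:sn:pulsar:organization:cluster",  # optional for Decodable
    },
    key_fields=["field1", "field2"],
)
print(props["key.fields"])
```

Selecting JWT without a token property, or OAuth without any of its three required properties, raises a ValueError in this sketch, mirroring the conditional requirements listed in the table.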

Connector starting state and offsets

When you create a connection, or restart it and discard its state, it starts reading from the position in the topic defined by Scan Startup Mode. The default is latest, so the connection reads from the end of the topic.

Learn more about starting state here.
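
The effect of the two startup modes can be pictured with a small conceptual model in Python. This is only an illustration using an in-memory list, not how Decodable or Pulsar tracks positions. The function name starting_index and the saved_index argument are hypothetical, and resuming from a saved position when state is kept is an assumption of the sketch.

```python
def starting_index(messages, scan_startup_mode="latest", saved_index=None):
    """Return the index of the first message a connection would read."""
    if saved_index is not None:
        # Assumption: when state is kept, reading resumes where it left off.
        return saved_index
    if scan_startup_mode == "earliest":
        return 0  # start of the topic: the existing backlog is read
    if scan_startup_mode == "latest":
        return len(messages)  # end of the topic: only new messages are read
    raise ValueError(f"unknown scan startup mode: {scan_startup_mode}")


backlog = ["m0", "m1", "m2"]  # messages already in the topic
print(backlog[starting_index(backlog, "earliest"):])  # ['m0', 'm1', 'm2']
print(backlog[starting_index(backlog, "latest"):])  # []
```

In this model, a new connection using latest skips the three existing messages, while earliest reads all of them.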

Working with OAuth: StreamNative example

When using the OAuth authentication method, you need to additionally provide the Client ID, Client Secret, Token Issuer URL, and in some cases, the Token Audience for the service account that Decodable uses to contact your remote Pulsar service. The following example walks through how to find these values in StreamNative, but you can use a different Apache Pulsar service of your choosing.

Perform the following steps in the StreamNative web console to find the Client ID, Client Secret, and the Token Issuer URL.

  1. Select Service Accounts, and download the Key File for the service account that you want to use.

  2. Open the key file, and copy the values for the client_id, client_secret, and issuer_url fields.
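
If you prefer to extract the values programmatically, the copy step above might be sketched as follows in Python. The sketch assumes the key file is a JSON document containing the client_id, client_secret, and issuer_url fields; the function name and the sample values are hypothetical.

```python
import json


def oauth_values_from_key_file(text):
    """Return (properties, client_secret_value) parsed from key file contents."""
    data = json.loads(text)
    missing = [f for f in ("client_id", "client_secret", "issuer_url") if f not in data]
    if missing:
        raise KeyError(f"key file is missing: {', '.join(missing)}")
    properties = {
        "client-id": data["client_id"],
        "issuer-url": data["issuer_url"],
    }
    # The secret value is returned separately: with the CLI, the client-secret
    # property holds the ID of a Decodable secret, not the secret value itself.
    return properties, data["client_secret"]


# Sample contents standing in for a downloaded key file (assumed values).
sample = json.dumps({
    "client_id": "example-client-id",
    "client_secret": "example-client-secret",
    "issuer_url": "https://auth.example.cloud/",
})
oauth_props, secret_value = oauth_values_from_key_file(sample)
print(oauth_props)
```

To parse a real file, read its contents first, for example with open(path).read(), and pass the text to the function.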

Perform the following steps in the StreamNative web console to find the OAuth Token Audience.

  1. Select Pulsar Clients.

  2. Fill out the Pulsar Client Setup form. It doesn’t matter which programming language you select in the StreamNative web console. The token audience appears as a hard-coded variable in Step 5.