Imply Polaris sink connector

Features

Connector name

polaris

Delivery guarantee

At least once

Supported task sizes

S, M, L

Multiplex capability

A single instance of this connector can write to a single Polaris table

Supported stream types

Configuration properties

Property Description Required Default

polaris.connection-name

The name of the push_streaming connection associated with the Polaris table you want to send data to. If you want to send data to a Polaris v1 table, then this is the same as the Table ID.

Yes

polaris.org-name

The name of the organization containing the Polaris table you want to send data to.

Yes

polaris.client-id

The OAuth client ID obtained from Polaris.

Yes

polaris.client-secret

The client secret associated with the OAuth client ID.

Yes

polaris.cloud

The cloud service provider for your Polaris project. Currently, only aws is supported.

Yes

polaris.region

The region that the Polaris project is in.

The following regions are supported:

  • us-east-1

  • eu-central-1

Yes
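polaris.cloud and polaris.region accept only the values listed above, so a quick pre-flight check can catch a typo before you create the connection. The following is a minimal Bash sketch. The function name validate_polaris_target is hypothetical and is not part of Decodable or Polaris tooling. It only encodes the supported values documented on this page.

```shell
#!/usr/bin/env bash
# Hypothetical pre-flight check (not part of any Decodable or Polaris CLI).
# Returns 0 if the cloud/region pair matches the values documented for this
# connector, and 1 (with a message on stderr) otherwise.
validate_polaris_target() {
  local cloud="$1" region="$2"

  # Only aws is documented as a supported cloud provider.
  if [ "$cloud" != "aws" ]; then
    echo "unsupported polaris.cloud: $cloud" >&2
    return 1
  fi

  # Documented regions: us-east-1 and eu-central-1.
  case "$region" in
    us-east-1|eu-central-1)
      return 0
      ;;
    *)
      echo "unsupported polaris.region: $region" >&2
      return 1
      ;;
  esac
}

# Example: succeeds silently for a supported pair.
validate_polaris_target aws eu-central-1 && echo "cloud/region OK"
```

If the list of supported regions changes, only the case pattern needs updating.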

Prerequisites

Before you send data from Decodable into Imply Polaris, do the following in your Polaris account to make sure that you are able to create a connection to it.

  • Create a table in Polaris. See Create a table by API for instructions on how to create a table.

  • Create a Polaris push_streaming connection and a streaming job that loads data from that connection into your Polaris table. The input schema specified in the streaming job must match the schema of the Decodable stream that's sending data to Polaris; otherwise, your data may not appear in the Polaris table. See Create a table by API for instructions on how to create Polaris connections and jobs.

    • Make sure that the Decodable stream schema includes an event timestamp field. In the Polaris streaming job, you must map this field to the Polaris table's timestamp column (__time).
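To illustrate the timestamp requirement, the following minimal Bash sketch prints one record as newline-delimited JSON, the nd-json format that the streaming job in the example later on this page expects. The function name make_http_request_record and the sample values are hypothetical; the field names come from the http_requests_stream schema used in that example. The timestamp is written as an ISO 8601 string, which, assuming Polaris follows Apache Druid's TIME_PARSE behavior, can be parsed without an explicit format pattern.

```shell
#!/usr/bin/env bash
# Hypothetical helper: prints one http_requests_stream record as a single
# ND-JSON line. String fields are quoted; response_code and bytes_rcvd are
# emitted as bare integers to match the "long" types in the job's inputSchema.
# No JSON escaping is done, so string values must not contain " or \.
make_http_request_record() {
  local timestamp="$1" method="$2" path="$3" code="$4" bytes="$5"
  printf '{"timestamp":"%s","method":"%s","original_path":"%s","response_code":%d,"bytes_rcvd":%d}\n' \
    "$timestamp" "$method" "$path" "$code" "$bytes"
}

# prints {"timestamp":"2023-01-01T12:00:00Z","method":"GET","original_path":"/index.html","response_code":200,"bytes_rcvd":512}
make_http_request_record "2023-01-01T12:00:00Z" "GET" "/index.html" 200 512
```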

Example

In this example, assume that we’ve already created a Polaris table called http_requests with the following schema:

__time | method | original_path | response_code | bytes_rcvd

In addition, we’ve also already created a Decodable stream called http_requests_stream with the following schema:

timestamp | method | original_path | response_code | bytes_rcvd

Notice in this example that the timestamp field is called __time in the Polaris table and timestamp in the Decodable stream. We will create a mapping between these two fields in the streaming job as part of step 1.

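The mappings in the step 1 job definition follow a simple pattern: the Decodable timestamp field is wrapped in TIME_PARSE and assigned to __time, and every other column is mapped from the field of the same name. The following Bash sketch generates that mappings array from a list of field names. The function build_polaris_mappings is hypothetical and is not part of the Polaris API; it assumes that every non-timestamp field keeps its name in Polaris.

```shell
#!/usr/bin/env bash
# Hypothetical helper: prints a compact "mappings" array for a Polaris
# streaming job. $1 is the Decodable timestamp field (mapped to __time via
# TIME_PARSE); every remaining argument is mapped to a column of the same name.
build_polaris_mappings() {
  local ts_field="$1"
  shift
  local out col

  # \\" in the printf format becomes \" in the output, because each
  # expression is itself a JSON string containing a quoted identifier.
  out=$(printf '[{"columnName":"__time","expression":"TIME_PARSE(\\"%s\\")"}' "$ts_field")
  for col in "$@"; do
    out+=$(printf ',{"columnName":"%s","expression":"\\"%s\\""}' "$col" "$col")
  done
  printf '%s]\n' "$out"
}

# Prints the same five mappings as the step 1 job definition, without whitespace.
build_polaris_mappings timestamp method original_path response_code bytes_rcvd
```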
  1. Create a push_streaming connection and a streaming job using the Polaris REST API.

    curl --location --request POST 'https://ORGANIZATION_NAME.api.imply.io/v2/connections' \
         --header "Authorization: Bearer $IMPLY_TOKEN" \
         --header 'Content-Type: application/json' \
         --data-raw '{
           "type": "push_streaming",
           "name": "demo_connection"
         }'
    curl --location --request POST 'https://ORGANIZATION_NAME.api.imply.io/v2/jobs' \
          --header "Authorization: Bearer $IMPLY_TOKEN" \
          --header 'Content-Type: application/json' \
          --data-raw '{
              "type": "streaming",
              "target": {
                  "type": "table",
                  "tableName": "http_requests"
              },
              "source": {
                  "type": "connection",
                  "connectionName": "demo_connection",
                  "inputSchema": [
                      {
                          "name": "timestamp",
                          "dataType": "string"
                      },
                      {
                          "name": "method",
                          "dataType": "string"
                      },
                      {
                          "name": "original_path",
                          "dataType": "string"
                      },
                      {
                          "name": "response_code",
                          "dataType": "long"
                      },
                      {
                          "name": "bytes_rcvd",
                          "dataType": "long"
                      }
                  ],
                  "formatSettings": {
                      "format": "nd-json"
                  }
              },
              "mappings": [
                  {
                      "columnName": "__time",
                      "expression": "TIME_PARSE(\"timestamp\")"
                  },
                  {
                      "columnName": "method",
                      "expression": "\"method\""
                  },
                  {
                      "columnName": "original_path",
                      "expression": "\"original_path\""
                  },
                  {
                      "columnName": "response_code",
                      "expression": "\"response_code\""
                  },
                  {
                      "columnName": "bytes_rcvd",
                      "expression": "\"bytes_rcvd\""
                  }
              ]
          }'
  2. Create a connection to Imply Polaris using the Decodable API:

    curl --request POST \
         --url 'https://<your_decodable_organization>.api.decodable.co/v1alpha2/connections' \
         --header 'Accept: application/json' \
         --header "Authorization: Bearer $DECODABLE_AUTH_TOKEN" \
         --header 'Content-Type: application/json' \
         --data '
          {
               "connector": "polaris",
               "properties": {
                    "polaris.client-id": "<your_client_ID>",
                    "polaris.connection-name": "<your_connection_name>",
                    "polaris.org-name": "<your_org_name>",
                    "polaris.client-secret": "<your_client_secret>",
                    "polaris.cloud": "<cloud_provider>",
                    "polaris.region": "<region>"
               },
               "name": "imply_polaris_demo",
               "description": "Send data to Imply Polaris from Decodable",
               "type": "sink",
               "stream_id": "<http_requests_stream ID>"
          }
          '
  3. Start the imply_polaris_demo connection.
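To avoid pasting the client secret into the step 2 command, you can fill the request body from environment variables. The following Bash sketch prints the same body as step 2. The function name build_polaris_sink_body and the environment variable names are assumptions made for this example; they are not defined by Decodable or Polaris.

```shell
#!/usr/bin/env bash
# Hypothetical helper: prints the step 2 request body, taking values from
# environment variables (the variable names are assumptions).
# No JSON escaping is done, so values must not contain " or \.
build_polaris_sink_body() {
  local var
  # Refuse to print a body if any required value is empty or unset.
  for var in POLARIS_CLIENT_ID POLARIS_CLIENT_SECRET POLARIS_CONNECTION_NAME \
             POLARIS_ORG_NAME POLARIS_CLOUD POLARIS_REGION DECODABLE_STREAM_ID; do
    if [ -z "${!var}" ]; then
      echo "missing required variable: $var" >&2
      return 1
    fi
  done

  # Unquoted EOF so the ${...} references below are expanded.
  cat <<EOF
{
  "connector": "polaris",
  "properties": {
    "polaris.client-id": "${POLARIS_CLIENT_ID}",
    "polaris.connection-name": "${POLARIS_CONNECTION_NAME}",
    "polaris.org-name": "${POLARIS_ORG_NAME}",
    "polaris.client-secret": "${POLARIS_CLIENT_SECRET}",
    "polaris.cloud": "${POLARIS_CLOUD}",
    "polaris.region": "${POLARIS_REGION}"
  },
  "name": "imply_polaris_demo",
  "description": "Send data to Imply Polaris from Decodable",
  "type": "sink",
  "stream_id": "${DECODABLE_STREAM_ID}"
}
EOF
}
```

To use it, export the variables, pipe the function's output into the step 2 curl command, and replace that command's --data argument with --data-binary @-, which tells curl to read the request body from standard input.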

Connector starting state and offsets

A new sink connection starts reading from the Latest point in the source Decodable stream. This means that only data written to the stream after the connection has started is sent to the external system. When you start the connection, you can override this and start from Earliest if you want to send all the existing data on the source stream to the target system, along with all new data that arrives on the stream.

When you restart a sink connection, it continues to read data from the point it most recently stored in the checkpoint before the connection stopped. You can also choose to discard the connection's state and restart it from Earliest or Latest, as described above.
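The rules in the two paragraphs above amount to a small decision table. The following Bash function is a hypothetical model of that logic for reasoning about where a connection will resume; it does not call the Decodable API, and its argument values (yes/no, none/earliest/latest) are invented for this sketch.

```shell
#!/usr/bin/env bash
# Hypothetical model of the documented start-position rules for a sink connection.
#   $1  "yes" if the connection has a stored checkpoint, "no" otherwise
#   $2  requested override: "none", "earliest", or "latest"
# Prints the position the connection would read from.
resolve_start_position() {
  local has_checkpoint="$1" override="$2"
  case "$override" in
    earliest) echo "Earliest" ;;   # explicit start from Earliest (any stored state is discarded)
    latest)   echo "Latest" ;;     # explicit start from Latest (any stored state is discarded)
    none)
      if [ "$has_checkpoint" = "yes" ]; then
        echo "Checkpoint"          # restart: resume from the most recent checkpoint
      else
        echo "Latest"              # new connection: default starting point
      fi
      ;;
    *)
      echo "unknown override: $override" >&2
      return 1
      ;;
  esac
}

resolve_start_position no none   # new connection with no override
```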

Learn more about starting state here.