OpenSearch sink connector

Features

Connector name

opensearch

Compatibility

  • OpenSearch 1.x

  • OpenSearch 2.x

Delivery guarantee

At least once

Supported task sizes

S, M, L

Multiplex capability

A single instance of this connector can write a single Decodable stream to one or more OpenSearch indices.

Supported stream types

Append stream, change stream

Configuration properties

Each property is listed with its description, followed by whether it is required and its default value, where these apply.

hosts

A semicolon-delimited list of OpenSearch hosts to connect to.

For example: http://node-1:9092;http://node-2:9093

Required: Yes

aws.opensearch-serverless

Whether you are connecting to an AWS OpenSearch Serverless collection. Either true or false.

See Data Access Policy for AWS OpenSearch Serverless for how to set up access to your Serverless collection.

index

The name of the OpenSearch index to send data to.

If the index doesn’t exist, Decodable will create it for you.

Decodable supports static and dynamic index creation. See Types of Index.

Required: Yes

username

The username to connect to your OpenSearch cluster.

password

The password associated with your OpenSearch username.

This must be provided as a secret resource.

aws.iam-role-arn

The ARN of the IAM role in your AWS account that is used to connect to OpenSearch.

See Set up IAM Authentication to AWS OpenSearch for how to set up your IAM role.

Required: only when aws.opensearch-serverless = true

aws.region

The AWS region for your OpenSearch cluster/collection.

Required: only when aws.opensearch-serverless = true

Advanced

sink.bulk-flush.max-actions

Maximum number of buffered records per bulk request.

Default: 1000

sink.bulk-flush.max-size

Maximum size of buffered records per bulk request, in MB.

Default: 2

sink.bulk-flush.interval

The interval to flush buffered records (in seconds).

Default: 1
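The three sink.bulk-flush.* properties act together. The Python sketch below assumes that buffered records are sent as one bulk request as soon as any one threshold is reached (record count, buffered size, or time since the last flush), and that MB means 1024 * 1024 bytes. BulkBuffer and its methods are hypothetical names for illustration, not part of Decodable, and the connector's actual buffering may differ.

```python
import time
from dataclasses import dataclass, field


@dataclass
class BulkBuffer:
    """Hypothetical model of the sink's bulk-flush thresholds."""
    max_actions: int = 1000      # sink.bulk-flush.max-actions
    max_size_mb: float = 2       # sink.bulk-flush.max-size, assumed 1024*1024 bytes per MB
    interval_s: float = 1        # sink.bulk-flush.interval, in seconds
    records: list = field(default_factory=list)
    size_bytes: int = 0
    last_flush: float = field(default_factory=time.monotonic)

    def add(self, record: bytes) -> None:
        self.records.append(record)
        self.size_bytes += len(record)

    def should_flush(self, now: float) -> bool:
        # Flush as soon as any one of the three thresholds is reached.
        if len(self.records) >= self.max_actions:
            return True
        if self.size_bytes >= self.max_size_mb * 1024 * 1024:
            return True
        return bool(self.records) and now - self.last_flush >= self.interval_s

    def flush(self, now: float) -> list:
        # Hand back the buffered batch (one bulk request) and reset the buffer.
        batch, self.records, self.size_bytes = self.records, [], 0
        self.last_flush = now
        return batch


buf = BulkBuffer(max_actions=3)
start = buf.last_flush
buf.add(b'{"id": 1}')
buf.add(b'{"id": 2}')
print(buf.should_flush(start))        # False: 2 records, a few bytes, no time elapsed
print(buf.should_flush(start + 1.0))  # True: the 1 second interval has elapsed
buf.add(b'{"id": 3}')
print(buf.should_flush(start))        # True: max_actions (3) reached
print(len(buf.flush(start)))          # 3
```

Under this model, lowering max-actions or the interval trades larger, more efficient bulk requests for lower end-to-end latency.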

Types of Index

Static Index

A single target index, given as a plain string index name, into which all records are continuously written. For example, users.

Dynamic Index

A dynamic index includes {field_name} in the index name to reference a field value and dynamically generate a target index for each record.

The dynamic index name can also be of the form {field_name|date_format_string}, where date_format_string is a format pattern compatible with Java’s DateTimeFormatter.

For example:

  • users-at-{ts|yyyy-MM-dd}, where ts is a timestamp field in the connector schema and yyyy-MM-dd is an optional date format pattern.

  • index-for-user-{user}, where user is a field in the connector schema.
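To make the placeholder syntax concrete, here is a minimal Python sketch of how a template such as users-at-{ts|yyyy-MM-dd} could be resolved for a single record. It is an illustration, not Decodable's implementation: resolve_index is a hypothetical helper, and it translates only a few DateTimeFormatter pattern letters (yyyy, MM, dd, HH, mm, ss) into Python strftime codes rather than supporting the full Java pattern language.

```python
import re
from datetime import datetime

# Hypothetical helper. Only these DateTimeFormatter letters are translated;
# any other characters in the pattern are copied through literally.
_JAVA_TO_STRFTIME = {"yyyy": "%Y", "MM": "%m", "dd": "%d",
                     "HH": "%H", "mm": "%M", "ss": "%S"}
# Matches {field} or {field|pattern}.
_PLACEHOLDER = re.compile(r"\{([^}|]+)(?:\|([^}]+))?\}")


def _to_strftime(java_pattern: str) -> str:
    return re.sub(r"yyyy|MM|dd|HH|mm|ss",
                  lambda m: _JAVA_TO_STRFTIME[m.group(0)], java_pattern)


def resolve_index(template: str, record: dict) -> str:
    """Replace each {field} or {field|pattern} with the record's value."""
    def substitute(m: re.Match) -> str:
        value = record[m.group(1)]
        if m.group(2):  # date-formatted placeholder
            return value.strftime(_to_strftime(m.group(2)))
        return str(value)
    return _PLACEHOLDER.sub(substitute, template)


print(resolve_index("users-at-{ts|yyyy-MM-dd}",
                    {"ts": datetime(2024, 3, 5, 14, 30)}))      # users-at-2024-03-05
print(resolve_index("index-for-user-{user}", {"user": "alice"}))  # index-for-user-alice
print(resolve_index("users", {"user": "alice"}))                  # users (static index)
```

Because the index name is computed per record, two records with different ts dates or user values land in different indices, while a template with no placeholders behaves as a static index.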

When you use a dynamic index based on the current system time with a change stream, there is no guarantee that records with the same primary key will generate the same index name. Therefore, if your dynamic index is based on system time, use only an append stream.
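The underlying issue is that a change stream relies on each change for a key replacing the previous document for that key, which only happens if both land in the same index. This Python simulation models the cluster as a dictionary of indices and uses a hypothetical orders-{date} index name computed from the processing time rather than from the record; the record fields and timestamps are made up for illustration.

```python
from collections import defaultdict
from datetime import datetime

# Hypothetical model of a cluster: index name -> {document id -> document}.
cluster = defaultdict(dict)


def upsert(record: dict, now: datetime) -> None:
    # The index is chosen from system (processing) time, not from the record.
    index = f"orders-{now:%Y-%m-%d}"
    cluster[index][record["id"]] = record


# The same primary key changes twice, on either side of midnight.
upsert({"id": 42, "status": "pending"}, datetime(2024, 3, 5, 23, 59))
upsert({"id": 42, "status": "shipped"}, datetime(2024, 3, 6, 0, 1))

for index, docs in sorted(cluster.items()):
    print(index, docs)
# orders-2024-03-05 {42: {'id': 42, 'status': 'pending'}}
# orders-2024-03-06 {42: {'id': 42, 'status': 'shipped'}}
```

The pending version is never overwritten, so a query across both indices sees two documents for key 42, and a delete for that key would likewise reach only the index for the current day. With an append stream there are no updates to a key, so this split does not cause stale copies.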

Connector starting state and offsets

A new sink connection will start reading from the Latest point in the source Decodable streams. This means that only data that’s written to the stream after the connection has started will be sent to the external system. When you start the connection, you can override this to Earliest if you want to send all the existing data on the source streams to the target system, along with all new data that arrives on the streams.

When you restart a sink connection it will continue to read data from the point it most recently stored in the checkpoint before the connection stopped. You can also opt to discard the connection’s state and restart it afresh from Earliest or Latest as described above.

Learn more about starting state here.
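The starting-state rules above reduce to a small decision. This Python sketch models a stream's positions as integer offsets, from 0 (the earliest retained record) to stream_end (the position the next record will be written to). That model, and the names resolve_start_offset and StartPosition, are assumptions for illustration and not Decodable's API.

```python
from enum import Enum
from typing import Optional


class StartPosition(Enum):
    """Hypothetical stand-in for the Earliest/Latest choice."""
    EARLIEST = "earliest"
    LATEST = "latest"


def resolve_start_offset(stream_end: int,
                         checkpoint: Optional[int] = None,
                         start: StartPosition = StartPosition.LATEST,
                         discard_state: bool = False) -> int:
    """Return the offset a sink connection would begin reading from."""
    # A restarted connection resumes from its last checkpoint,
    # unless you choose to discard its state.
    if checkpoint is not None and not discard_state:
        return checkpoint
    # A new connection (or one whose state was discarded) starts at
    # Earliest or Latest; Latest is the default.
    return 0 if start is StartPosition.EARLIEST else stream_end


print(resolve_start_offset(500))                                # 500: new, Latest
print(resolve_start_offset(500, start=StartPosition.EARLIEST))  # 0: new, Earliest
print(resolve_start_offset(500, checkpoint=320))                # 320: restart
print(resolve_start_offset(500, checkpoint=320,
                           start=StartPosition.EARLIEST,
                           discard_state=True))                 # 0: state discarded
```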

Set up IAM Authentication to AWS OpenSearch

Access to your AWS resources

Decodable interacts with resources in AWS on your behalf. To do this you need an IAM role configured with a trust policy that allows access from Decodable’s AWS account, and a permission policy as detailed below.

For more details on how this works, how to configure the trust policy, and example steps to follow, see here.

To use this connector you must associate a permissions policy with the IAM role. This policy must have the following permissions:

  • Full API access. For OpenSearch Service, this is:

    es:ESHttpPost
    es:ESHttpDelete
    es:ESHttpPut
    es:ESHttpPatch
    es:ESHttpGet
    es:ESHttpHead

    For OpenSearch Serverless, this is:

    aoss:APIAccessAll
Sample Permission Policy (OpenSearch Service)
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "es:ESHttpPost",
        "es:ESHttpDelete",
        "es:ESHttpPut",
        "es:ESHttpPatch",
        "es:ESHttpGet",
        "es:ESHttpHead"
      ],
      "Resource": "arn:aws:es:<region>:<account>:domain/my-domain/*"
    }
  ]
}
Sample Permission Policy (OpenSearch Serverless)
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "aoss:APIAccessAll"
      ],
      "Resource": "arn:aws:aoss:<region>:<account>:collection/my-collection"
    }
  ]
}
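Before attaching a permission policy, it can help to confirm that it grants every action listed above. This Python sketch is a deliberately simplified check, not an IAM policy evaluator: missing_actions is a hypothetical helper that looks only at Allow statements and their Action element, matching names case-insensitively with * and ? wildcards as IAM does, and it ignores Deny statements, NotAction, Resource, and Condition. For an authoritative result, use the AWS IAM policy simulator.

```python
import json
from fnmatch import fnmatchcase

# Actions the connector needs, copied from the lists above.
REQUIRED = {
    "service": {"es:ESHttpPost", "es:ESHttpDelete", "es:ESHttpPut",
                "es:ESHttpPatch", "es:ESHttpGet", "es:ESHttpHead"},
    "serverless": {"aoss:APIAccessAll"},
}


def missing_actions(policy: dict, serverless: bool) -> set:
    """Return the required actions that no Allow statement grants.

    Simplified: only Effect and Action are examined; Deny statements,
    NotAction, Resource and Condition are ignored.
    """
    granted = []
    for stmt in policy.get("Statement", []):
        if stmt.get("Effect") != "Allow":
            continue
        actions = stmt.get("Action", [])
        if isinstance(actions, str):
            actions = [actions]
        # IAM action names are case-insensitive and may contain * or ?.
        granted.extend(a.lower() for a in actions)
    required = REQUIRED["serverless" if serverless else "service"]
    return {r for r in required
            if not any(fnmatchcase(r.lower(), g) for g in granted)}


partial = json.loads("""{
  "Version": "2012-10-17",
  "Statement": [{"Effect": "Allow",
                 "Action": ["es:ESHttpGet", "es:ESHttpPost"],
                 "Resource": "<domain-arn>"}]
}""")
print(sorted(missing_actions(partial, serverless=False)))
# ['es:ESHttpDelete', 'es:ESHttpHead', 'es:ESHttpPatch', 'es:ESHttpPut']

wildcard = {"Statement": [{"Effect": "Allow", "Action": "es:ESHttp*"}]}
print(missing_actions(wildcard, serverless=False))  # set()
```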

Data Access Policy for AWS OpenSearch Serverless

You must grant the following permissions to your IAM role via a data access policy to allow Decodable to send data to your collection:

aoss:CreateCollectionItems
aoss:UpdateCollectionItems
aoss:DescribeCollectionItems
aoss:CreateIndex
aoss:UpdateIndex
aoss:DescribeIndex
aoss:WriteDocument
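Serverless data access policies are JSON documents made of rules, each scoped to a collection or an index resource type, plus the principals the rules apply to. Of the permissions above, the three CollectionItems permissions apply to collections and the remaining four apply to indexes. This Python sketch builds such a policy; data_access_policy is a hypothetical helper, and the collection name my-collection and the role ARN are placeholders to replace with your own.

```python
import json

# Permissions listed above, grouped by the resource type they apply to.
COLLECTION_PERMISSIONS = ["aoss:CreateCollectionItems",
                          "aoss:UpdateCollectionItems",
                          "aoss:DescribeCollectionItems"]
INDEX_PERMISSIONS = ["aoss:CreateIndex", "aoss:UpdateIndex",
                     "aoss:DescribeIndex", "aoss:WriteDocument"]


def data_access_policy(collection: str, role_arn: str) -> list:
    """Build a data access policy granting role_arn write access to a collection."""
    return [{
        "Rules": [
            {"ResourceType": "collection",
             "Resource": [f"collection/{collection}"],
             "Permission": COLLECTION_PERMISSIONS},
            {"ResourceType": "index",
             # Every index in the collection, including ones created later.
             "Resource": [f"index/{collection}/*"],
             "Permission": INDEX_PERMISSIONS},
        ],
        "Principal": [role_arn],
    }]


print(json.dumps(data_access_policy(
    "my-collection", "arn:aws:iam::<account>:role/my-decodable-role"), indent=2))
```

The printed JSON can be used as the policy body when you create the data access policy for your collection. The index/<collection>/* pattern matters when you use a dynamic index, since the target indices do not exist until the first record for each one arrives.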