Amazon S3 sink connector

Features

Connector name

s3-v2

Delivery guarantee

Exactly once

Supported task sizes

S, M, L

Multiplex capability

A single instance of this connector can write to all files in a single S3 bucket

Supported stream types

Append Stream

Configuration properties

Property Description Required Default

region

The AWS region that your S3 bucket is located in. For example, us-west-2.

If not specified, defaults to your Decodable account region.

path

The file path to the bucket or directory that you want to write data to.

For example, s3://bucket/directory.

Yes

role-arn

The AWS ARN of the IAM role that has permissions to access the S3 bucket.

For example, arn:aws:iam::111222333444:role/decodable-s3-access.

Yes

format

The format of data to write to S3. Must be one of the following:

  • json

  • parquet

  • avro

Yes

partition-cols

The field names that partition the data.

For example, if you want to partition your data based on the datetime field, then enter datetime.
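As a rough illustration of how the properties above fit together, the following sketch checks a property mapping before it is handed to whatever tooling you use to create the connection. The helper name `check_s3_sink_properties` and the `s3://` prefix check are assumptions made for this example; they are not part of Decodable's API. The required properties and allowed formats are taken from the table above.

```python
# Hypothetical pre-flight check; not part of any Decodable client or CLI.
REQUIRED = ("path", "role-arn", "format")
ALLOWED_FORMATS = {"json", "parquet", "avro"}


def check_s3_sink_properties(props):
    """Return a list of problems found in a property mapping (empty if none)."""
    problems = []
    for name in REQUIRED:
        if not props.get(name):
            problems.append(f"missing required property: {name}")
    fmt = props.get("format")
    if fmt and fmt not in ALLOWED_FORMATS:
        problems.append(f"unsupported format: {fmt}")
    path = props.get("path")
    # Assumption: paths follow the s3://bucket/directory shape shown above.
    if path and not path.startswith("s3://"):
        problems.append("path should look like s3://bucket/directory")
    return problems


example = {
    "region": "us-west-2",
    "path": "s3://bucket/directory",
    "role-arn": "arn:aws:iam::111222333444:role/decodable-s3-access",
    "format": "json",
    "partition-cols": "datetime",
}
```

Calling `check_s3_sink_properties(example)` returns an empty list, because all three required properties are present and the format is one of the supported values.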

JSON-specific configuration

json.timestamp-format.standard

Specify the timestamp format for TIMESTAMP and TIMESTAMP_LTZ types.

  • SQL will use a yyyy-MM-dd HH:mm:ss.SSS format, e.g. "2020-12-30 12:13:14.123"

  • ISO-8601 will use a yyyy-MM-ddTHH:mm:ss.SSS format, e.g. "2020-12-30T12:13:14.123"

SQL

json.encode.decimal-as-plain-number

When true, always encode numbers without scientific notation.
For example, a number encoded 2.7E-8 by default would be encoded 0.000000027.

false
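To make the effect of the two JSON options concrete, here is a small sketch that reproduces the example values from the descriptions above using only the Python standard library. The function names `format_timestamp` and `encode_decimal` are hypothetical, and this is not the connector's serializer; it only mimics the documented output shapes.

```python
from datetime import datetime
from decimal import Decimal


def format_timestamp(value, standard="SQL"):
    """Render a timestamp in the SQL or ISO-8601 shape described above."""
    separator = " " if standard == "SQL" else "T"
    return value.isoformat(sep=separator, timespec="milliseconds")


def encode_decimal(value, plain=False):
    """Render a decimal with or without scientific notation."""
    return format(value, "f") if plain else str(value)


ts = datetime(2020, 12, 30, 12, 13, 14, 123000)
sql_text = format_timestamp(ts, "SQL")       # "2020-12-30 12:13:14.123"
iso_text = format_timestamp(ts, "ISO-8601")  # "2020-12-30T12:13:14.123"

small = Decimal("2.7E-8")
default_text = encode_decimal(small)           # "2.7E-8"
plain_text = encode_decimal(small, plain=True)  # "0.000000027"
```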

Parquet-specific configuration

Other parquet options are available. See ParquetOutputFormat for more information.

parquet.compression

One of:

  • SNAPPY

  • GZIP

  • LZO

If no value is specified then the connector will default to no compression.

Writing data to S3

sink.rolling-policy.file-size

The maximum file size in S3.

If a file reaches this maximum size while Decodable is streaming data to it, then the file closes and a new file with the same object name prefix is created.

128MB

sink.rolling-policy.rollover-interval

The maximum amount of time that a file in an S3 bucket can stay open.

If a file has been open for this length of time while Decodable is streaming data to it, then the file closes and a new file with the same object name prefix is created.

30 min

auto-compaction

When enabled, compacts many small files into fewer large files.

false

compaction.file-size

The maximum file size that compacted files can be.

The value of sink.rolling-policy.file-size
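The two rolling-policy properties can be read as a simple "whichever limit is hit first" rule. The sketch below illustrates that rule with the default values from the table. It is an illustration only: the `OpenFile` type and `should_roll` function are hypothetical, and interpreting 128MB as 128 × 1024 × 1024 bytes is an assumption.

```python
from dataclasses import dataclass


@dataclass
class OpenFile:
    size_bytes: int    # bytes written to the file so far
    opened_at: float   # time the file was opened, in seconds


def should_roll(open_file, now,
                max_size_bytes=128 * 1024 * 1024,  # sink.rolling-policy.file-size
                max_open_seconds=30 * 60):         # sink.rolling-policy.rollover-interval
    """Return True when the file should be closed and a new one opened."""
    too_big = open_file.size_bytes >= max_size_bytes
    too_old = (now - open_file.opened_at) >= max_open_seconds
    return too_big or too_old
```

With the defaults, a small file that has been open for one minute stays open, while a file that has reached 128MB or has been open for 30 minutes is rolled.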

Prerequisites

Access to your AWS resources

Decodable interacts with resources in AWS on your behalf. To do this you need an IAM role configured with a trust policy that allows access from Decodable’s AWS account, and a permission policy as detailed below.

For more details on how this works, how to configure the trust policy, and example steps to follow see here.

To use this connector you must associate a permissions policy with the IAM role. This policy must have the following permissions:

  • Write access on the S3 bucket path to which you’re writing data.

    s3:PutObject
    s3:GetObject
    s3:DeleteObject

    If you want to write data directly at the root level of the bucket, then leave the path blank with the trailing /* included.

  • List access on the bucket to which you’re writing data

    s3:ListBucket
  • Sample Permission Policy
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::my_bucket"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:PutObject", "s3:GetObject", "s3:DeleteObject"],
          "Resource": "arn:aws:s3:::my_bucket/some/dir/*"
        }
    
      ]
    }
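If you maintain several buckets or paths, a small helper can generate the same policy document. The sketch below mirrors the sample policy above and handles the root-level case by emitting a bare `/*` resource. The function name `build_permission_policy` is hypothetical; the output is a plain dictionary that you would still need to attach to the IAM role yourself.

```python
import json


def build_permission_policy(bucket, prefix=""):
    """Build the permission policy shown above for a bucket and optional path."""
    prefix = prefix.strip("/")
    if prefix:
        object_resource = f"arn:aws:s3:::{bucket}/{prefix}/*"
    else:
        # Root of the bucket: blank path, trailing /* kept.
        object_resource = f"arn:aws:s3:::{bucket}/*"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": f"arn:aws:s3:::{bucket}",
            },
            {
                "Effect": "Allow",
                "Action": ["s3:PutObject", "s3:GetObject", "s3:DeleteObject"],
                "Resource": object_resource,
            },
        ],
    }


policy_json = json.dumps(build_permission_policy("my_bucket", "some/dir"), indent=2)
```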

S3 object keys

S3 object key partitioning

You can partition the S3 object key paths by value using the partition-cols connection property. When you specify a field to partition by, that field's name and value are used as a prefix to organize the data in the S3 bucket.

For example, a common pattern is to partition by date and then by hour of the day. Given a schema with datetime=DATE and hour=INTEGER fields, setting the partition-cols property to datetime,hour produces bucket entries that look like:

s3://my-path/datetime=2023-01-01/hour=0/<file>
s3://my-path/datetime=2023-01-01/hour=1/<file>
...
s3://my-path/datetime=2023-01-01/hour=23/<file>
s3://my-path/datetime=2023-01-02/hour=0/<file>

The fields that are used as partition columns will be removed from the resulting payload in the file in S3. If you are using any query systems downstream that are relying on those fields, you will need to configure them to read the value from the file path instead.
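The partitioning behavior described above, where partition columns move from the payload into the key prefix, can be sketched as follows. The function `partition_record` is a hypothetical illustration, not the connector's code, and it assumes partition-cols is given as a comma-separated string as in the example.

```python
def partition_record(record, partition_cols):
    """Split a record into its partition prefix and the payload written to the file."""
    cols = [c.strip() for c in partition_cols.split(",") if c.strip()]
    # Each partition column contributes one <name>=<value> path segment, in order.
    prefix = "/".join(f"{c}={record[c]}" for c in cols)
    # Partition columns are removed from the payload stored in the file.
    payload = {k: v for k, v in record.items() if k not in cols}
    return prefix, payload


prefix, payload = partition_record(
    {"datetime": "2023-01-01", "hour": 0, "user": "a"},
    "datetime,hour",
)
```

Here `prefix` is `datetime=2023-01-01/hour=0` and `payload` contains only the `user` field, which is why downstream query systems need to read `datetime` and `hour` from the path.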

S3 object key formation

The following is an example of what your object keys look like in S3. Let’s assume that your S3 Connection has the following configuration:

  • S3 bucket or path: my-awesome-bucket

  • Format: JSON

  • Partition template or partition-cols: datetime

  • sink.rolling-policy.rollover-interval: 5 minutes

  • Compression isn’t set.

When you start the connection, the S3 Connector opens a file with a name like part-123e4567-e89b-12d3-a456-426614174000-0.json in the datetime=01-25-2023 subfolder in the my-awesome-bucket S3 bucket and starts streaming data to that file. Once 5 minutes have elapsed, then the part-123e4567-e89b-12d3-a456-426614174000-0.json file is closed and a new file named part-123e4567-e89b-12d3-a456-426614174000-1.json is opened. The S3 Connector then starts sending data to this newly opened file instead.

In summary, the S3 object parts are joined as: <path>/<partition-col>=<value>/part-<unique-id>-<N>
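The key pattern above can be expressed as a small formatting function. This is a sketch only: `object_key` is a hypothetical name, and the optional `extension` argument is an assumption added to reproduce the `.json` suffix seen in the example file names.

```python
def object_key(path, partition_col, value, unique_id, n, extension=""):
    """Join the object key parts as <path>/<partition-col>=<value>/part-<unique-id>-<N>."""
    return f"{path.rstrip('/')}/{partition_col}={value}/part-{unique_id}-{n}{extension}"


unique_id = "123e4567-e89b-12d3-a456-426614174000"
# The counter N increases each time a file is rolled over.
keys = [
    object_key("my-awesome-bucket", "datetime", "01-25-2023", unique_id, n, ".json")
    for n in range(2)
]
```

The two entries in `keys` correspond to the first file and the file opened after the first rollover in the example above.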

Connector starting state and offsets

A new sink connection will start reading from the Latest point in the source Decodable streams. This means that only data that’s written to the stream after the connection has started will be sent to the external system. You can override this when you start the connection to Earliest if you want to send all the existing data on the source streams to the target system, along with all new data that arrives on the streams.

When you restart a sink connection it will continue to read data from the point it most recently stored in the checkpoint before the connection stopped. You can also opt to discard the connection’s state and restart it afresh from Earliest or Latest as described above.
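The start and restart rules above amount to a short decision: resume from the checkpoint if one exists and has not been discarded, otherwise start from Latest or Earliest. The sketch below models that decision. The function `resolve_start` and its return shape are hypothetical and only illustrate the described behavior.

```python
def resolve_start(checkpoint_offset, requested="Latest", discard_state=False):
    """Decide where a sink connection begins reading from its source stream."""
    if requested not in ("Earliest", "Latest"):
        raise ValueError(f"unknown starting state: {requested}")
    # A restart resumes from the stored checkpoint unless state is discarded.
    if checkpoint_offset is not None and not discard_state:
        return ("checkpoint", checkpoint_offset)
    # New connection, or state discarded: use the requested starting state.
    return (requested, None)
```

For a new connection, `resolve_start(None)` yields the Latest default; for a restart with a stored offset, the checkpoint wins unless `discard_state=True` is passed.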

Learn more about starting state here.