Apache Iceberg sink connector

Features

Connector name: iceberg

Compatibility (Catalog): AWS Glue

Compatibility (Object store): Amazon S3

Delivery guarantee: Exactly once

Supported task sizes: M, L

Multiplex capability: A single instance of this connector can write to a single Iceberg table.

Supported stream types: If you are sending data from a change stream to an Iceberg table, then upsert mode is used. Otherwise, append mode is used.

Configuration properties

warehouse (Required)

The path to the S3 bucket or folder to which you want to send data. For example: s3://bucket/folder.

catalog-database (Required)

The name of the database in your Iceberg catalog. This is the name that you added permissions for as part of the prerequisites. If a database with this name doesn’t exist, Decodable creates it.

catalog-type (Required)

The catalog responsible for managing the metadata associated with Iceberg tables. Must be glue, since currently only AWS Glue is supported.

role-arn (Required)

The AWS ARN of the IAM role. For example: arn:aws:iam::111222333444:role/decodable-s3-access.

region (Required)

The AWS region of the AWS Glue catalog.

format (Optional; default: parquet)

The format in which to write the data to the object store. The following formats are supported:

  • parquet

  • avro

  • orc

format-version (Optional; default: 2)

The Iceberg table specification version used for output files: 1 or 2. See the note below regarding versions.
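To make these properties concrete, the sketch below shows one plausible set of values, written as a plain JSON map of property names to values rather than any particular configuration file format. The bucket, database name, role ARN, and region are placeholder values for illustration only; substitute the values for your own environment.

    {
        "warehouse": "s3://my-bucket/iceberg-warehouse",
        "catalog-database": "my_analytics_db",
        "catalog-type": "glue",
        "role-arn": "arn:aws:iam::111222333444:role/decodable-s3-access",
        "region": "us-east-1",
        "format": "parquet",
        "format-version": "2"
    }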

Prerequisites

Access to your AWS resources

Decodable interacts with resources in AWS on your behalf. To do this, you need an IAM role configured with a trust policy that allows access from Decodable’s AWS account, and a permission policy as detailed below.

For more details on how this works, how to configure the trust policy, and example steps to follow, see here.
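As a rough illustration only, a cross-account trust policy generally takes the shape sketched below. The Decodable AWS account ID and external ID are placeholders, and whether an external ID condition applies depends on the instructions linked above; follow those instructions for the exact policy to use.

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "AWS": "arn:aws:iam::<Decodable AWS account id>:root"
                },
                "Action": "sts:AssumeRole",
                "Condition": {
                    "StringEquals": {
                        "sts:ExternalId": "<external id provided by Decodable>"
                    }
                }
            }
        ]
    }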

To use this connector, you must associate a permissions policy with the IAM role. This policy must have the following permissions:

  • Read/Write access to the S3 bucket path to which you’re writing data.

    s3:PutObject
    s3:GetObject
    s3:DeleteObject

    If you want to send data directly at the root level of the bucket, leave the path blank but keep the trailing /*.

  • List access on the bucket to which you’re writing data.

    s3:ListBucket
  • Read and write permissions for AWS Glue.

    glue:CreateDatabase
    glue:GetDatabase
    glue:CreateTable
    glue:UpdateTable
    glue:GetTable
  • Sample Permission Policy
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "glue:CreateDatabase",
                    "glue:GetDatabase",
                    "glue:CreateTable",
                    "glue:UpdateTable",
                    "glue:GetTable"
                ],
                "Resource": [
                    "arn:aws:glue:<region>:<AWS account id>:catalog",
                    "arn:aws:glue:<region>:<AWS account id>:database/<catalog-database>",
                    "arn:aws:glue:<region>:<AWS account id>:table/<catalog-database>/*"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "s3:PutObject",
                    "s3:GetObject",
                    "s3:DeleteObject",
                    "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::<s3 warehouse path>/*"
                ]
            }
        ]
    }

    In this example, replace:

    • <region> with the AWS region of the Glue catalog.

    • <AWS account id> with your AWS account ID.

    • <s3 warehouse path> and <catalog-database> with appropriate values.

Existing tables in Iceberg

If the sink table exists, its schema must exactly match the schema of the table that the connector would have created itself.

If the schema of the existing table doesn’t match, you’ll get this error:

Failure: table already exists with different schema.

To resolve this, you have two options:

  1. Remove the table from Iceberg and let Decodable recreate it in the schema that matches the stream. This may not be desirable if you have data in the table already.

  2. Make the source stream’s schema match that of the target Iceberg table by using a pipeline to process the existing stream into a new one, which is then used as the source for your connection to Iceberg.

Removing streams from an existing connection

If you remove streams from an existing connection, the connection will fail to start the next time it is restarted.

To mitigate this issue, you have several options.

  1. You can restart the connection, discarding its current state, and choose either earliest or latest to read data from. The connection will then read all records from the beginning of the source streams, or only new records from the end of the source streams, respectively.

    The impact of this is that with earliest you reprocess data and so can end up with duplicates, while with latest you may skip data.

  2. If duplicate or missing data is unacceptable, do the following:

    1. Pause the connection/pipeline that writes to the streams that you want to retain in the connection.

    2. Make sure that this connection has caught up on all records.

    3. Create a new version of the connection that only reads from the remaining streams.

    4. Start the new version of the connection from latest.

    5. Start the source connection/pipelines that you previously paused.

Iceberg table spec version

This connector defaults to using version 2 of the Iceberg table specification.

Support for version 1 is provided for backwards compatibility. Note that version 1 has limited support for change streams: it doesn’t support consuming deletion records and will fail at runtime if it encounters one. In addition, updates cause entire files to be rewritten, increasing cost and decreasing performance. We therefore recommend using version 2 when working with change streams. If you need to use version 1 with a change stream, use a pipeline to convert it into an append stream with the TO_APPEND function.

Connector starting state and offsets

A new sink connection will start reading from the Latest point in the source Decodable stream. This means that only data written to the stream after the connection has started will be sent to the external system. When you start the connection, you can override this to Earliest if you want to send all the existing data on the source stream to the target system, along with all new data that arrives on the stream.

When you restart a sink connection, it will continue to read data from the point most recently stored in a checkpoint before the connection stopped. You can also opt to discard the connection’s state and restart it afresh from Earliest or Latest, as described above.

Learn more about starting state here.

Data types mapping

The following table describes the mapping of Decodable data types to their Iceberg data type counterparts.

Decodable Type            Iceberg Type
BOOLEAN                   boolean
TINYINT                   integer
SMALLINT                  integer
INTEGER                   integer
BIGINT                    long
FLOAT                     float
DOUBLE                    double
CHAR                      string
VARCHAR                   string
STRING                    string
BINARY                    binary
VARBINARY                 fixed
DECIMAL                   decimal
DATE                      date
TIME                      time
TIMESTAMP                 timestamp without timezone
TIMESTAMP_LTZ             timestamp with timezone
ARRAY                     list
MAP                       map
MULTISET                  map
ROW                       struct
RAW                       Not supported
INTERVAL                  Not supported
STRUCTURED                Not supported
TIMESTAMP WITH TIMEZONE   Not supported
DISTINCT                  Not supported
NULL                      Not supported
SYMBOL                    Not supported
LOGICAL                   Not supported