Apache Iceberg sink connector

Use the Apache Iceberg connector to send data to an Iceberg table.

Features

Delivery guarantee: Exactly once

Compatibility

This connector currently supports the following catalog and data warehouse options:

  • Catalog: AWS Glue

  • Data Warehouse: Amazon S3

Prerequisites

Access to your AWS resources

Decodable interacts with resources in AWS on your behalf. To do this, you need an IAM role configured with a trust policy that allows access from Decodable’s AWS account, and a permissions policy as detailed below.

For more details on how this works, how to configure the trust policy, and example steps to follow, see here.
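
For illustration, a cross-account trust policy generally looks something like the following sketch. The exact principal, and whether an external ID condition is required, are covered in the linked guide; all bracketed values are placeholders.

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "AWS": "arn:aws:iam::<Decodable AWS account id>:root"
                },
                "Action": "sts:AssumeRole",
                "Condition": {
                    "StringEquals": {
                        "sts:ExternalId": "<external id from Decodable>"
                    }
                }
            }
        ]
    }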

To use this connector you must associate a permissions policy with the IAM role. This policy must have the following permissions:

  • Read/Write access to the S3 bucket path to which you’re writing data.

    s3:PutObject
    s3:GetObject
    s3:DeleteObject

    If you want to write data directly at the root level of the bucket, leave the path blank but keep the trailing /*.

  • List access on the bucket to which you’re writing data.

    s3:ListBucket
  • Read and write permissions for AWS Glue.

    glue:CreateDatabase
    glue:GetDatabase
    glue:CreateTable
    glue:UpdateTable
    glue:GetTable
  • Sample Permission Policy
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "glue:CreateDatabase",
                    "glue:GetDatabase",
                    "glue:CreateTable",
                    "glue:UpdateTable",
                    "glue:GetTable"
                ],
                "Resource": [
                    "arn:aws:glue:<region>:<AWS account id>:catalog",
                    "arn:aws:glue:<region>:<AWS account id>:database/<catalog-database>",
                    "arn:aws:glue:<region>:<AWS account id>:table/<catalog-database>/*"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "s3:PutObject",
                    "s3:GetObject",
                    "s3:DeleteObject"
                ],
                "Resource": [
                    "arn:aws:s3:::<s3 warehouse path>/*"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::<bucket name>"
                ]
            }
        ]
    }

    In this example, replace:

    • <region> with the AWS region of the Glue catalog.

    • <AWS account id> with your AWS account ID.

    • <s3 warehouse path> with the S3 path you’re writing to, <bucket name> with the name of the S3 bucket, and <catalog-database> with the name of the catalog database.
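
If you manage IAM from the command line, a policy like the sample above can be attached to the role as an inline policy with the AWS CLI. The role and policy names here are hypothetical:

    # Attaches the permissions policy (saved locally as
    # iceberg-sink-policy.json) to the role that Decodable assumes.
    # Role and policy names are illustrative.
    aws iam put-role-policy \
        --role-name decodable-s3-access \
        --policy-name decodable-iceberg-sink \
        --policy-document file://iceberg-sink-policy.json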

Steps

If you want to use the Decodable CLI or API to create the connection, refer to the Property Name column for the underlying property names. The connector name is iceberg.
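
For illustration only, creating this connection from the CLI might look something like the following sketch. The flag syntax is an assumption and may differ between CLI versions; consult decodable connection create --help for the exact syntax, and treat all values as placeholders.

    # Sketch only: flag syntax may vary by CLI version.
    # Property names come from the table in step 1 below.
    decodable connection create \
        --name my-iceberg-sink \
        --type sink \
        --connector iceberg \
        --prop warehouse=s3://bucket/folder \
        --prop catalog-database=my_database \
        --prop catalog-type=glue \
        --prop role-arn=arn:aws:iam::111222333444:role/decodable-s3-access \
        --prop region=us-east-1 \
        --prop format=parquet \
        --prop format-version=2
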
  1. From the Connections page, select the Iceberg Connector and complete the following fields.

    Each field is listed below with its underlying property name in parentheses.

    Connection Type (N/A)
    Select Sink to use this connector to send data into the database provided.

    Warehouse path (warehouse)
    The file path to the S3 bucket or folder that you want to send data to. For example: s3://bucket/folder.

    Database name (catalog-database)
    The name of the database in your Iceberg catalog. This is the name that you added permissions for as part of the prerequisites. If a database with this name doesn’t exist, Decodable creates it.

    Catalog type (catalog-type)
    The catalog responsible for managing the metadata associated with Iceberg tables. Currently, only AWS Glue is supported. If you are using the Decodable CLI to create this connection, enter glue for this value.

    IAM Role ARN (role-arn)
    The AWS ARN of the IAM role. For example: arn:aws:iam::111222333444:role/decodable-s3-access.

    AWS Region (region)
    The AWS region of the AWS Glue catalog.

    Format (format)
    The format for data in Amazon S3: parquet, avro, or orc. Defaults to parquet.

    Table Format Version (format-version)
    The Iceberg table specification version used for output files: 1 or 2 (default). See the note below regarding versions.

  2. Select the streams containing the data that you’d like to send to Iceberg.

    Decodable automatically creates an Iceberg table for each stream selected, and performs schema mapping between the Decodable stream and the Iceberg table. See Data types mapping for how Decodable types map to Iceberg types.

    Then, select Next.

  3. (Optional) Give each destination table a name.

    By default, Decodable uses the same name for the destination table as the stream. For example, if you are sending data from a stream called users, then the destination Iceberg table is given the name users.

  4. Give the newly created connection a Name and Description and select Save.

Upon starting this connection, you can use it to send data to your Iceberg table. If you are sending data from a change stream to an Iceberg table, then upsert mode is used. Otherwise, append mode is used.

Connector notes

Existing tables in Iceberg

If the destination table exists, its schema must exactly match the schema of the table that the connector would have created itself.

If the schema of the existing table doesn’t match, you’ll get this error:

Failure: table already exists with different schema.

To resolve this, you have two options:

  1. Remove the table from Iceberg and let Decodable recreate it in the schema that matches the stream. This may not be desirable if you have data in the table already.

  2. Make the source stream’s schema match that of the target Iceberg table: use a pipeline to process the existing stream into a new one, and use that new stream as the source for your connection to Iceberg. A sketch of such a pipeline follows this list.
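
For example, the following minimal pipeline sketch casts a field so that its output stream matches an existing table’s schema. The stream and field names are hypothetical; adjust the SELECT list and casts to your own schema.

    -- Hypothetical: reshape the orders stream into orders_for_iceberg,
    -- casting total so the schema matches the existing Iceberg table.
    INSERT INTO orders_for_iceberg
    SELECT
        order_id,
        CAST(total AS DECIMAL(10, 2)) AS total,
        order_ts
    FROM orders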

Removing streams from an existing connection

If you remove streams from an existing connection, the connection will fail to start the next time it’s restarted.

To mitigate this issue, you have two options.

  1. Restart the connection, discarding its current state, and choose earliest or latest as the position to read from. The connection will then read all records from the beginning of the source streams, or only new records from the end of the source streams, respectively.

    The impact of this is that with earliest you reprocess data and so can end up with duplicates, while with latest you may skip data.

  2. If duplicate or missing data is unacceptable, do the following:

    1. Pause the connections or pipelines that write to the streams that you want to retain in the connection.

    2. Make sure that the connection has caught up on all records.

    3. Create a new version of the connection that only reads from the remaining streams.

    4. Start the new version of the connection from latest.

    5. Restart the source connections or pipelines that you previously paused.

Iceberg table spec version

This connector defaults to using version 2 of the Iceberg table specification.

Support for version 1 is provided for backwards compatibility. Note that version 1 has limited support for change streams: it doesn’t support consuming deletion records and will fail at runtime if it encounters one. In addition, updates cause entire files to be rewritten, increasing cost and decreasing performance. We therefore recommend using version 2 when working with change streams. If you must use version 1, use a pipeline to convert the change stream into an append stream with the TO_APPEND function, as sketched below.
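
As a sketch (the stream names here are hypothetical), such a pipeline might look like:

    -- Hypothetical stream names: converts the change stream orders_cdc
    -- into an append stream so that a version 1 table never receives
    -- update or delete records.
    INSERT INTO orders_append
    SELECT *
    FROM TO_APPEND(orders_cdc)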

Connector starting state and offsets

A new sink connection will start reading from the Latest point in the source Decodable stream. This means that only data written to the stream after the connection has started is sent to the external system. When starting the connection, you can override this to Earliest if you want to send all the existing data on the source stream to the target system, along with all new data that arrives on the stream.

When you restart a sink connection, it continues reading from the point it most recently stored in a checkpoint before the connection stopped. You can also opt to discard the connection’s state and restart it afresh from Earliest or Latest, as described above.

Learn more about starting state here.

Data types mapping

The following table describes the mapping of Decodable data types to their Iceberg data type counterparts.

Decodable Type             Iceberg Type
BOOLEAN                    boolean
TINYINT                    integer
SMALLINT                   integer
INTEGER                    integer
BIGINT                     long
FLOAT                      float
DOUBLE                     double
CHAR                       string
VARCHAR                    string
STRING                     string
BINARY                     binary
VARBINARY                  fixed
DECIMAL                    decimal
DATE                       date
TIME                       time
TIMESTAMP                  timestamp without timezone
TIMESTAMP_LTZ              timestamp with timezone
ARRAY                      list
MAP                        map
MULTISET                   map
ROW                        struct
RAW                        Not supported
INTERVAL                   Not supported
STRUCTURED                 Not supported
TIMESTAMP WITH TIMEZONE    Not supported
DISTINCT                   Not supported
NULL                       Not supported
SYMBOL                     Not supported
LOGICAL                    Not supported