Apache Iceberg sink connector

Use the Apache Iceberg connector to send data to an Iceberg table.

Features

Delivery guarantee: Exactly once

Compatibility

This connector currently supports the following catalog and data warehouse options:

- Catalog: AWS Glue
- Data Warehouse: Amazon S3

Prerequisites

Access to your AWS resources

Decodable interacts with resources in AWS on your behalf. To do this, you need an IAM role configured with a trust policy that allows access from Decodable’s AWS account, and a permission policy as detailed below. For more details on how this works, how to configure the trust policy, and example steps to follow, see here.

To use this connector you must associate a permissions policy with the IAM role. This policy must have the following permissions:

- Read/Write access to the S3 bucket path to which you’re writing data:
    s3:PutObject
    s3:GetObject
    s3:DeleteObject
  If you want to send data directly at the root level of the bucket, then leave the path blank with the trailing /* included.
- List access on the bucket to which you’re writing data:
    s3:ListBucket
- Read and write permissions for AWS Glue:
    glue:CreateDatabase
    glue:GetDatabase
    glue:CreateTable
    glue:UpdateTable
    glue:GetTable

Sample Permission Policy

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": [
            "glue:CreateDatabase",
            "glue:GetDatabase",
            "glue:CreateTable",
            "glue:UpdateTable",
            "glue:GetTable"
          ],
          "Resource": [
            "arn:aws:glue:<region>:<AWS account id>:catalog",
            "arn:aws:glue:<region>:<AWS account id>:database/<catalog-database>",
            "arn:aws:glue:<region>:<AWS account id>:table/<catalog-database>/*"
          ]
        },
        {
          "Effect": "Allow",
          "Action": [
            "s3:PutObject",
            "s3:GetObject",
            "s3:DeleteObject",
            "s3:ListBucket"
          ],
          "Resource": [
            "arn:aws:s3:::<s3 warehouse path>/*"
          ]
        }
      ]
    }

In this example, replace:

- <region> with the AWS region of the Glue catalog.
- <AWS account id> with your AWS account ID.
- <s3 warehouse path> and <catalog-database> with appropriate values.
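As a rough illustration of how the placeholders in the sample policy fit together, the following Python sketch fills them in programmatically. The function name build_iceberg_sink_policy and the example values (region, account ID, database name, warehouse path) are hypothetical and not part of any Decodable tooling; the sketch only reproduces the structure of the sample policy above.

```python
import json


def build_iceberg_sink_policy(region, account_id, catalog_database, warehouse_path):
    """Return the sample permission policy with its placeholders filled in.

    warehouse_path is the bucket name plus an optional folder, without the
    s3:// scheme (assumption made for this sketch), e.g. "my-bucket/warehouse".
    """
    glue_arn = f"arn:aws:glue:{region}:{account_id}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Glue catalog permissions listed in the prerequisites
                "Effect": "Allow",
                "Action": [
                    "glue:CreateDatabase",
                    "glue:GetDatabase",
                    "glue:CreateTable",
                    "glue:UpdateTable",
                    "glue:GetTable",
                ],
                "Resource": [
                    f"{glue_arn}:catalog",
                    f"{glue_arn}:database/{catalog_database}",
                    f"{glue_arn}:table/{catalog_database}/*",
                ],
            },
            {
                # S3 permissions on the warehouse path, trailing /* included
                "Effect": "Allow",
                "Action": [
                    "s3:PutObject",
                    "s3:GetObject",
                    "s3:DeleteObject",
                    "s3:ListBucket",
                ],
                "Resource": [f"arn:aws:s3:::{warehouse_path.strip('/')}/*"],
            },
        ],
    }


# Hypothetical example values
policy = build_iceberg_sink_policy(
    "us-west-2", "111222333444", "analytics", "my-bucket/warehouse"
)
print(json.dumps(policy, indent=2))
```

Passing only a bucket name as warehouse_path produces a resource of the form arn:aws:s3:::my-bucket/*, which corresponds to writing at the root level of the bucket. Note that AWS evaluates s3:ListBucket against the bucket ARN itself rather than against object ARNs, so if listing is denied in your setup you may need to add the bucket ARN as an additional resource.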
Steps

If you want to use the Decodable CLI or API to create the connection, you can refer to the Property Name column for information about what the underlying property names are. The connector name is iceberg.

1. From the Connections page, select the Iceberg Connector and complete the following fields.

| UI Field | Property Name | Description |
|---|---|---|
| Connection Type | N/A | Select Sink to use this connector to send data into the database provided. |
| Warehouse path | warehouse | The file path to the S3 bucket or folder that you want to send data to. For example: s3://bucket/folder. |
| Database name | catalog-database | The name of the database in your Iceberg catalog. This is the name that you added permissions for as part of the prerequisites. If a database with this name doesn’t exist, Decodable creates it. |
| Catalog type | catalog-type | The catalog responsible for managing the metadata associated with Iceberg tables. Currently, only AWS Glue is supported. If you are using the Decodable CLI to create this connection, enter glue for this value. |
| IAM Role ARN | role-arn | The AWS ARN of the IAM role. For example, arn:aws:iam::111222333444:role/decodable-s3-access. |
| AWS Region | region | The AWS region of the AWS Glue catalog. |
| Format | format | The format for data in Amazon S3. Supported formats: parquet, avro, orc. Defaults to parquet. |
| Table Format Version | format-version | The Iceberg table specification version used for output files: 1 or 2 (default). See the note below regarding versions. |

2. Select the streams containing the data that you’d like to send to Iceberg. Decodable automatically creates an Iceberg table for each stream selected, and performs schema mapping between the Decodable stream and the Iceberg table. See Data types mapping for how Decodable types map to Iceberg types. Then, select Next.

3. (Optional) Give the destination tables a name. By default, Decodable uses the same name for the destination table as the stream.
   For example, if you are sending data from a stream called users, then the destination Iceberg table is given the name users.

4. Give the newly created connection a Name and Description and select Save.

Upon starting this connection, you can use it to send data to your Iceberg table. If you are sending data from a change stream to an Iceberg table, then upsert mode is used. Otherwise, append mode is used.

Connector notes

Existing tables in Iceberg

If the destination table exists, its schema must exactly match the schema of the table that the connector would have created itself. If the schema of the existing table doesn’t match, you’ll get this error:

    Failure: table already exists with different schema.

To resolve this, you have two options:

- Remove the table from Iceberg and let Decodable recreate it with a schema that matches the stream. This may not be desirable if you have data in the table already.
- Make the source stream’s schema match that of the target Iceberg table by using a pipeline to process the existing stream into a new one, which is then used as the source for your connection to Iceberg.

Removing streams from an existing connection

If you remove streams from an existing connection, the connection will fail to start the next time it is restarted. To mitigate this issue, you have the following options.

You can restart the connection and discard the current state, choosing earliest or latest as the position to read data from. The connection then reads all records from the beginning of the source streams, or only new records from the end of the source streams, respectively. With earliest you reprocess data and so can end up with duplicates. With latest you may skip data.

If duplicate or missing data is unacceptable, do the following:

1. Pause the connection/pipeline that writes to the streams that you want to retain in the connection.
2. Make sure that the connection has caught up on all records.
3. Create a new version of the connection that only reads from the remaining streams.
4. Start the new version of the connection from latest.
5. Start the source connection/pipelines that you previously paused.

Iceberg table spec version

This connector defaults to using version 2 of the Iceberg table specification. Support for version 1 is provided for backwards compatibility.

Note that version 1 has limited support for change streams. It doesn’t support consuming deletion records and will fail at runtime if it encounters one. In addition, updates cause entire files to be rewritten, increasing cost and decreasing performance. We therefore recommend using version 2 when working with change streams. If necessary, use a Pipeline to convert the stream into an append stream with the TO_APPEND function.

Connector starting state and offsets

A new sink connection starts reading from the Latest point in the source Decodable stream. This means that only data written to the stream after the connection has started is sent to the external system. You can override this to Earliest when you start the connection if you want to send all the existing data on the source stream to the target system, along with all new data that arrives on the stream.

When you restart a sink connection, it continues to read data from the point it most recently stored in the checkpoint before the connection stopped. You can also opt to discard the connection’s state and restart it afresh from Earliest or Latest as described above.

Learn more about starting state here.

Data types mapping

The following table describes the mapping of Decodable data types to their Iceberg data type counterparts.
| Decodable Type | Iceberg Type |
|---|---|
| BOOLEAN | boolean |
| TINYINT | integer |
| SMALLINT | integer |
| INTEGER | integer |
| BIGINT | long |
| FLOAT | float |
| DOUBLE | double |
| CHAR | string |
| VARCHAR | string |
| STRING | string |
| BINARY | binary |
| VARBINARY | fixed |
| DECIMAL | decimal |
| DATE | date |
| TIME | time |
| TIMESTAMP | timestamp without timezone |
| TIMESTAMP_LTZ | timestamp with timezone |
| ARRAY | list |
| MAP | map |
| MULTISET | map |
| ROW | struct |
| RAW | Not supported |
| INTERVAL | Not supported |
| STRUCTURED | Not supported |
| TIMESTAMP WITH TIMEZONE | Not supported |
| DISTINCT | Not supported |
| NULL | Not supported |
| SYMBOL | Not supported |
| LOGICAL | Not supported |
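To check a stream schema against this mapping before creating a connection, the table can be expressed as a simple lookup. The following Python sketch is only an illustration: the names DECODABLE_TO_ICEBERG, to_iceberg_type, and map_schema, as well as the example schema, are hypothetical, and the sketch matches bare type names only (it does not parse parameters such as a DECIMAL precision or an ARRAY element type).

```python
# Supported mappings, copied from the data types table above.
DECODABLE_TO_ICEBERG = {
    "BOOLEAN": "boolean",
    "TINYINT": "integer",
    "SMALLINT": "integer",
    "INTEGER": "integer",
    "BIGINT": "long",
    "FLOAT": "float",
    "DOUBLE": "double",
    "CHAR": "string",
    "VARCHAR": "string",
    "STRING": "string",
    "BINARY": "binary",
    "VARBINARY": "fixed",
    "DECIMAL": "decimal",
    "DATE": "date",
    "TIME": "time",
    "TIMESTAMP": "timestamp without timezone",
    "TIMESTAMP_LTZ": "timestamp with timezone",
    "ARRAY": "list",
    "MAP": "map",
    "MULTISET": "map",
    "ROW": "struct",
}

# Types the table lists as "Not supported".
UNSUPPORTED_TYPES = frozenset({
    "RAW", "INTERVAL", "STRUCTURED", "TIMESTAMP WITH TIMEZONE",
    "DISTINCT", "NULL", "SYMBOL", "LOGICAL",
})


def to_iceberg_type(decodable_type: str) -> str:
    """Look up the Iceberg type for a bare Decodable type name."""
    name = decodable_type.strip().upper()
    if name in UNSUPPORTED_TYPES:
        raise ValueError(f"{name} is not supported by the Iceberg sink")
    if name not in DECODABLE_TO_ICEBERG:
        raise ValueError(f"unknown Decodable type: {name}")
    return DECODABLE_TO_ICEBERG[name]


def map_schema(fields: dict) -> dict:
    """Map a {field name: Decodable type} schema to Iceberg types."""
    return {field: to_iceberg_type(t) for field, t in fields.items()}


# Hypothetical stream schema
users = {"id": "BIGINT", "name": "STRING", "created_at": "TIMESTAMP_LTZ"}
print(map_schema(users))
```

Raising an error for unsupported types, rather than silently skipping them, makes it easier to spot fields that need to be converted or dropped in a pipeline before the stream is attached to the connection.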