Amazon S3 source connector

Use the Amazon S3 Connector to get data from an Amazon S3 bucket into Decodable. If you are looking for information about how to create a connection to get data into Amazon S3 from Decodable, see Amazon S3 sink connector in the Connect to a data destination chapter.

Features

Delivery guarantee

Exactly once

Prerequisites

Before you can create the Amazon S3 connection, you must have an Identity and Access Management (IAM) role with the following policies. See the Setting up an IAM user section for more information.

  • A Trust Policy that allows access from Decodable’s AWS account. The ExternalId must match your Decodable account name.

  • A Permissions Policy with read and write permissions for the S3 bucket.

For better performance, consider enabling an SQS queue to receive notifications about when new files are written to the S3 bucket. See SQS prerequisites for more information.

Setting up an IAM user

The following is an example of what the Trust Policy should look like. Replace the <MY_DECODABLE_ACCOUNT_NAME> with your own. In the example, 671293015970 is Decodable’s AWS account ID and cannot be changed.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::671293015970:root"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "<MY_DECODABLE_ACCOUNT_NAME>"
        }
      }
    }
  ]
}

You must also have read and write permissions on the S3 bucket. See the following list of permissions and replace <YOUR_BUCKET>, <YOUR_ROLE_ARN>, and /some/dir appropriately.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::<YOUR_ROLE_ARN>"
      },
      "Action": ["s3:PutObject", "s3:GetObject", "s3:DeleteObject"],
      "Resource": "arn:aws:s3:::<YOUR_BUCKET>/some/dir/*"
    },
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::<YOUR_ROLE_ARN>"
      },
      "Action": ["s3:ListBucket"],
      "Resource": "arn:aws:s3:::<YOUR_BUCKET>"
    }
  ]
}

SQS prerequisites

By default, the Amazon S3 connector monitors files in the specified bucket by scanning the entire bucket at an interval set by the user. However, for S3 buckets with a large number of files, this approach can be costly and inefficient. An alternative solution is to configure the Amazon S3 connector to detect and pick up new files using Amazon SQS (Simple Queue Service). This queue must be configured separately to receive event notifications for new files from the S3 bucket.

When this option is enabled, the connector does an initial full bucket scan of the S3 bucket after it is started for the first time or every time state is discarded on startup. Following this initial scan, the connector exclusively discovers new files to read from the file notifications arriving in the SQS queue. This option is recommended for long-lived and high-scale workloads.

During connection creation, you can disable the initial full bucket scan by setting the source.scan-on-startup property to false.

The following prerequisites apply if you want to get data from an SQS-enabled S3 bucket:

  1. You must have a standard queue type with the following access policy.

    {
      "Version": "2012-10-17",
      "Id": "example-ID",
      "Statement": [
        {
          "Sid": "example-statement-ID",
          "Effect": "Allow",
          "Principal": {
            "Service": "s3.amazonaws.com"
          },
          "Action": "SQS:SendMessage",
          "Resource": "arn:aws:sqs:<YOUR_SQS_QUEUE_ARN_HERE>",
          "Condition": {
            "ArnLike": {
              "aws:SourceArn": "arn:aws:s3:*:*:*"
            }
          }
        }
      ]
    }
  2. Update your IAM role to include a permissions policy for SQS.

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "Statement1",
          "Effect": "Allow",
          "Action": [
            "sqs:ReceiveMessage",
            "sqs:DeleteMessage"
          ],
          "Resource": [
            "arn:aws:sqs:<YOUR_SQS_QUEUE_ARN_HERE>"
          ]
        }
      ]
    }
  3. Configure your S3 bucket to send event notification messages to the SQS queue you configured in Step 1 whenever new files arrive that match your path prefix.

    1. When configuring event notifications, make sure that you select the check box for All object create events under Event Types. For the best performance, do not select any other event types. See Enabling and configuring event notifications using the Amazon S3 console in the AWS documentation for information on how to enable notification messages.

      Once activated, this connection consumes and deletes messages from your queue during file processing. It is not recommended for other applications to share this same SQS queue.

Steps

  1. If you have an existing connection using the older version of the Amazon S3 Connector and would like to switch to the newest version, then do the following steps. If you are not upgrading an existing connection, skip these steps.

    1. Stop the existing Amazon S3 connection.

    2. Stop any pipelines that are using it.

  2. From the Connections page, select the Amazon S3 Connector and complete the following fields.

    If you want to use the Decodable CLI or API to create the connection, you can refer to the Property Name column for information about what the underlying property names are. The connector name is s3-v2 (not s3, which is a deprecated version of the connector).
    UI Field Property Name Description

    AWS Region

    region

    Optional. The AWS region that your S3 bucket is located in. If not specified, defaults to your Decodable account region. For example, us-west-2.

    Path

    path

    The file path to the bucket or directory that you want to send data to.

    For example, s3://bucket/directory.

    IAM Role ARN

    role-arn

    The AWS ARN of the IAM role.

    For example, arn:aws:iam::111222333444:role/decodable-s3-access.

    Partition Template

    partition-cols

    Optional. The field names that you want to use to partition your data.

    For example, if you want to partition your data based on the datetime field, then enter datetime.

    Value Format

    format

    The format for data in the Amazon S3 source. You can select one of the following:

    -JSON: See JSON format properties for information on what additional properties you can specify when using JSON format.

    - Parquet: See Parquet format properties for information on what additional properties you can specify when using Parquet format.

    - Avro: Select this option if you want to read Avro data based on an Avro schema.

    Source polling frequency

    source.monitor-interval

    How often to scan the S3 bucket for new files. If are using SQS to receive notifications about when new events are added to the S3 bucket, this will be how often events are polled from the queue.

    Defaults to 10 seconds.

    SQS URL

    source.sqs-url

    The SQS queue URL.

    This field is only required if you are connecting to an SQS-enabled S3 bucket.

    N/A

    source.scan-on-startup

    Optional. This field is only applicable when source.sqs-url is set.

    Specify whether to perform an initial bucket scan when the connection is first started. When set to true, the connection scans the S3 bucket and ingests any historical data it contains. When set to false, the initial scan is skipped and only new data received after the SQS queue begins to receive file notifications is ingested.

    Defaults to true.

  3. Select which stream to send the Amazon S3 records to. Then, select Next.

  4. Give the newly created connection a Name and Description and select Save.

  5. If you are replacing an existing Amazon S3 connection, then restart any pipelines that were processing data for the previous connection.

Reference

JSON format properties

The following properties are only applicable when format=json.

Property Required? Description

json.timestamp-format.standard

Optional

Specify the timestamp format for TIMESTAMP and TIMESTAMP_LTZ types. Defaults to SQL
SQL will use a "yyyy-MM-dd HH:mm:ss.SSS" format, e.g "2020-12-30 12:13:14.123"ISO-8601 will parse input TIMESTAMP in "yyyy-MM-ddTHH:mm:ss.SSS" format, e.g "2020-12-30T12:13:14.123"

json.encode.decimal-as-plain-number

Optional

Must be true or false, defaults to false.
When true, always encode numbers without scientific notation.
For example, a number encoded 2.7E-8 by default would be encoded 0.000000027.

Parquet format properties

The following properties are only applicable when format=parquet.

Property Description

parquet.compression

Options are SNAPPY, GZIP and LZO. Defaults to no compression.

Other parquet options are available. See ParquetOutputFormat for more information.