Amazon S3 source connector

Use the Amazon S3 Connector to get data from an Amazon S3 bucket into Decodable. If you are looking for information about how to create a connection to get data into Amazon S3 from Decodable, see Amazon S3 sink connector in the Connect to a data destination chapter.

For better performance, consider enabling an Amazon SQS (Simple Queue Service) queue to receive notifications about when new files are written to the S3 bucket. See Detecting bucket changes with SQS for more information.

Features

Delivery guarantee

Exactly once

Prerequisites

Access to your AWS resources

Decodable interacts with resources in AWS on your behalf. To do this you need an IAM role configured with a trust policy that allows access from Decodable’s AWS account, and a permission policy as detailed below.

For more details on how this works, how to configure the trust policy, and example steps to follow, see here.

To use this connector you must associate a permissions policy with the IAM role. This policy must have the following permissions:

  • Read access on the S3 bucket path from which you’re reading data.

    s3:GetObject

    If you want to read data directly at the root level of the bucket, then leave the path blank with the trailing /* included.

  • List access on the bucket from which you’re reading data.

    s3:ListBucket
  • Sample Permission Policy
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::my_bucket/some/dir/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::my_bucket"
        }
      ]
    }
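The permission policy above can also be generated programmatically. The following is a minimal Python sketch, not part of the Decodable tooling: the function name build_read_policy and its parameters are hypothetical, and it only assembles the same two statements shown in the sample, including the root-level case where the path is blank but the trailing /* is kept.

```python
import json


def build_read_policy(bucket: str, prefix: str = "") -> dict:
    """Build a policy with s3:GetObject on a path and s3:ListBucket on the bucket.

    Hypothetical helper for illustration; it mirrors the sample policy above.
    """
    # Strip slashes so "some/dir/", "/some/dir" and "some/dir" behave alike.
    cleaned = prefix.strip("/")
    # An empty prefix means the bucket root: blank path, trailing /* retained.
    if cleaned:
        object_arn = f"arn:aws:s3:::{bucket}/{cleaned}/*"
    else:
        object_arn = f"arn:aws:s3:::{bucket}/*"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {"Effect": "Allow", "Action": ["s3:GetObject"], "Resource": object_arn},
            {
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": f"arn:aws:s3:::{bucket}",
            },
        ],
    }


if __name__ == "__main__":
    # Reproduces the sample policy for my_bucket/some/dir.
    print(json.dumps(build_read_policy("my_bucket", "some/dir"), indent=2))
```

Calling build_read_policy("my_bucket") with no prefix yields a GetObject resource of arn:aws:s3:::my_bucket/*, which corresponds to reading directly at the root level of the bucket.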

Detecting bucket changes with SQS

By default, the Amazon S3 connector monitors files in the specified bucket by scanning the entire bucket at an interval set by the user. However, for S3 buckets with a large number of files, this approach can be costly and inefficient. An alternative solution is to configure the Amazon S3 connector to detect and pick up new files using Amazon SQS. This queue must be configured to receive event notifications for new files from the S3 bucket.

When this option is enabled, the connector performs a full scan of the S3 bucket the first time it starts, and again whenever its state is discarded on startup. After this initial scan, the connector discovers new files exclusively from the file notifications arriving in the SQS queue. This option is recommended for long-lived and high-scale workloads.

During connection creation, you can disable the initial full bucket scan by setting the source.scan-on-startup property to false.
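To make the file notifications mentioned above more concrete, the sketch below shows how a consumer could extract new file paths from the body of an S3 event notification message. It is an illustration only and not the connector's implementation: the function name created_object_paths is hypothetical, and the code relies on the standard S3 event notification layout (a Records array with eventName, s3.bucket.name, and s3.object.key).

```python
import json
from typing import List
from urllib.parse import unquote_plus


def created_object_paths(message_body: str) -> List[str]:
    """Return s3:// paths for the object-created records in one notification."""
    event = json.loads(message_body)
    paths = []
    # The test message S3 sends when notifications are configured has no "Records".
    for record in event.get("Records", []):
        # Skip anything that is not an object-created event.
        if not record.get("eventName", "").startswith("ObjectCreated:"):
            continue
        bucket = record["s3"]["bucket"]["name"]
        # Object keys arrive URL-encoded, with spaces encoded as "+".
        key = unquote_plus(record["s3"]["object"]["key"])
        paths.append(f"s3://{bucket}/{key}")
    return paths


if __name__ == "__main__":
    sample = json.dumps({
        "Records": [{
            "eventName": "ObjectCreated:Put",
            "s3": {
                "bucket": {"name": "my_bucket"},
                "object": {"key": "some/dir/my+file.json"},
            },
        }]
    })
    print(created_object_paths(sample))
```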

The following prerequisites apply if you want to get data from an SQS-enabled S3 bucket:

  1. You must have a standard queue type with the following access policy.

    {
      "Version": "2012-10-17",
      "Id": "example-ID",
      "Statement": [
        {
          "Sid": "example-statement-ID",
          "Effect": "Allow",
          "Principal": {
            "Service": "s3.amazonaws.com"
          },
          "Action": "SQS:SendMessage",
          "Resource": "arn:aws:sqs:<YOUR_SQS_QUEUE_ARN_HERE>",
          "Condition": {
            "ArnLike": {
              "aws:SourceArn": "arn:aws:s3:*:*:*"
            }
          }
        }
      ]
    }
  2. Update your IAM role to include a permissions policy for SQS.

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "Statement1",
          "Effect": "Allow",
          "Action": [
            "sqs:ReceiveMessage",
            "sqs:DeleteMessage"
          ],
          "Resource": [
            "arn:aws:sqs:<YOUR_SQS_QUEUE_ARN_HERE>"
          ]
        }
      ]
    }
  3. Configure your S3 bucket to send event notification messages to the SQS queue you configured in Step 1 whenever new files arrive that match your path prefix.

    1. When configuring event notifications, make sure that you select the check box for All object create events under Event Types. For the best performance, don’t select any other event types. See Enabling and configuring event notifications using the Amazon S3 console in the AWS documentation for information on how to enable notification messages.

      Once activated, this connection consumes and deletes messages from your queue during file processing. It’s not recommended for other applications to share this same SQS queue.
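As an alternative to the console steps in Step 3, the notification settings can be expressed as a configuration document. The sketch below is a hedged example: the function name build_queue_notification, the queue ARN, and the prefix are hypothetical placeholders, and the dictionary follows the QueueConfigurations layout used by the S3 bucket notification configuration API. It subscribes only to all object-created events and optionally filters on a key prefix.

```python
import json


def build_queue_notification(queue_arn: str, prefix: str = "") -> dict:
    """Build an S3 notification configuration that targets one SQS queue."""
    queue_config = {
        "QueueArn": queue_arn,
        # Only "All object create events", as recommended in Step 3.
        "Events": ["s3:ObjectCreated:*"],
    }
    if prefix:
        # Restrict notifications to keys under the path prefix being read.
        queue_config["Filter"] = {
            "Key": {"FilterRules": [{"Name": "prefix", "Value": prefix}]}
        }
    return {"QueueConfigurations": [queue_config]}


if __name__ == "__main__":
    # Placeholder ARN and prefix; substitute your own values.
    config = build_queue_notification(
        "arn:aws:sqs:us-west-2:111222333444:my-queue", "some/dir/"
    )
    print(json.dumps(config, indent=2))
```

The printed JSON is intended as input for whichever AWS tool or SDK you use to set the bucket's notification configuration. Check the AWS documentation for the exact call before applying it.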

Steps

  1. If you have an existing connection using the older version of the Amazon S3 Connector and would like to switch to the newest version, then do the following steps. If you aren’t upgrading an existing connection, skip these steps.

    1. Stop the existing Amazon S3 connection.

    2. Stop any pipelines that are using it.

  2. From the Connections page, select the Amazon S3 Connector and complete the following fields.

    If you want to use the Decodable CLI or API to create the connection, you can refer to the Property Name column for information about what the underlying property names are. The connector name is s3-v2 (not s3, which is a deprecated version of the connector).
    UI Field Property Name Description

    AWS Region

    region

    Optional. The AWS region that your S3 bucket is located in. If not specified, defaults to your Decodable account region. For example, us-west-2.

    Path

    path

    The file path to the bucket or directory that you want to read data from.

    For example, s3://bucket/directory.

    IAM Role ARN

    role-arn

    The AWS ARN of the IAM role.

    For example, arn:aws:iam::111222333444:role/decodable-s3-access.

    Partition Template

    partition-cols

    Optional. The field names that you want to use to partition your data.

    For example, if you want to partition your data based on the datetime field, then enter datetime.

    Value Format

    format

    The format for data in the Amazon S3 source. You can select one of the following:

    - JSON: See JSON format properties for information on what additional properties you can specify when using JSON format.

    - Parquet: See Parquet format properties for information on what additional properties you can specify when using Parquet format.

    - Avro: Select this option if you want to read Avro data based on an Avro schema.

    Source polling frequency

    source.monitor-interval

    How often to scan the S3 bucket for new files. If you are using SQS to receive notifications when new files are added to the S3 bucket, this is how often notifications are polled from the queue.

    Defaults to 10 seconds.

    SQS URL

    source.sqs-url

    The SQS queue URL.

    This field is only required if you are connecting to an SQS-enabled S3 bucket.

    N/A

    source.scan-on-startup

    Optional. This field is only applicable when source.sqs-url is set.

    Specify whether to perform an initial bucket scan when the connection is first started. When set to true, the connection scans the S3 bucket and ingests any historical data it contains. When set to false, the initial scan is skipped and only new data received after the SQS queue begins to receive file notifications is ingested.

    Defaults to true.

  3. Select which stream to send the Amazon S3 records to. Then, select Next.

  4. Give the newly created connection a Name and Description and select Save.

  5. If you are replacing an existing Amazon S3 connection, then restart any pipelines that were processing data for the previous connection.
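If you create the connection through the CLI or API rather than the UI, the property names from step 2 can be assembled and sanity-checked first. The sketch below is illustrative only: the function build_s3_source_properties is hypothetical, the avro format value is an assumption (the source only shows format=json and format=parquet), and the checks simply restate the rules in the table, such as source.scan-on-startup applying only when source.sqs-url is set.

```python
from typing import Dict, Optional

# "json" and "parquet" appear in this document; "avro" is an assumed spelling.
ALLOWED_FORMATS = {"json", "parquet", "avro"}


def build_s3_source_properties(
    path: str,
    role_arn: str,
    data_format: str,
    region: Optional[str] = None,
    sqs_url: Optional[str] = None,
    scan_on_startup: Optional[bool] = None,
) -> Dict[str, str]:
    """Assemble connection properties for the s3-v2 source connector."""
    if not path.startswith("s3://"):
        raise ValueError("path should look like s3://bucket/directory")
    if data_format not in ALLOWED_FORMATS:
        raise ValueError(f"unsupported format: {data_format}")
    if scan_on_startup is not None and sqs_url is None:
        raise ValueError("source.scan-on-startup only applies when source.sqs-url is set")

    props = {"path": path, "role-arn": role_arn, "format": data_format}
    if region is not None:
        props["region"] = region  # otherwise the account region is used
    if sqs_url is not None:
        props["source.sqs-url"] = sqs_url
    if scan_on_startup is not None:
        props["source.scan-on-startup"] = "true" if scan_on_startup else "false"
    return props


if __name__ == "__main__":
    # The SQS URL below is a placeholder.
    print(build_s3_source_properties(
        "s3://bucket/directory",
        "arn:aws:iam::111222333444:role/decodable-s3-access",
        "json",
        sqs_url="https://sqs.us-west-2.amazonaws.com/111222333444/my-queue",
        scan_on_startup=False,
    ))
```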

Reference

JSON format properties

The following properties are only applicable when format=json.

Property Required? Description

json.timestamp-format.standard

Optional

Specify the timestamp format for TIMESTAMP and TIMESTAMP_LTZ types. Defaults to SQL.
- SQL: Uses the yyyy-MM-dd HH:mm:ss.SSS format, for example "2020-12-30 12:13:14.123".
- ISO-8601: Parses input timestamps in the yyyy-MM-ddTHH:mm:ss.SSS format, for example "2020-12-30T12:13:14.123".

json.encode.decimal-as-plain-number

Optional

Must be true or false, defaults to false.
When true, always encode numbers without scientific notation.
For example, a number encoded 2.7E-8 by default would be encoded 0.000000027.
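The two timestamp standards and the plain-number decimal encoding can be illustrated with a short Python analogue. This is not the connector's parser: the names TIMESTAMP_PATTERNS, parse_timestamp, and encode_decimal are hypothetical, and the strptime patterns are Python equivalents of the yyyy-MM-dd style patterns listed above.

```python
from datetime import datetime
from decimal import Decimal

# Python strptime equivalents of the two documented timestamp standards.
TIMESTAMP_PATTERNS = {
    "SQL": "%Y-%m-%d %H:%M:%S.%f",
    "ISO-8601": "%Y-%m-%dT%H:%M:%S.%f",
}


def parse_timestamp(text: str, standard: str = "SQL") -> datetime:
    """Parse a timestamp string using the chosen standard (SQL by default)."""
    return datetime.strptime(text, TIMESTAMP_PATTERNS[standard])


def encode_decimal(value: Decimal, plain: bool = False) -> str:
    """Render a decimal, optionally without scientific notation."""
    # Fixed-point formatting ("f") never uses an exponent.
    return format(value, "f") if plain else str(value)


if __name__ == "__main__":
    print(parse_timestamp("2020-12-30 12:13:14.123"))
    print(parse_timestamp("2020-12-30T12:13:14.123", "ISO-8601"))
    print(encode_decimal(Decimal("2.7E-8")))              # 2.7E-8
    print(encode_decimal(Decimal("2.7E-8"), plain=True))  # 0.000000027
```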

Parquet format properties

The following properties are only applicable when format=parquet.

Property Description

parquet.compression

Options are SNAPPY, GZIP and LZO. Defaults to no compression.

Other parquet options are available. See ParquetOutputFormat for more information.