MySQL source connector

Features

Connector name

mysql-cdc

Delivery guarantee

Exactly once

Supported task sizes

S, M, L

Multiplex capability

A single instance of this connector can read from multiple tables and databases.

See Ingesting data from multiple tables.

Supported stream types

Change stream

Configuration properties

If you are using the CLI to create or edit a connection with this connector, you must use the declarative approach. You can generate the connection definition for the tables that you want to ingest using decodable connection scan.

Use the properties below to configure the connector. Refer to the documentation for instructions on creating and editing connections.

Property Description Required Default

Basic

host

The host of the MySQL server

Yes

port

The port of the MySQL server

3306

database-name

The name of the database

username

The username to use to authenticate to MySQL

Yes

password

The password associated with the username. This must be provided as a secret resource.

Yes

ssl-mode

Specifies how the connector should handle secure connection to the database server.

Must be one of the following:

  • DISABLED: Establish unencrypted connections.

  • PREFERRED: Establish encrypted connections if the server enabled them, otherwise fall back to unencrypted connections.

  • REQUIRED: Establish secure connections if the server enabled them, fail otherwise;

  • VERIFY_CA: Like REQUIRED but additionally verify the server TLS certificate against the configured Certificate Authority (CA) certificates.

  • VERIFY_IDENTITY: Like VERIFY_CA, but additionally verify that the server certificate matches the host to which the connection is attempted.

PREFERRED

Advanced

scan.startup.mode

Specifies where in the collection to start reading data when the connection is first started, or when it’s restarted with the state discarded. Must be one of the following:

  • initial: At startup, takes an initial snapshot of monitored database tables, then continuously reads the latest binlog entries thereafter.

  • latest-offset: Avoids taking an initial snapshot of monitored database tables upon startup. Instead, reads changes from the end of the binlog, capturing only the modifications made since the connector was initiated or restarted.

  • timestamp: Bypasses the snapshot phase and initiates reading binlog events directly from a specified timestamp.

initial

scan.startup.timestamp-millis

The timestamp to start reading data from, specified in milliseconds since Unix time

Required if scan.startup.mode is timestamp

server.timeZone

The global time zone of the database server. This controls how time-related values are read from your database.

This must be equivalent to the output of the following query on your database: SELECT @@global.time_zone;

UTC

Prerequisites

Ingesting data from multiple tables

A single instance of this connector can read from multiple tables and databases.

If you are using the CLI to create or edit a connection with this connector, you must use the declarative approach. You can generate the connection definition for the tables that you want to ingest using decodable connection scan.

Resource specifier keys

The following resource specifier keys are available:

Name Description

database-name

The database name

table-name

The table name

Metadata fields

If decodable connection scan is run with --opt with-metadata-fields=true then each of the following metadata fields will be added to the streams. These can be useful when mapping multiple tables to a single Stream, and when tracking record update times.

Name Description

_decodable_database

Source database

_decodable_table

Source table

_decodable_op_ts

Timestamp of ingestion.

Defaults to 1970-01-01 00:00:00.000 for all existing records in the upstream table. All records inserted or updated following the initial connection activation will be populated with the UTC-based timestamp of the operation.

The _decodable_database and _decodable_table metadata fields are added to the Stream primary key as well.

Binlog retention

If the connection is stopped or in a failed state for longer than the binlog’s retention period, the connection will fail when it’s restarted. This is because for CDC to work it needs a contiguous series of binlog entries.

If you want to restart the connection in this situation you must discard its current state. By doing this, the initial snapshot of the required tables will be taken again and then the binlog used for subsequent reads.

To do this do, one of the following:

  1. In the Decodable Web UI, select Start and under Starting State select Reset current state and start from the initial state

  2. In the Decodable CLI, do one of the following:

    1. Use connection activate and add the --force flag, for example:

      decodable connection activate cef0e708 --force

      or

    2. Use query with a suitable specifier for the connection (such as --name) and add the --operation reset-state argument, for example:

      decodable query --name customers-source --operation reset-state

Connector starting state and offsets

When you create a connection, or restart it and discard state, it will read from the database based on the configuration of the scan startup mode. By default this is initial and will therefore snapshot the set of monitored tables and read the binlog thereafter.

Learn more about starting state here.

Data types mapping

The following table shows the Decodable data types that are generated from the corresponding MySQL data types.

MySQL Type Decodable Type

TINYINT

TINYINT(>1)

TINYINT

SMALLINT

TINYINT

UNSIGNED

SMALLINT

INT

MEDIUMINT

SMALLINT

UNSIGNED

INT

BIGINT

INT

UNSIGNED

BIGINT

BIGINT

UNSIGNED

DECIMAL(20,0)

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(p, s)

NUMERIC(p, s)

DEC(p, s)

FIXED(p, s)

DECIMAL(p,s)

BOOLEAN

TINYINT(1)

BIT(1)

BOOLEAN

BIT(>1)

BYTES

YEAR

INT

DATE

DATE

TIME[(p)]

TIME[(p)]

TIMESTAMP [(p)]

DATETIME [(p)]

TIMESTAMP[(p)]

CHAR(n)

CHAR(n)

VARCHAR(n)

VARCHAR(n)

BINARY(n)

BINARY(n)

VARBINARY(n)

VARBINARY(n)

TINYTEXT

TEXT

MEDIUMTEXT

LONGTEXT

STRING

TINYBLOB

BLOB

MEDIUMBLOB

LONGBLOB

BYTES

JSON

ENUM

SET

STRING

GEOMETRY

POINT

LINESTRING

POLYGON

MULTIPOINT

MULTILINESTRING

MULTIPOLYGON

A String representation:

{"srid": <>,
 "type": "<>",
 "coordinates": [<>, <>, ...]
}

where:

  • srid: the Spatial Reference System (SRS) in which the geometry is defined (defaults to 0 if no srid is specified in the spatial value)

  • type: the spatial data type (GEOMETRY, LINESTRING, etc)

  • coordinates: the coordinates of the spatial data

GEOMETRYCOLLECTION

A String representation:

{"srid": <> ,
 "type": "GeometryCollection",
 "geometries": [
    {"type":"<>","coordinates":[<>, <>, ...]}
]}