Configuration Reference for Debezium MySQL Source Connector for Confluent Platform
The Debezium MySQL Source connector can be configured using a variety of
configuration properties.
These are properties for the self-managed connector. If you are using
Confluent Cloud, see
MySQL CDC Source (Debezium) Connector
for Confluent Cloud
.
Required Parameters
name
A unique name for the connector. Trying to register again with the same name
will fail. This property is required by all Kafka Connect connectors.
Type: string
Default: No default
connector.class
The name of the Java class for the connector. Always specify
io.debezium.connector.mysql.MySqlConnector
for the MySQL connector.
Type: string
Default: No default
tasks.max
The maximum number of tasks that should be created for this connector. The
MySQL connector always uses a single task and therefore does not use this
value, so the default is always acceptable.
Type: int
Default: 1
database.hostname
IP address or hostname of the MySQL database server.
Type: string
Default: No default
database.port
Integer port number of the MySQL database server.
Type: int
Importance: low
Default:
3306
database.user
Name of the MySQL user to use when connecting to the MySQL database server.
Type: string
Importance: high
Default: No default
database.password
Password to use when connecting to the MySQL database server.
Type: password
Importance: high
Default: No default
topic.prefix
Topic prefix that provides a namespace for the MySQL database server or
cluster you want Debezium to capture. The prefix should be unique across all
other connectors, since it is used as the prefix for all Kafka topic names
that receive records from this connector. Only alphanumeric characters,
hyphens, dots, and underscores may be used in the database server logical name.
Warning
Do not change the value of this property. If you change the value, after a
restart, instead of continuing to emit events to the original
topics, the connector emits subsequent events to topics whose names are
based on the new value. The connector is also unable to recover its
database schema history topic.
Type: string
Default: No default
database.server.id
A numeric ID of this database client, which must be unique across all
currently-running database processes in the MySQL cluster. The connector
joins the MySQL database cluster as another server (with this unique ID) so it
can read the binlog. By default, a random number between 5400 and 6400 is
generated. Confluent recommends setting an explicit value.
Type: int
Importance: low
Default: random
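As a point of reference, the following is a minimal sketch of a self-managed
connector configuration in properties file format, covering only the required
parameters described above. The hostname, credentials, and names are
hypothetical placeholders; production deployments typically need additional
properties beyond this excerpt.
name=mysql-source-connector
connector.class=io.debezium.connector.mysql.MySqlConnector
# The MySQL connector always uses a single task
tasks.max=1
database.hostname=mysql.example.com
database.port=3306
database.user=debezium
database.password=dbz-secret
topic.prefix=dbserver1
# Recommended: set an explicit, unique server ID instead of relying on the random default
database.server.id=184054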
database.include.list
An optional comma-separated list of regular expressions that match database
names to be monitored. Any database name not included in the include list will
be excluded from monitoring. By default all databases will be monitored. May
not be used with
database.exclude.list
.
Type: list of strings
Importance: low
Default: empty string
database.exclude.list
An optional comma-separated list of regular expressions that match database
names to be excluded from monitoring. Any database name not included in the
exclude list will be monitored. May not be used with
database.include.list
.
Type: list of strings
Importance: low
Default: empty string
table.include.list
An optional, comma-separated list of regular expressions that match
fully-qualified table identifiers for tables whose changes you want to
capture. Any table not included in the include list will not have its changes
captured. Each identifier is of the form
databaseName.tableName
. By default, the
connector monitors every non-system table in each monitored database. May
not be used with
table.exclude.list
.
Type: list of strings
Importance: low
Default: No default
table.exclude.list
An optional comma-separated list of regular expressions that match
fully-qualified table identifiers for tables to be excluded from monitoring.
Any table not included in the exclude list will be monitored. Each identifier is
of the form
databaseName.tableName
. May not be used with
table.include.list
.
Type: list of strings
Importance: low
Default: empty string
column.exclude.list
An optional, comma-separated list of regular expressions that match the
fully-qualified names of columns to exclude from change event record values.
Fully-qualified names for columns are of the form
databaseName.tableName.columnName
.
Type: list of strings
Importance: low
Default: empty string
column.include.list
An optional, comma-separated list of regular expressions that match the
fully-qualified names of columns to include in change event record values.
Fully-qualified names for columns are of the form
databaseName.tableName.columnName
.
Type: list of strings
Importance: low
Default: empty string
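A sketch of how the include and exclude filters above might be combined; the
database, table, and column names are hypothetical.
# Capture only the inventory database
database.include.list=inventory
# Skip its audit tables
# (regex dots are escaped, and backslashes are doubled for the properties file format)
table.exclude.list=inventory\\.audit_.*
# Omit a sensitive column from change event values
column.exclude.list=inventory\\.customers\\.ssn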
skip.messages.without.change
Specifies whether to skip publishing messages when there is no change in
included columns.
Type: boolean
Default: false
column.truncate.to.length.chars
An optional comma-separated list of regular expressions that match the
fully-qualified names of character-based columns. The column values are
truncated in the change event message values if the field values are longer
than the specified number of characters. Multiple properties with different
lengths can be used in a single configuration, although in each the length
must be a positive integer. Fully-qualified names for columns are in the form
databaseName.tableName.columnName
.
Type: list of strings
Importance: low
Default: No default
column.mask.with.length.chars
An optional comma-separated list of regular expressions that match the
fully-qualified names of character-based columns. The column values are
replaced in the change event message values with a field value consisting of
the specified number of asterisk (*) characters. Multiple properties with
different lengths can be used in a single configuration, although in each the
length must be a positive integer. Fully-qualified names for columns are in
the form
databaseName.tableName.columnName
.
Type: list of strings
Importance: low
Default: No default
column.mask.hash.hashAlgorithm.with.salt.salt;
column.mask.hash.v2.hashAlgorithm.with.salt.salt
An optional, comma-separated list of regular expressions that match the
fully-qualified names of character-based columns. Fully qualified names for a
column are in the following form:
<databaseName>.<tableName>.<columnName>
.
For more details about these properties, see the
Debezium documentation
.
Type: list of strings
Default: No default
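A sketch of the truncation and masking properties above, applied to
hypothetical columns; the lengths, algorithm, and salt are placeholders.
# Truncate long description fields to 20 characters
column.truncate.to.20.chars=inventory\\.products\\.description
# Replace card numbers with 12 asterisk characters
column.mask.with.12.chars=inventory\\.orders\\.card_number
# Replace emails with a salted SHA-256 hash
column.mask.hash.SHA-256.with.salt.mySalt123=inventory\\.customers\\.email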
column.propagate.source.type
An optional comma-separated list of regular expressions that match the
fully-qualified names of columns whose original type and length should be
added as a parameter to the corresponding field schemas in the emitted change
messages. The schema parameters
__debezium.source.column.type
,
__debezium.source.column.length
, and
__debezium.source.column.scale
are used to propagate the original type name, length, and scale (for
variable-width types). This is useful for properly sizing corresponding
columns in sink databases. Fully-qualified names for columns are in the form
databaseName.tableName.columnName
.
Type: list of strings
Importance: low
Default: No default
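A sketch of type propagation for a hypothetical table; the value is a regular
expression, with backslashes doubled for the properties file format.
# Propagate original type, length, and scale for all columns in inventory.products
column.propagate.source.type=inventory\\.products\\..*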
database.propagate.source.type
An optional, comma-separated list of regular expressions that match the fully
qualified names of columns whose original type and length should be added as a
parameter to the corresponding field schemas in the emitted change event
records. The following schema parameters are used to propagate the original
type name and length for variable-width types, respectively:
__debezium.source.column.type
,
__debezium.source.column.length
, and
__debezium.source.column.scale
.
This is useful for properly sizing corresponding columns in sink databases.
Fully-qualified data type names are of the form
databaseName.tableName.typeName
. For a list of MySQL-specific data type
names, see
Data type mappings
in the Debezium documentation.
Type: list of strings
Importance: low
Default: No default
time.precision.mode
Time, date, and timestamps can be represented with
different kinds of precision.
Type: string
Importance: low
Default:
adaptive_time_microseconds
Settings include the following:
adaptive_time_microseconds
: (Default) Captures the date,
datetime, and timestamp values exactly as they are in the database. It uses
either millisecond, microsecond, or nanosecond precision values that are
based on the database column’s type. An exception is TIME type
fields, which are always captured as microseconds.
adaptive
: (Deprecated) Captures the time and timestamp values exactly as
they are in the database, using either millisecond, microsecond, or nanosecond
precision values. These values are based on the database column type.
connect
: Represents time and timestamp values using Connect’s
built-in representations for Time, Date, and Timestamp. It uses millisecond
precision regardless of database column precision.
decimal.handling.mode
Specifies how the connector should handle values for
DECIMAL
and
NUMERIC
columns.
Type: string
Importance: low
Default:
precise
Settings include the following:
precise
: (Default) Represents them precisely using
java.math.BigDecimal
values, which are represented in change events in binary form.
double
: Represents them using
double
values, which may result in a
loss of precision but is easier to use.
string
: Encodes values as formatted strings, which are easy to consume, but
semantic information about the real type is lost.
bigint.unsigned.handling.mode
Specifies how BIGINT UNSIGNED columns should be represented in change events.
Type: string
Importance: low
Default:
long
Settings include the following:
precise
: Uses
java.math.BigDecimal
to represent values, which are
encoded in the change events using a binary representation and
Kafka Connect’s
org.apache.kafka.connect.data.Decimal
type.
long
: (Default) Represents values using Java’s
long
, which may
not offer the same precision but is far easier to use in consumers.
long
is usually the preferable setting. The
precise
setting should only be used
when working with values larger than 2^63, since these values cannot be
conveyed using
long
.
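A sketch combining the temporal and numeric handling modes above; whether
these values suit a given pipeline depends on the downstream consumers.
# Use Connect’s built-in temporal types at millisecond precision
time.precision.mode=connect
# Emit DECIMAL/NUMERIC values as formatted strings for easy consumption
decimal.handling.mode=string
# Keep the default long representation for BIGINT UNSIGNED
bigint.unsigned.handling.mode=long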
include.schema.changes
Boolean value that specifies whether the connector should publish changes in
the database schema to a Kafka topic with the same name as the topic prefix.
Each schema change is recorded with a key that contains the database name and
a value that includes the DDL statement(s). This is independent of how
the connector internally records database schema history.
Type: boolean
Importance: low
Default:
true
include.schema.comments
Boolean value that specifies whether the connector should parse and publish
table and column comments on metadata objects. Enabling this option has
implications for memory usage. The number and size of logical schema objects
largely determines how much memory the Debezium connectors consume,
and adding potentially large string data to each of them can be
quite expensive.
Type: boolean
Importance: low
Default:
false
include.query
Boolean value that specifies whether the connector should include the original
SQL query that generated the change event. Note that this option requires
MySQL to be configured with the
binlog_rows_query_log_events
option set to
ON
.
The query will not be present for events generated from the snapshot process.
Note that enabling this option may expose tables or fields that are explicitly
excluded or masked, because the original SQL statement is included in the
change event.
Type: boolean
Importance: low
Default:
false
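A sketch enabling the two options above; note that include.query only takes
effect if the MySQL server itself has binlog_rows_query_log_events set to ON.
include.schema.changes=true
# Requires binlog_rows_query_log_events=ON on the MySQL server,
# and may expose excluded or masked data through the raw SQL
include.query=true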
event.deserialization.failure.handling.mode
Specifies how the connector should react to exceptions during deserialization
of binlog events. Settings include the following:
fail
: Propagates the exception (indicating the
problematic event and its binlog offset), causing the connector to stop.
warn
: Causes the problematic event to be skipped and the problematic event
and its binlog offset to be logged (make sure that the logger is set to the
WARN
or
ERROR
level).
ignore
: Causes the problematic event to be
skipped.
Type: string
Importance: low
Default:
fail
inconsistent.schema.handling.mode
Specifies how the connector should react to binlog events that relate to tables not present in the internal schema representation (that is, the internal representation is inconsistent with the database). Settings include the following:
fail
: Throws an exception (indicating the problematic event and its binlog offset), causing the connector to stop.
warn
: Causes the problematic event to be skipped and the problematic event and its binlog offset to be logged (make sure that the logger is set to the
WARN
or
ERROR
level).
ignore
: Causes the problematic event to be skipped.
Type: string
Importance: low
Default:
fail
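A sketch of the failure handling modes above, choosing to log and skip rather
than stop; appropriate values depend on how tolerant the pipeline is of
dropped events.
# Log problematic binlog events and keep going instead of stopping
event.deserialization.failure.handling.mode=warn
inconsistent.schema.handling.mode=warn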
max.queue.size
Positive integer value that specifies the maximum size of the blocking queue into which change events read from the database log are placed before they are written to Kafka. This queue can provide backpressure to the binlog reader when, for example, writes to Kafka are slower or if Kafka is not available. Events that appear in the queue are not included in the offsets periodically recorded by this connector. Defaults to
8192
, and should always be larger than the maximum batch size specified in the
max.batch.size
property.
Type: int
Importance: low
Default: 8192
max.batch.size
Positive integer value that specifies the maximum size of each batch of events that should be processed during each iteration of this connector. Defaults to
2048
.
Type: int
Importance: low
Default: 2048
max.queue.size.in.bytes
Long value for the maximum size in bytes of the blocking queue. This feature
is disabled by default; it becomes active when the property is set to a
positive long value.
Type: long
Importance: low
Default: 0
poll.interval.ms
Positive integer value that specifies the number of milliseconds the connector should wait during each iteration for new change events to appear. Defaults to
500
milliseconds.
Type: int
Importance: low
Default: 500
connect.timeout.ms
A positive integer value that specifies the maximum time in milliseconds this
connector should wait after trying to connect to the MySQL database server
before timing out. Defaults to 30000 milliseconds (30 seconds).
Type: int
Importance: low
Default: 30000
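A sketch of the queue and batch tuning knobs above; these values are
illustrative, with the one hard constraint being that max.queue.size must be
larger than max.batch.size.
# Queue must be larger than the batch size (defaults: 8192 and 2048)
max.queue.size=16384
max.batch.size=4096
# Optionally cap the queue by bytes as well (0 disables the bytes limit)
max.queue.size.in.bytes=104857600
# Poll for new events every 500 ms; connection attempts time out after 30 s
poll.interval.ms=500
connect.timeout.ms=30000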
gtid.source.includes
A comma-separated list of regular expressions that match source UUIDs in the GTID set used to find the binlog position in the MySQL server. Only the GTID ranges that have sources matching one of these include patterns will be used. May not be used with
gtid.source.excludes
.
Type: list of strings
Importance: low
Default: No default
gtid.source.excludes
A comma-separated list of regular expressions that match source UUIDs in the GTID set used to find the binlog position in the MySQL server. Only the GTID ranges that have sources matching none of these exclude patterns will be used. May not be used with
gtid.source.includes
.
Type: list of strings
Importance: low
Default: No default
tombstones.on.delete
Controls whether a tombstone event should be generated after a delete event. When set to
true
, the delete operations are represented by a delete event and a subsequent tombstone event. When set to
false
, only a delete event is sent. Emitting the tombstone event (the default behavior) allows Kafka to completely delete all events pertaining to the given key once the source record is deleted.
Type: boolean
Importance: low
Default: true
message.key.columns
A list of expressions that specify the columns that the connector uses to form
custom message keys for change event records that it publishes to the Kafka
topics for specified tables.
By default, Debezium uses the primary key column of a table as the message key
for records that it emits. In place of the default, or to specify a key for
tables that lack a primary key, you can configure custom message keys based on
one or more columns.
To establish a custom message key for a table, list the table, followed by the
columns to use as the message key. Each list entry takes the following format:
<fully-qualified_tableName>:<keyColumn>,<keyColumn>
To base a table key on multiple column names, insert commas between the column names.
Each fully-qualified table name is a regular expression in the following format:
<databaseName>.<tableName>
The property can include entries for multiple tables. Use a semicolon to
separate table entries in the list. The following example sets the message key
for the tables inventory.customers and purchase.orders:
inventory.customers:pk1,pk2;(.*).purchase.orders:pk3,pk4
For the table inventory.customers, the columns pk1 and pk2 are specified as
the message key. For the purchase.orders table in any database, the columns
pk3 and pk4 serve as the message key.
There is no limit to the number of columns that you use to create custom message
keys. However, it’s best to use the minimum number that are required to specify
a unique key.
Type: list
Default: No default
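The example from the description above, written as a single properties line
(semicolons separate table entries, commas separate key columns):
message.key.columns=inventory.customers:pk1,pk2;(.*).purchase.orders:pk3,pk4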
binary.handling.mode
Specifies how binary columns (for example, blob, binary, varbinary) should be
represented in change events.
Type: bytes or string
Importance: low
Default: bytes
schema.name.adjustment.mode
Specifies how schema names should be adjusted for compatibility with the
message converter used by the connector.
Type: string
Importance: low
Valid values: