API Docs - v1.0.7
Source
cdc (Source)
The CDC source receives an event whenever a change event (INSERT, UPDATE, or DELETE) is triggered on a database table. The events are received in key-value format.
The keys of the map in a CDC change event depend on the operation:
For insert: Keys are the specified table's columns.
For delete: Keys are 'before_' followed by the specified table's columns, for example before_X.
For update: Keys are the specified table's columns, plus 'before_' followed by the specified table's columns.
For 'polling' mode: Keys are the specified table's columns.
See parameter: mode for supported databases and change events.
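The key naming rules above can be illustrated with a short Python sketch. This is not the extension's implementation; the function name `expected_keys` and the sample column list are hypothetical and exist only to show which keys a change event map would carry for each operation.

```python
def expected_keys(operation, columns):
    """Return the map keys a change event would carry for the given operation."""
    # Keys holding the row's values before the change use the 'before_' prefix.
    before_keys = [f"before_{column}" for column in columns]
    operation = operation.lower()  # the operation parameter is not case sensitive
    if operation == "insert":
        return list(columns)
    if operation == "delete":
        return before_keys
    if operation == "update":
        return list(columns) + before_keys
    raise ValueError(f"unsupported operation: {operation}")


# Hypothetical table with two columns, matching the 'students' examples.
columns = ["id", "name"]
insert_keys = expected_keys("insert", columns)
delete_keys = expected_keys("delete", columns)
update_keys = expected_keys("UPDATE", columns)
```

In 'polling' mode the keys are simply the table's columns, the same as the insert case here.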
Syntax
@source(type="cdc", url="<STRING>", mode="<STRING>", jdbc.driver.name="<STRING>", username="<STRING>", password="<STRING>", pool.properties="<STRING>", datasource.name="<STRING>", table.name="<STRING>", polling.column="<STRING>", polling.interval="<INT>", operation="<STRING>", connector.properties="<STRING>", database.server.id="<STRING>", database.server.name="<STRING>", @map(...))
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
url | Connection URL of the database. Use the format: jdbc:mysql://<host>:<port>/<database_name> | | STRING | No | No |
mode | Mode used to capture the change data. Mode 'polling' uses a polling.column to monitor the given table. Mode 'listening' uses logs to monitor the given table. The required parameters differ between the modes. Mode 'listening' currently supports only MySQL and can receive INSERT, UPDATE, and DELETE events. Mode 'polling' supports RDBMS and can receive INSERT and UPDATE events. | listening | STRING | Yes | No |
jdbc.driver.name | The driver class name for connecting to the database. Required for 'polling' mode. | | STRING | Yes | No |
username | Username of a user with SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT privileges on the table whose changes are captured. For 'polling' mode, a user with SELECT privileges. | | STRING | No | No |
password | Password for the above user. | | STRING | No | No |
pool.properties | Pool parameters for the database connection, specified as key-value pairs. | | STRING | Yes | No |
datasource.name | Name of the WSO2 datasource used to connect to the database. When datasource.name is provided, the url, username, and password are not needed. Takes priority over a URL-based connection. Accepted only when mode is set to 'polling'. | | STRING | Yes | No |
table.name | Name of the table to be monitored for data changes. | | STRING | No | No |
polling.column | Name of the column that is polled to capture the change data. It is recommended to use a TIMESTAMP field as the polling.column in order to capture both inserts and updates. Numeric auto-incrementing fields and char fields can also be used as the polling.column, but they support capturing inserts only, and for char fields this depends on how the data is entered. Mandatory when mode is 'polling'. | | STRING | Yes | No |
polling.interval | The interval, in seconds, at which the given table is polled for changes. Accepted only when mode is set to 'polling'. | 1 | INT | Yes | No |
operation | The change event operation of interest: 'insert', 'update', or 'delete'. Required for 'listening' mode. Not case sensitive. | | STRING | No | No |
connector.properties | Debezium connector properties, specified as a comma-separated string. These properties take priority over the parameters. Only for 'listening' mode. | Empty_String | STRING | Yes | No |
database.server.id | For MySQL, a unique integer between 1 and 2^32 used as the ID. It is used when joining the MySQL database cluster to read the binlog. Only for 'listening' mode. | Random integer between 5400 and 6400 | STRING | Yes | No |
database.server.name | Logical name that identifies and provides a namespace for the particular database server. Only for 'listening' mode. | {host}_{port} | STRING | Yes | No |
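To see why the choice of polling.column determines which changes 'polling' mode can capture, consider the following Python sketch. It is a general illustration of column-based polling against an in-memory SQLite table, not the extension's actual implementation; the helper `poll_changes`, the table layout, and the integer `last_updated` values are assumptions made for the example.

```python
import sqlite3


def poll_changes(conn, table, polling_column, last_seen):
    """Return rows whose polling column exceeds last_seen, and the new high-water mark."""
    # Identifiers are interpolated directly because this sketch only uses trusted names.
    query = f"SELECT * FROM {table} WHERE {polling_column} > ? ORDER BY {polling_column}"
    cursor = conn.execute(query, (last_seen,))
    names = [description[0] for description in cursor.description]
    rows = [dict(zip(names, row)) for row in cursor.fetchall()]
    if rows:
        last_seen = rows[-1][polling_column]
    return rows, last_seen


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE students (id INTEGER PRIMARY KEY, name TEXT, last_updated INTEGER)")
conn.execute("INSERT INTO students VALUES (1, 'Ann', 100)")
conn.execute("INSERT INTO students VALUES (2, 'Bob', 101)")

# The first poll sees both inserted rows, whichever column is polled.
first_by_id, last_id = poll_changes(conn, "students", "id", 0)
first_by_ts, last_ts = poll_changes(conn, "students", "last_updated", 0)

# An update leaves the auto-incrementing id unchanged but advances the timestamp.
conn.execute("UPDATE students SET name = 'Anna', last_updated = 102 WHERE id = 1")

missed_by_id, last_id = poll_changes(conn, "students", "id", last_id)
seen_by_ts, last_ts = poll_changes(conn, "students", "last_updated", last_ts)
```

Polling on `id` returns nothing after the update because no row has a larger id, while polling on `last_updated` returns the modified row. This is the behavior the polling.column description refers to when it recommends a timestamp field for capturing both inserts and updates.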
Examples
EXAMPLE 1
@source(type = 'cdc' , url = 'jdbc:mysql://localhost:3306/SimpleDB',
username = 'cdcuser', password = 'pswd4cdc',
table.name = 'students', operation = 'insert',
@map(type='keyvalue', @attributes(id = 'id', name = 'name')))
define stream inputStream (id string, name string);
In this example, the cdc source starts listening to row insertions on the students table, which has the columns name and id and belongs to the MySQL SimpleDB database that can be accessed with the given url.
EXAMPLE 2
@source(type = 'cdc' , url = 'jdbc:mysql://localhost:3306/SimpleDB',
username = 'cdcuser', password = 'pswd4cdc',
table.name = 'students', operation = 'update',
@map(type='keyvalue', @attributes(id = 'id', name = 'name',
before_id = 'before_id', before_name = 'before_name')))
define stream inputStream (before_id string, id string,
before_name string , name string);
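In this example, the cdc source starts listening to row updates on the students table, which belongs to the MySQL SimpleDB database that can be accessed with the given url. Each event carries both the new values (id, name) and the previous values (before_id, before_name).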
EXAMPLE 3
@source(type = 'cdc' , url = 'jdbc:mysql://localhost:3306/SimpleDB',
username = 'cdcuser', password = 'pswd4cdc',
table.name = 'students', operation = 'delete',
@map(type='keyvalue', @attributes(before_id = 'before_id', before_name = 'before_name')))
define stream inputStream (before_id string, before_name string);
In this example, the cdc source starts listening to row deletions on the students table, which belongs to the MySQL SimpleDB database that can be accessed with the given url.
EXAMPLE 4
@source(type = 'cdc', mode='polling', polling.column = 'id',
jdbc.driver.name = 'com.mysql.jdbc.Driver', url = 'jdbc:mysql://localhost:3306/SimpleDB',
username = 'cdcuser', password = 'pswd4cdc',
table.name = 'students',
@map(type='keyvalue', @attributes(id = 'id', name = 'name')))
define stream inputStream (id int, name string);
In this example, the cdc source starts polling the students table for inserts. The polling.column is an auto-incrementing field. The url, username, password, and jdbc.driver.name are used to connect to the database.
EXAMPLE 5
@source(type = 'cdc', mode='polling', polling.column = 'id', datasource.name = 'SimpleDB',
table.name = 'students',
@map(type='keyvalue', @attributes(id = 'id', name = 'name')))
define stream inputStream (id string, name string);
In this example, the cdc source starts polling the students table for inserts. The polling.column is a char column with values following the pattern S001, S002, and so on. The datasource.name is used to connect to the database. Note that datasource.name works only with the Stream Processor.
EXAMPLE 6
@source(type = 'cdc', mode='polling', polling.column = 'last_updated', datasource.name = 'SimpleDB',
table.name = 'students',
@map(type='keyvalue'))
define stream inputStream (name string);
In this example, the cdc source starts polling the students table for inserts and updates. The polling.column is a timestamp field.