API Docs - v2.0.1
Source
cdc (Source)
The CDC source receives an event whenever a change event (INSERT, UPDATE, or DELETE) is triggered on a database table. Events are received in the 'key-value' format.
The keys of the map for a CDC change event are as follows:
For insert: The keys are the column names of the table (e.g., 'X').
For delete: The keys are the column names prefixed with 'before_' (e.g., 'before_X'), which hold the values of the deleted row.
For update: The keys are both the column names, which hold the new values, and the column names prefixed with 'before_', which hold the values before the update (e.g., 'X' and 'before_X').
For 'polling' mode: The keys are the column names of the table.
See the 'mode' parameter for the supported databases and change events.
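As an illustration of the key rules above, the following Python sketch builds the key-value map for each operation from before/after row snapshots. The function `to_keyvalue` and its arguments are hypothetical; the sketch models the documented key layout, not the extension's internals.

```python
from typing import Any, Dict, Optional

Row = Dict[str, Any]  # column name -> value


def to_keyvalue(operation: str, before: Optional[Row], after: Optional[Row]) -> Row:
    """Hypothetical helper: build the key-value map for one change event."""
    op = operation.lower()  # the 'operation' parameter is not case sensitive
    if op == "insert":
        # Insert: keys are the plain column names, carrying the new values.
        return dict(after or {})
    prefixed = {f"before_{col}": val for col, val in (before or {}).items()}
    if op == "delete":
        # Delete: only 'before_'-prefixed keys, carrying the deleted row's values.
        return prefixed
    if op == "update":
        # Update: new values under the column names, old values under 'before_<column>'.
        return {**dict(after or {}), **prefixed}
    raise ValueError(f"unsupported operation: {operation}")


print(to_keyvalue("update", {"id": "1", "name": "Ann"}, {"id": "1", "name": "Anne"}))
# {'id': '1', 'name': 'Anne', 'before_id': '1', 'before_name': 'Ann'}
```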
Syntax
@source(type="cdc", url="<STRING>", mode="<STRING>", jdbc.driver.name="<STRING>", username="<STRING>", password="<STRING>", pool.properties="<STRING>", datasource.name="<STRING>", table.name="<STRING>", polling.column="<STRING>", polling.interval="<INT>", operation="<STRING>", connector.properties="<STRING>", database.server.id="<STRING>", database.server.name="<STRING>", @map(...))
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
url | The connection URL to the database. The format used is: 'jdbc:mysql://<host>:<port>/<database_name>' | | STRING | No | No |
mode | The mode used to capture change data. The type of events that can be received, and the required parameters, differ based on the mode. The mode can be one of the following: 'polling': This mode polls the column specified via 'polling.column' to monitor the given table. It supports RDBMS databases and captures 'INSERT' and 'UPDATE' change events. 'listening': This mode uses the database logs to monitor the given table. It currently supports only MySQL databases, and captures 'INSERT', 'UPDATE', and 'DELETE' change events. | listening | STRING | Yes | No |
jdbc.driver.name | The driver class name for connecting to the database. A value is required for this parameter when the mode is 'polling'. | | STRING | Yes | No |
username | The username used to access the database. In the 'listening' mode, this user needs the 'SELECT', 'RELOAD', 'SHOW DATABASES', 'REPLICATION SLAVE', and 'REPLICATION CLIENT' privileges for the change data capture table (specified via the 'table.name' parameter). In the 'polling' mode, the user needs only the 'SELECT' privilege. | | STRING | No | No |
password | The password for the user specified via the 'username' parameter. | | STRING | No | No |
pool.properties | The pool parameters for the database connection, specified as key-value pairs. | | STRING | Yes | No |
datasource.name | The name of the WSO2 data source used to connect to the database. When a data source name is provided, the URL, username, and password are not needed, and a data source based connection takes precedence over a URL based connection. This parameter is applicable only when the mode is set to 'polling', and only when this extension is used with WSO2 Stream Processor. | | STRING | Yes | No |
table.name | The name of the table to be monitored for data changes. | | STRING | No | No |
polling.column | The column that is polled to capture change data. A TIMESTAMP column is recommended as the 'polling.column' because it captures both inserts and updates. Numeric auto-incremental columns and char columns can also be used, but they capture only inserts, and whether a char column works also depends on how its values are entered. A value is required for this parameter when the mode is 'polling'. | | STRING | Yes | No |
polling.interval | The time interval, in seconds, at which the given table is polled for changes. This parameter is applicable only when the mode is set to 'polling'. | 1 | INT | Yes | No |
operation | The change event operation to listen for. Possible values are 'insert', 'update', and 'delete'. A value is required when the mode is 'listening'. This parameter is not case sensitive. | | STRING | No | No |
connector.properties | Debezium connector properties, specified as a comma-separated string. Properties specified here take precedence over the corresponding source parameters. This parameter is applicable only in the 'listening' mode. | Empty_String | STRING | Yes | No |
database.server.id | The ID used when joining the MySQL database cluster to read the binlog. It must be a unique integer between 1 and 2^32. This parameter is applicable only when the mode is 'listening'. | Random integer between 5400 and 6400 | STRING | Yes | No |
database.server.name | A logical name that identifies and provides a namespace for the database server. This parameter is applicable only when the mode is 'listening'. | {host}_{port} | STRING | Yes | No |
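The following Python sketch, a hypothetical helper rather than part of the extension, checks which parameters the table above marks as required for each mode and derives the documented defaults for 'database.server.name' and 'database.server.id'. It assumes that 'datasource.name' also removes the need for 'jdbc.driver.name' (as Example 5 suggests) and that the URL follows the 'jdbc:mysql://<host>:<port>/<database_name>' format.

```python
import random
import re
from typing import Dict, List


def missing_params(params: Dict[str, str]) -> List[str]:
    """Hypothetical check: required parameters that are absent for the chosen mode."""
    mode = params.get("mode", "listening")  # documented default mode
    required = ["table.name"]
    if mode == "polling":
        required.append("polling.column")
        # Assumption: a data source replaces url, username, password and the driver name.
        if "datasource.name" not in params:
            required += ["url", "username", "password", "jdbc.driver.name"]
    elif mode == "listening":
        required += ["url", "username", "password", "operation"]
    else:
        raise ValueError(f"unknown mode: {mode}")
    return [name for name in required if name not in params]


def default_server_name(url: str) -> str:
    """Derive the documented '{host}_{port}' default from a 'jdbc:mysql://<host>:<port>/<db>' URL."""
    match = re.match(r"jdbc:mysql://([^:/]+):(\d+)/", url)
    if match is None:
        raise ValueError(f"unexpected URL format: {url}")
    host, port = match.groups()
    return f"{host}_{port}"


def default_server_id() -> int:
    """Pick an ID in the documented default range (bounds treated as inclusive here)."""
    return random.randint(5400, 6400)


print(missing_params({"mode": "polling", "table.name": "students", "datasource.name": "SimpleDB"}))
# ['polling.column']
print(default_server_name("jdbc:mysql://localhost:3306/SimpleDB"))  # localhost_3306
```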
Examples
EXAMPLE 1
@source(type = 'cdc' , url = 'jdbc:mysql://localhost:3306/SimpleDB',
username = 'cdcuser', password = 'pswd4cdc',
table.name = 'students', operation = 'insert',
@map(type='keyvalue', @attributes(id = 'id', name = 'name')))
define stream inputStream (id string, name string);
In this example, the CDC source listens to row insertions made in the 'students' table, which has the 'id' and 'name' columns. This table belongs to the 'SimpleDB' MySQL database, which can be accessed via the given URL.
EXAMPLE 2
@source(type = 'cdc' , url = 'jdbc:mysql://localhost:3306/SimpleDB',
username = 'cdcuser', password = 'pswd4cdc',
table.name = 'students', operation = 'update',
@map(type='keyvalue', @attributes(id = 'id', name = 'name',
before_id = 'before_id', before_name = 'before_name')))
define stream inputStream (before_id string, id string,
before_name string , name string);
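In this example, the CDC source listens to row updates made in the 'students' table. Each event carries the new values under 'id' and 'name', and the previous values under 'before_id' and 'before_name'. This table belongs to the 'SimpleDB' MySQL database, which can be accessed via the given URL.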
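To show how '@attributes' relates the event map to the stream definition in this update example, here is a hypothetical Python sketch (not the keyvalue mapper's implementation) that picks values from an update event map in the order of the 'inputStream' attributes. It assumes every referenced key is present in the map; how the real mapper handles missing keys is not covered here.

```python
from typing import Any, Dict, List, Tuple


def map_event(event: Dict[str, Any], attributes: Dict[str, str], stream: List[str]) -> Tuple[Any, ...]:
    """Hypothetical mapper: order map values by the stream's attribute list.

    attributes maps each stream attribute to a map key, mirroring @attributes(...).
    """
    return tuple(event[attributes[name]] for name in stream)


update_event = {"id": "1", "name": "Anne", "before_id": "1", "before_name": "Ann"}
attrs = {"id": "id", "name": "name", "before_id": "before_id", "before_name": "before_name"}
stream = ["before_id", "id", "before_name", "name"]  # order of 'define stream inputStream'

print(map_event(update_event, attrs, stream))  # ('1', '1', 'Ann', 'Anne')
```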
EXAMPLE 3
@source(type = 'cdc' , url = 'jdbc:mysql://localhost:3306/SimpleDB',
username = 'cdcuser', password = 'pswd4cdc',
table.name = 'students', operation = 'delete',
@map(type='keyvalue', @attributes(before_id = 'before_id', before_name = 'before_name')))
define stream inputStream (before_id string, before_name string);
In this example, the CDC source listens to row deletions made in the 'students' table. Only the 'before_' keys are mapped, because a deleted row has no new values. This table belongs to the 'SimpleDB' MySQL database, which can be accessed via the given URL.
EXAMPLE 4
@source(type = 'cdc', mode='polling', polling.column = 'id',
jdbc.driver.name = 'com.mysql.jdbc.Driver', url = 'jdbc:mysql://localhost:3306/SimpleDB',
username = 'cdcuser', password = 'pswd4cdc',
table.name = 'students',
@map(type='keyvalue', @attributes(id = 'id', name = 'name')))
define stream inputStream (id int, name string);
In this example, the CDC source polls the 'students' table for inserts. The 'id' column, specified as the polling column, is an auto-incremental field. The connection to the database is made via the URL, username, password, and JDBC driver name.
EXAMPLE 5
@source(type = 'cdc', mode='polling', polling.column = 'id', datasource.name = 'SimpleDB',
table.name = 'students',
@map(type='keyvalue', @attributes(id = 'id', name = 'name')))
define stream inputStream (id string, name string);
In this example, the CDC source polls the 'students' table for inserts. The polling column is a char column whose values follow the 'S001', 'S002', ... pattern. The connection to the database is made via a data source named 'SimpleDB'. Note that the 'datasource.name' parameter works only with WSO2 Stream Processor.
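One plausible reason why a char polling column depends on how the data is entered: if new rows are detected by comparing values with '>' (an assumption here, not documented behavior), zero-padded codes such as 'S001', 'S002' sort in insertion order as strings, while unpadded codes do not. This minimal Python sketch also assumes the database compares these values like plain ASCII strings; actual collation rules may differ.

```python
from typing import List


def is_increasing(values: List[str]) -> bool:
    """True if each value compares greater than the previous one as a string."""
    return all(a < b for a, b in zip(values, values[1:]))


padded = [f"S{n:03d}" for n in range(1, 12)]  # 'S001' ... 'S011'
unpadded = [f"S{n}" for n in range(1, 12)]    # 'S1' ... 'S11'

print(is_increasing(padded))    # True
print(is_increasing(unpadded))  # False: 'S10' < 'S9' as strings

last_seen = "S9"
print("S10" > last_seen)  # False: a '>' comparison would skip row 'S10'
```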
EXAMPLE 6
@source(type = 'cdc', mode='polling', polling.column = 'last_updated', datasource.name = 'SimpleDB',
table.name = 'students',
@map(type='keyvalue'))
define stream inputStream (name string);
In this example, the CDC source polls the 'students' table for inserts and updates. The polling column, 'last_updated', is a TIMESTAMP column.
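The following Python sketch uses an in-memory SQLite table to illustrate why a timestamp polling column captures updates while an auto-incremental one does not. The `poll` helper and its 'WHERE column > last value' query are assumptions made for illustration, not the extension's actual query. Timestamps are stored as ISO-formatted text so that string comparison matches chronological order. Here the UPDATE sets 'last_updated' explicitly; in MySQL, a TIMESTAMP column declared with ON UPDATE CURRENT_TIMESTAMP is updated automatically.

```python
import sqlite3
from typing import Any, List, Tuple


def poll(conn: sqlite3.Connection, column: str, last_value: Any) -> List[Tuple[Any, ...]]:
    """Hypothetical poll: rows whose polling column is greater than the last value seen."""
    # The column name is interpolated into the SQL, so pass only trusted identifiers.
    query = f"SELECT id, name, last_updated FROM students WHERE {column} > ? ORDER BY {column}"
    return conn.execute(query, (last_value,)).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE students (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT, last_updated TEXT)"
)
conn.execute("INSERT INTO students (name, last_updated) VALUES ('Ann', '2024-01-01 10:00:00')")
conn.execute("INSERT INTO students (name, last_updated) VALUES ('Bob', '2024-01-01 10:00:05')")

# First poll from the beginning: both inserts are captured with either column.
last_ts = poll(conn, "last_updated", "")[-1][2]  # '2024-01-01 10:00:05'
last_id = poll(conn, "id", 0)[-1][0]             # 2

# An UPDATE moves the timestamp forward but leaves the auto-increment id unchanged.
conn.execute("UPDATE students SET name = 'Anne', last_updated = '2024-01-01 10:00:09' WHERE id = 1")

print(poll(conn, "last_updated", last_ts))  # [(1, 'Anne', '2024-01-01 10:00:09')]
print(poll(conn, "id", last_id))            # [] because the update is not seen
```

A real poller would repeat this every 'polling.interval' seconds, remembering the largest polling column value seen so far.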