
API Docs - v1.0.5

Source

cdc (Source)

The CDC source receives events when a change event (INSERT, UPDATE, or DELETE) occurs on a database table. The events are received in key-value format.
The keys of a CDC change event's map are as follows:
For 'listening' mode:
    For insert: the keys are the specified table's columns.
    For delete: the keys are 'before_' followed by the specified table's columns, e.g., before_X.
    For update: the keys are the specified table's columns, plus 'before_' followed by each of those columns.
For 'polling' mode: the keys are the specified table's columns.
See the mode parameter for the supported databases and change events.
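The key naming rules above can be illustrated with a short Python sketch. This is a hypothetical helper written for this page, not code from the CDC extension. It only shows which keys appear in the event map for each operation and mode, given the row's values before and after the change.

```python
# Hypothetical sketch (not the extension's code): build the key-value map
# that a CDC change event carries, following the key naming rules above.
from typing import Dict, Optional


def change_event_map(operation: str,
                     before: Optional[Dict[str, object]] = None,
                     after: Optional[Dict[str, object]] = None,
                     mode: str = "listening") -> Dict[str, object]:
    """Return the event map for one changed row.

    operation: 'insert', 'update' or 'delete' (case-insensitive).
    before/after: column -> value for the row before/after the change.
    """
    op = operation.lower()
    if op not in ("insert", "update", "delete"):
        raise ValueError(f"unsupported operation: {operation}")
    if mode == "polling":
        # Polling mode: the keys are simply the table's columns.
        return dict(after or {})
    event: Dict[str, object] = {}
    if op in ("insert", "update"):
        # New values are keyed by the plain column names.
        event.update(after or {})
    if op in ("update", "delete"):
        # Old values are keyed by 'before_' + column name.
        event.update({f"before_{col}": val for col, val in (before or {}).items()})
    return event


# An update on the students table yields both plain and 'before_' keys.
print(change_event_map("update",
                       before={"id": "1", "name": "Ann"},
                       after={"id": "1", "name": "Anna"}))
```

For a delete, only the `before_` keys are present, which is why the delete example below maps only `before_id` and `before_name`.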

Syntax

@source(type="cdc", url="<STRING>", mode="<STRING>", jdbc.driver.name="<STRING>", username="<STRING>", password="<STRING>", pool.properties="<STRING>", datasource.name="<STRING>", table.name="<STRING>", polling.column="<STRING>", polling.interval="<INT>", operation="<STRING>", connector.properties="<STRING>", database.server.id="<STRING>", database.server.name="<STRING>", @map(...))

QUERY PARAMETERS

Each parameter is listed with its description, default value, possible data types, and whether it is optional and dynamic.

url
    Connection URL of the database. Use the format: jdbc:mysql://<host>:<port>/<database_name>
    Default value: none. Possible data types: STRING. Optional: No. Dynamic: No.

mode
    Mode used to capture the change data. The 'polling' mode uses a polling.column to monitor the given table. The 'listening' mode uses the database logs to monitor the given table.
    The required parameters differ for each mode.
    The 'listening' mode currently supports only MySQL. INSERT, UPDATE, and DELETE events can be received.
    The 'polling' mode supports RDBMS databases. INSERT and UPDATE events can be received.
    Default value: listening. Possible data types: STRING. Optional: Yes. Dynamic: No.

jdbc.driver.name
    The driver class name used to connect to the database. Required for 'polling' mode.
    Default value: none. Possible data types: STRING. Optional: Yes. Dynamic: No.

username
    Username of a user with SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT privileges on the table being monitored for change data capture.
    For 'polling' mode, the user needs SELECT privileges.
    Default value: none. Possible data types: STRING. Optional: No. Dynamic: No.

password
    Password of the above user.
    Default value: none. Possible data types: STRING. Optional: No. Dynamic: No.

pool.properties
    Any pool parameters for the database connection, specified as key-value pairs.
    Default value: none. Possible data types: STRING. Optional: Yes. Dynamic: No.

datasource.name
    Name of the WSO2 datasource used to connect to the database. When datasource.name is provided, url, username, and password are not needed. It takes priority over a URL-based connection.
    Accepted only when mode is set to 'polling'.
    Default value: none. Possible data types: STRING. Optional: Yes. Dynamic: No.

table.name
    Name of the table to monitor for data changes.
    Default value: none. Possible data types: STRING. Optional: No. Dynamic: No.

polling.column
    Column on which polling is done to capture the change data. A TIMESTAMP column is recommended as the polling.column in order to capture both inserts and updates.
    Numeric auto-increment columns and char columns can also be used as the polling.column. Note that these support only insert change capturing, and for char columns the result depends on how the column's data is input.
    Mandatory when mode is 'polling'.
    Default value: none. Possible data types: STRING. Optional: Yes. Dynamic: No.

polling.interval
    Interval, in seconds, at which the given table is polled for changes.
    Accepted only when mode is set to 'polling'.
    Default value: 1. Possible data types: INT. Optional: Yes. Dynamic: No.

operation
    The change event operation of interest: 'insert', 'update', or 'delete'. Not case sensitive. Required for 'listening' mode.
    Default value: none. Possible data types: STRING. Optional: No. Dynamic: No.

connector.properties
    Debezium connector properties, specified as a comma-separated string.
    These properties take priority over the other parameters. Only for 'listening' mode.
    Default value: empty string. Possible data types: STRING. Optional: Yes. Dynamic: No.

database.server.id
    For MySQL, a unique integer between 1 and 2^32 - 1 used as the ID when joining the MySQL database cluster to read the binlog. Only for 'listening' mode.
    Default value: a random integer between 5400 and 6400. Possible data types: STRING. Optional: Yes. Dynamic: No.

database.server.name
    Logical name that identifies and provides a namespace for the particular database server. Only for 'listening' mode.
    Default value: {host}_{port}. Possible data types: STRING. Optional: Yes. Dynamic: No.
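The defaults and precedence described for the 'listening'-mode parameters can be sketched in Python. This is a hypothetical helper, not the extension's code. It assumes that connector.properties holds comma-separated key=value pairs, and that the two server parameters reach Debezium under the property keys database.server.id and database.server.name. Both assumptions are labeled in the comments.

```python
# Hypothetical sketch of how listening-mode settings could be resolved.
# ASSUMPTIONS (not taken from the extension's source):
#   * connector.properties is a comma-separated list of key=value pairs;
#   * the server parameters map to the Debezium keys used below.
import random
from typing import Dict, Optional


def parse_connector_properties(raw: str) -> Dict[str, str]:
    """Parse 'k1=v1, k2=v2' into a dict; an empty string yields {}."""
    props: Dict[str, str] = {}
    for pair in raw.split(","):
        pair = pair.strip()
        if not pair:
            continue  # the documented default is an empty string
        key, sep, value = pair.partition("=")
        if not sep:
            raise ValueError(f"expected key=value, got {pair!r}")
        props[key.strip()] = value.strip()
    return props


def resolve_listening_config(host: str, port: int,
                             server_id: Optional[str] = None,
                             server_name: Optional[str] = None,
                             connector_properties: str = "") -> Dict[str, str]:
    config = {
        # Documented default: a random integer between 5400 and 6400.
        "database.server.id": server_id or str(random.randint(5400, 6400)),
        # Documented default: {host}_{port}.
        "database.server.name": server_name or f"{host}_{port}",
    }
    # connector.properties take priority over the individual parameters.
    config.update(parse_connector_properties(connector_properties))
    return config


print(resolve_listening_config("localhost", 3306, server_id="5500",
                               connector_properties="database.server.id=42"))
```

In this sketch the explicit server_id of 5500 is overridden by the value in connector.properties. That illustrates the documented priority rule and makes no claim about how the extension merges properties internally.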

Examples

EXAMPLE 1

@source(type = 'cdc' , url = 'jdbc:mysql://localhost:3306/SimpleDB', 
username = 'cdcuser', password = 'pswd4cdc', 
table.name = 'students', operation = 'insert', 
@map(type='keyvalue', @attributes(id = 'id', name = 'name')))
define stream inputStream (id string, name string);

In this example, the CDC source listens for row insertions on the students table (columns id and name) in the MySQL SimpleDB database, which is accessed with the given URL.

EXAMPLE 2

@source(type = 'cdc' , url = 'jdbc:mysql://localhost:3306/SimpleDB', 
username = 'cdcuser', password = 'pswd4cdc', 
table.name = 'students', operation = 'update', 
@map(type='keyvalue', @attributes(id = 'id', name = 'name', 
before_id = 'before_id', before_name = 'before_name')))
define stream inputStream (before_id string, id string, 
before_name string , name string);

In this example, the CDC source listens for row updates on the students table in the MySQL SimpleDB database, which is accessed with the given URL.
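The keyvalue mapping in this example can be sketched in Python. Each stream attribute is filled from the map key that @attributes names for it, in the order the stream defines its attributes. This is a hypothetical helper, not the Siddhi keyvalue mapper itself. It raises KeyError when a key is missing, and it does not model how the real mapper handles missing keys.

```python
# Hypothetical sketch of @map(type='keyvalue', @attributes(...)): each stream
# attribute is taken from the event-map key named for it in @attributes.
from typing import Dict, List, Tuple


def map_to_stream(event: Dict[str, object],
                  stream_attrs: List[str],
                  attribute_keys: Dict[str, str]) -> Tuple[object, ...]:
    # Values come out in stream-attribute order; a missing key raises KeyError.
    return tuple(event[attribute_keys[attr]] for attr in stream_attrs)


# Event map for an update on students (id unchanged, name changed).
update_event = {"id": "1", "name": "Anna", "before_id": "1", "before_name": "Ann"}
# inputStream (before_id string, id string, before_name string, name string)
stream = ["before_id", "id", "before_name", "name"]
# @attributes(id = 'id', name = 'name', before_id = 'before_id', before_name = 'before_name')
keys = {attr: attr for attr in stream}

print(map_to_stream(update_event, stream, keys))  # ('1', '1', 'Ann', 'Anna')
```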

EXAMPLE 3

@source(type = 'cdc' , url = 'jdbc:mysql://localhost:3306/SimpleDB', 
username = 'cdcuser', password = 'pswd4cdc', 
table.name = 'students', operation = 'delete', 
@map(type='keyvalue', @attributes(before_id = 'before_id', before_name = 'before_name')))
define stream inputStream (before_id string, before_name string);

In this example, the CDC source listens for row deletions on the students table in the MySQL SimpleDB database, which is accessed with the given URL.

EXAMPLE 4

@source(type = 'cdc', mode='polling', polling.column = 'id', 
jdbc.driver.name = 'com.mysql.jdbc.Driver', url = 'jdbc:mysql://localhost:3306/SimpleDB', 
username = 'cdcuser', password = 'pswd4cdc', 
table.name = 'students', 
@map(type='keyvalue', @attributes(id = 'id', name = 'name')))
define stream inputStream (id int, name string);

In this example, the CDC source polls the students table for inserts. The polling.column is an auto-increment field. The url, username, password, and jdbc.driver.name parameters are used to connect to the database.
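The following Python sketch shows why polling on an auto-increment column captures only inserts. Each poll fetches rows whose id is greater than the last id it saw, so an update to an existing row never moves past that mark. An in-memory SQLite table stands in for MySQL. The query and the poll_new_rows helper are illustrative assumptions, not the extension's implementation.

```python
# Hypothetical sketch of polling on an auto-increment column, using an
# in-memory SQLite table in place of the MySQL students table.
import sqlite3
from typing import List, Tuple


def poll_new_rows(conn: sqlite3.Connection, last_id: int) -> Tuple[List[tuple], int]:
    """Return rows whose id exceeds last_id, plus the new high-water mark."""
    rows = conn.execute(
        "SELECT id, name FROM students WHERE id > ? ORDER BY id", (last_id,)
    ).fetchall()
    return rows, (rows[-1][0] if rows else last_id)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE students (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT)")
conn.executemany("INSERT INTO students (name) VALUES (?)", [("Ann",), ("Bob",)])

rows, last = poll_new_rows(conn, 0)       # first poll sees both inserts
conn.execute("UPDATE students SET name = 'Anna' WHERE id = 1")
conn.execute("INSERT INTO students (name) VALUES ('Cal')")
rows2, last2 = poll_new_rows(conn, last)  # the update is not seen, only Cal
print(rows, rows2)  # [(1, 'Ann'), (2, 'Bob')] [(3, 'Cal')]
```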

EXAMPLE 5

@source(type = 'cdc', mode='polling', polling.column = 'id', datasource.name = 'SimpleDB',
table.name = 'students', 
@map(type='keyvalue', @attributes(id = 'id', name = 'name')))
define stream inputStream (id int, name string);

In this example, the CDC source polls the students table for inserts. The polling.column is a char column whose values follow the pattern S001, S002, and so on. The datasource.name parameter is used to connect to the database. Note that datasource.name works only with the Stream Processor.
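A char polling.column is compared as text. Because of this, "greater than the last value seen" follows insertion order only when the values have a fixed width, as the S001, S002, ... pattern does. The short Python illustration below uses hypothetical values and Python string comparison, which matches SQL text comparison for these ASCII values under a binary collation.

```python
# Hypothetical illustration: char polling values compare as text, so new rows
# are detected reliably only when the values are fixed-width.
def new_values(values, last_seen):
    # Text comparison, as SQL applies to CHAR/VARCHAR (binary collation assumed).
    return sorted(v for v in values if v > last_seen)


padded = ["S001", "S002", "S009", "S010"]
unpadded = ["S1", "S2", "S9", "S10"]

print(new_values(padded, "S009"))  # ['S010']: the new row is picked up
print(new_values(unpadded, "S9"))  # []: 'S10' < 'S9' as text, so it is missed
```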

EXAMPLE 6

@source(type = 'cdc', mode='polling', polling.column = 'last_updated', datasource.name = 'SimpleDB',
table.name = 'students', 
@map(type='keyvalue'))
define stream inputStream (name string);

In this example, the CDC source polls the students table for inserts and updates. The polling.column is a timestamp field.
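The following Python sketch shows why a timestamp polling.column also captures updates. An update sets a newer last_updated value, so the row is returned again by the next poll. An in-memory SQLite table stands in for the database, and timestamps are set explicitly in the sketch. In MySQL, this refresh is commonly done with a column defined using ON UPDATE CURRENT_TIMESTAMP. Deleted rows are never returned, which matches the documented INSERT and UPDATE support for 'polling' mode.

```python
# Hypothetical sketch of polling on a timestamp column, using an in-memory
# SQLite table; timestamps are ISO-formatted text so they compare in order.
import sqlite3
from typing import List, Tuple


def poll_changes(conn: sqlite3.Connection, last_ts: str) -> Tuple[List[tuple], str]:
    """Return rows changed after last_ts, plus the newest timestamp seen."""
    rows = conn.execute(
        "SELECT name, last_updated FROM students "
        "WHERE last_updated > ? ORDER BY last_updated",
        (last_ts,),
    ).fetchall()
    return rows, (rows[-1][1] if rows else last_ts)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE students (id INTEGER PRIMARY KEY, name TEXT, last_updated TEXT)")
conn.execute("INSERT INTO students VALUES (1, 'Ann', '2024-01-01 10:00:00')")

rows, last = poll_changes(conn, "")       # sees the insert
conn.execute("UPDATE students SET name = 'Anna', "
             "last_updated = '2024-01-01 10:05:00' WHERE id = 1")
rows2, last2 = poll_changes(conn, last)   # sees the update as a new change
print(rows, rows2)
```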