API Docs - v2.0.14
Tested Siddhi Core version: 5.1.21
It could also support other Siddhi Core minor versions.
Source
cdc (Source)
The CDC source receives events when change events (i.e., INSERT, UPDATE, DELETE) are triggered for a database table. Events are received in the 'key-value' format.
CDC can be performed in two modes: listening mode and polling mode.
In polling mode, the datasource is periodically polled to capture the changes. The polling period can be configured.
In polling mode, you can capture only INSERT and UPDATE changes.
In listening mode, the source keeps listening to the change log of the database and notifies you when a change takes place. Here, you are notified about the change immediately, unlike in polling mode.
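The polling approach can be illustrated with a minimal Python sketch. It is not the extension's implementation: the `poll_changes` helper, the in-memory SQLite table, and the "greater than the last seen value" query are assumptions chosen only to show why polling sees inserts and updates but not deletes.

```python
import sqlite3


def poll_changes(conn, table, polling_column, last_seen):
    """Return rows whose polling column exceeds last_seen, plus the new marker.

    Hypothetical helper for illustration; table and column names are assumed
    to come from trusted configuration, not from user input.
    """
    query = (
        f"SELECT * FROM {table} "
        f"WHERE {polling_column} > ? ORDER BY {polling_column}"
    )
    cursor = conn.execute(query, (last_seen,))
    columns = [description[0] for description in cursor.description]
    # Each change event is a key-value map keyed by column name.
    events = [dict(zip(columns, row)) for row in cursor.fetchall()]
    if events:
        last_seen = events[-1][polling_column]
    return events, last_seen


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE students (id INTEGER PRIMARY KEY, name TEXT)")
conn.execute("INSERT INTO students (name) VALUES ('Alice'), ('Bob')")

# First poll: both inserted rows are picked up.
events, marker = poll_changes(conn, "students", "id", last_seen=0)

# A deleted row leaves nothing behind for the next poll to select.
conn.execute("DELETE FROM students WHERE id = 1")
conn.execute("INSERT INTO students (name) VALUES ('Carol')")
later_events, marker = poll_changes(conn, "students", "id", marker)
```

In this sketch the second poll returns only the new row for 'Carol'; the deletion of row 1 is invisible because no remaining row carries a newer polling-column value. A real source would repeat the poll on the configured polling period.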
The keys of the map in a CDC change event are as follows.
For 'listening' mode:
For insert: Keys are the column names of the table.
For delete: Keys are the table column names prefixed with 'before_'. e.g., the key 'before_X' holds the value that the column named 'X' had before the deletion.
For update: Keys are the table column names, which hold the new values, together with the same names prefixed with 'before_', which hold the previous values. e.g., 'X' holds the new value of the column named 'X' and 'before_X' holds its value before the update.
For 'polling' mode: Keys are the column names of the table.
To connect to the database table and receive CDC events, the url, username, password, and driverClassName (in polling mode) can be provided in the deployment.yaml file under the siddhi namespace as follows:
siddhi:
  extensions:
    -
      extension:
        name: 'cdc'
        namespace: 'source'
        properties:
          url: jdbc:sqlserver://localhost:1433;databaseName=CDC_DATA_STORE
          password: <password>
          username: <>
          driverClassName: com.microsoft.sqlserver.jdbc.SQLServerDriver
Preparations required for working with Oracle Databases in listening mode
Using the extension on Windows, Mac OS X, and AIX is straightforward. To achieve the required behaviour, follow the steps given below.
- Download the Oracle Instant Client version that is compatible with your database version and extract it.
- Set the environment variable LD_LIBRARY_PATH to the location of the extracted Instant Client, as shown below:
export LD_LIBRARY_PATH=<path to the instant client location>
- The downloaded Instant Client folder contains two JARs, xstreams.jar and ojdbc<version>.jar. Convert them to OSGi bundles using the tools provided in <distribution>/bin. To convert ojdbc.jar, use the spi-provider.sh|bat tool. To convert xstreams.jar, use jni-provider.sh as shown below. (Note: this way of converting the XStreams JAR applies only to Linux environments. On other operating systems this step is not required, and converting the JAR with the jartobundle.sh tool is enough.)
./jni-provider.sh <input-jar> <destination> <comma separated native library names>
- Once the ojdbc and xstreams JARs are converted to OSGi bundles, copy the generated JARs to <distribution>/lib.
Currently, siddhi-io-cdc supports only Oracle database distributions 12 and above.
Configurations for PostgreSQL
When using listening mode with PostgreSQL, the following properties have to be configured to create the connection.
slot.name: (default value = debezium) In PostgreSQL, only one connection can be created from a single slot, so to create multiple connections, a custom slot.name should be provided.
plugin.name: (default value = decoderbufs) The name of the logical decoding output plugin that the database is configured with. Supported values are pgoutput, decoderbufs, and wal2json.
table.name: The table name should be provided as <schema_name>.<table_name>, for example, public.customer.
See parameter: mode for supported databases and change events.
@source(type="cdc", url="<STRING>", mode="<STRING>", jdbc.driver.name="<STRING>", username="<STRING>", password="<STRING>", pool.properties="<STRING>", datasource.name="<STRING>", table.name="<STRING>", polling.column="<STRING>", polling.interval="<INT>", operation="<STRING>", connector.properties="<STRING>", database.server.id="<STRING>", database.server.name="<STRING>", wait.on.missed.record="<BOOL>", missed.record.waiting.timeout="<INT>", polling.history.size="<INT>", cron.expression="<STRING>", plugin.name="<STRING>", @map(...))
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
url | The connection URL to the database. | | STRING | No | No |
mode | The mode used to capture the change data. The types of events that can be received and the required parameters differ based on the mode. The mode can be either 'listening' or 'polling'. | listening | STRING | Yes | No |
jdbc.driver.name | The driver class name for connecting to the database. A value is required for this parameter when the mode is 'polling'. | | STRING | Yes | No |
username | The username to be used for accessing the database. This user needs to have the 'SELECT', 'RELOAD', 'SHOW DATABASES', 'REPLICATION SLAVE', and 'REPLICATION CLIENT' privileges for the change data capturing table (specified via the 'table.name' parameter). | | STRING | No | No |
password | The password of the username you specified for accessing the database. | | STRING | No | No |
pool.properties | The pool parameters for the database connection, specified as key-value pairs. | | STRING | Yes | No |
datasource.name | The name of the WSO2 datasource used to connect to the database. When a datasource name is provided, the URL, username, and password are not needed. A datasource-based connection takes priority over a URL-based connection. | | STRING | Yes | No |
table.name | The name of the table that needs to be monitored for data changes. | | STRING | No | No |
polling.column | The name of the column that is polled to capture the change data. It is recommended to use a TIMESTAMP field as the 'polling.column' in order to capture both inserts and updates. | | STRING | Yes | No |
polling.interval | The time interval (specified in seconds) at which the given table is polled for changes. | 1 | INT | Yes | No |
operation | The change event operation you want to capture. Possible values are 'insert', 'update', and 'delete'. Multiple operations can be provided as comma-separated values. This parameter is not case sensitive. | | STRING | No | No |
connector.properties | Debezium connector properties, specified as a comma-separated string. | Empty_String | STRING | Yes | No |
database.server.id | An ID to be used when joining the MySQL database cluster to read the bin log. This should be a unique integer between 1 and 2^32. This parameter is applicable only when the mode is 'listening'. | Random integer between 5400 and 6400 | STRING | Yes | No |
database.server.name | A logical name that identifies and provides a namespace for the database server. This parameter is applicable only when the mode is 'listening'. | {host}_{port} | STRING | Yes | No |
wait.on.missed.record | Indicates whether the process needs to wait on missing/out-of-order records. | false | BOOL | Yes | No |
missed.record.waiting.timeout | The timeout (specified in seconds) to retry for a missing/out-of-order record. This should be used along with the wait.on.missed.record parameter. If this parameter is not set, the process waits indefinitely for the missing record. | -1 | INT | Yes | No |
polling.history.size | Use this parameter when metrics are enabled. It defines the number of recent polling operations whose details are exposed to metrics. For example, if polling.history.size is 20, the details of the last 20 polling operations are exposed. | 10 | INT | Yes | No |
cron.expression | Specifies the polling schedule as a cron expression. The records that have been inserted or updated are printed when the system time satisfies the given expression. This parameter is applicable only when the mode is 'polling'. | None | STRING | Yes | No |
plugin.name | The logical decoding output plugin to be specified when creating the connection to the database. This is mostly required for PostgreSQL. | decoderbufs | STRING | Yes | No |
Examples
EXAMPLE 1
@source(type = 'cdc' , url = 'jdbc:mysql://localhost:3306/SimpleDB',
username = 'cdcuser', password = 'pswd4cdc',
table.name = 'students', operation = 'insert',
@map(type='keyvalue', @attributes(id = 'id', name = 'name')))
define stream inputStream (id string, name string);
In this example, the CDC source listens to the row insertions that are made in the 'students' table and receives the 'id' and 'name' columns of each inserted row. This table belongs to the 'SimpleDB' MySQL database that can be accessed via the given URL.
EXAMPLE 2
@source(type = 'cdc' , url = 'jdbc:mysql://localhost:3306/SimpleDB',
username = 'cdcuser', password = 'pswd4cdc',
table.name = 'students', operation = 'update',
@map(type='keyvalue', @attributes(id = 'id', name = 'name',
before_id = 'before_id', before_name = 'before_name')))
define stream inputStream (before_id string, id string,
before_name string , name string);
In this example, the CDC source listens to the row updates that are made in the 'students' table. This table belongs to the 'SimpleDB' MySQL database that can be accessed via the given URL.
EXAMPLE 3
@source(type = 'cdc' , url = 'jdbc:mysql://localhost:3306/SimpleDB',
username = 'cdcuser', password = 'pswd4cdc',
table.name = 'students', operation = 'delete',
@map(type='keyvalue', @attributes(before_id = 'before_id', before_name = 'before_name')))
define stream inputStream (before_id string, before_name string);
In this example, the CDC source listens to the row deletions made in the 'students' table. This table belongs to the 'SimpleDB' database that can be accessed via the given URL.
EXAMPLE 4
@source(type = 'cdc' , url = 'jdbc:mysql://localhost:3306/SimpleDB',
username = 'cdcuser', password = 'pswd4cdc',
table.name = 'students', operation = 'insert,update,delete',
@map(type='keyvalue', @attributes(before_id = 'before_id', before_name = 'before_name', name = 'name', id = 'id', operation= 'trp:operation')))
define stream inputStream (id string, name string, before_id string, before_name string, operation string);
In this example, the CDC source listens to multiple operations of the 'students' table. This table belongs to the 'SimpleDB' database that can be accessed via the given URL.
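The key names used in the @attributes mappings of Examples 1 to 3 follow a pattern per operation. The Python sketch below only restates that pattern; `expected_keys` is a hypothetical helper, not part of the extension, and the ordering of the returned keys is an arbitrary choice.

```python
def expected_keys(operation, columns):
    """Return the key names a listening-mode event map carries for one operation.

    Hypothetical helper that mirrors the mappings in Examples 1 to 3.
    """
    operation = operation.lower()  # the 'operation' parameter is not case sensitive
    before = [f"before_{column}" for column in columns]
    if operation == "insert":
        return list(columns)           # new values only
    if operation == "delete":
        return before                  # previous values only
    if operation == "update":
        return before + list(columns)  # previous and new values
    raise ValueError(f"unsupported operation: {operation}")


insert_keys = expected_keys("insert", ["id", "name"])
delete_keys = expected_keys("DELETE", ["id", "name"])
update_keys = expected_keys("update", ["id", "name"])
```

When several operations are captured in one stream, as in Example 4, the stream definition lists the union of these keys.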
EXAMPLE 5
@source(type = 'cdc', mode='polling', polling.column = 'id',
jdbc.driver.name = 'com.mysql.jdbc.Driver', url = 'jdbc:mysql://localhost:3306/SimpleDB',
username = 'cdcuser', password = 'pswd4cdc',
table.name = 'students',
@map(type='keyvalue', @attributes(id = 'id', name = 'name')))
define stream inputStream (id int, name string);
In this example, the CDC source polls the 'students' table for inserts. The 'id' column that is specified as the polling column is an auto-incremental field. The connection to the database is made via the URL, username, password, and the JDBC driver name.
EXAMPLE 6
@source(type = 'cdc', mode='polling', polling.column = 'id', datasource.name = 'SimpleDB',
table.name = 'students',
@map(type='keyvalue', @attributes(id = 'id', name = 'name')))
define stream inputStream (id int, name string);
In this example, the CDC source polls the 'students' table for inserts. The given polling column is a char column with values that follow the 'S001, S002, ...' pattern. The connection to the database is made via a data source named 'SimpleDB'. Note that the 'datasource.name' parameter works only with the Stream Processor.
EXAMPLE 7
@source(type = 'cdc', mode='polling', polling.column = 'last_updated', datasource.name = 'SimpleDB',
table.name = 'students',
@map(type='keyvalue'))
define stream inputStream (name string);
In this example, the CDC source polls the 'students' table for inserts and updates. The polling column is a timestamp field.
EXAMPLE 8
@source(type='cdc', jdbc.driver.name='com.mysql.jdbc.Driver', url='jdbc:mysql://localhost:3306/SimpleDB',
username='cdcuser', password='pswd4cdc',
table.name='students', mode='polling', polling.column='id', operation='insert',
wait.on.missed.record='true', missed.record.waiting.timeout='10',
@map(type='keyvalue', @attributes(id='id', name='name')))
define stream inputStream (id int, name string);
In this example, the CDC source polls the 'students' table for inserts. The polling column is a numeric field. This source expects the records in the database to be written concurrently/out-of-order so it waits if it encounters a missing record. If the record doesn't appear within 10 seconds it resumes the process.
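The waiting behaviour described above can be pictured with a simplified Python model. This is only a sketch under stated assumptions, not the extension's logic: `release_in_order` is a hypothetical function, the polling column is assumed to increase by exactly 1 per record, and the elapsed waiting time is passed in rather than measured.

```python
def release_in_order(pending, last_read, waited_seconds, timeout_seconds):
    """Return the ids that can be emitted now and the updated last_read.

    pending: polling-column values fetched from the table, all > last_read.
    A gap means the next expected id (last_read + 1) is not yet visible.
    A negative timeout models "wait indefinitely".
    """
    emitted = []
    timed_out = timeout_seconds >= 0 and waited_seconds >= timeout_seconds
    for record_id in sorted(pending):
        gap = record_id != last_read + 1
        if gap and not timed_out:
            break  # hold this record and later ones back; keep waiting
        emitted.append(record_id)
        last_read = record_id
    return emitted, last_read


# Record 5 has not been committed yet: only 4 is released.
first = release_in_order([4, 6, 7], last_read=3, waited_seconds=0, timeout_seconds=10)
# After 10 seconds the gap is skipped and processing resumes.
second = release_in_order([6, 7], last_read=4, waited_seconds=10, timeout_seconds=10)
# With a timeout of -1 the process keeps waiting for record 5.
third = release_in_order([6, 7], last_read=4, waited_seconds=1000, timeout_seconds=-1)
```

The three calls correspond to the settings in this example: wait.on.missed.record enabled, a 10 second missed.record.waiting.timeout, and the default of -1 for comparison.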
EXAMPLE 9
@source(type = 'cdc', url = 'jdbc:oracle:thin://localhost:1521/ORCLCDB', username='c##xstrm', password='xs', table.name='DEBEZIUM.sweetproductiontable', operation = 'insert', connector.properties='oracle.outserver.name=DBZXOUT,oracle.pdb=ORCLPDB1', @map(type = 'keyvalue'))
define stream insertSweetProductionStream (ID int, NAME string, WEIGHT int);
In this example, the CDC source connects to an Oracle database and listens for insert queries on the 'sweetproductiontable' table.
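The connector.properties value in this example is a comma-separated string of key=value pairs. The Python sketch below shows one way to read such a string; `parse_properties` is a hypothetical helper, and the extension's own parsing may differ (for instance, in how it treats whitespace or values that contain commas).

```python
def parse_properties(value):
    """Split 'k1=v1,k2=v2' into a dict, trimming whitespace around keys and values."""
    properties = {}
    for pair in value.split(","):
        if not pair.strip():
            continue  # tolerate an empty string or a trailing comma
        key, separator, property_value = pair.partition("=")
        if not separator:
            raise ValueError(f"expected key=value, got: {pair!r}")
        properties[key.strip()] = property_value.strip()
    return properties


props = parse_properties("oracle.outserver.name=DBZXOUT,oracle.pdb=ORCLPDB1")
empty = parse_properties("")
```

Here `props` holds the two Oracle settings from the example, and the empty string (the parameter's default) yields an empty map.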