API Docs - v5.0.9
Tested Siddhi Core version: 5.1.13
It may also work with other Siddhi Core minor versions.
Sink
kafka (Sink)
A Kafka sink publishes events processed by WSO2 SP to a topic with a partition for a Kafka cluster. The events can be published in the TEXT, XML, JSON, or Binary format.
If the topic is not already created in the Kafka cluster, the Kafka sink creates the default partition for the given topic. The publishing topic and partition can be dynamic values taken from the Siddhi event.
To configure a sink to use the Kafka transport, the `type` parameter should have `kafka` as its value.
@sink(type="kafka", bootstrap.servers="<STRING>", topic="<STRING>", partition.no="<INT>", sequence.id="<STRING>", key="<STRING>", is.binary.message="<BOOL>", optional.configuration="<STRING>", is.synchronous="<BOOL>", @map(...)))
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
bootstrap.servers | This parameter specifies the list of Kafka servers to which the Kafka sink must publish events. This list should be provided as a set of comma-separated values, e.g., 'localhost:9092,localhost:9093'. | | STRING | No | No |
topic | The topic to which the Kafka sink needs to publish events. Only one topic must be specified. | | STRING | No | No |
partition.no | The partition number for the given topic. Only one partition ID can be defined. If no value is specified for this parameter, the Kafka sink publishes to the default partition of the topic (i.e., 0). | 0 | INT | Yes | No |
sequence.id | A unique identifier to identify the messages published by this sink. This ID allows receivers to identify the sink that published a specific message. | null | STRING | Yes | No |
key | The key contains the values that are used to maintain ordering in a Kafka partition. | null | STRING | Yes | No |
is.binary.message | To send binary events via the Kafka sink, set this parameter to 'True'. | null | BOOL | No | No |
optional.configuration | This parameter contains all the other possible configurations with which the producer is created, e.g., 'producer.type:async,batch.size:200'. | null | STRING | Yes | No |
is.synchronous | The Kafka sink publishes events to the server synchronously when this value is set to 'true'. | false | BOOL | Yes | No |
Examples
EXAMPLE 1
@App:name('TestExecutionPlan')
define stream FooStream (symbol string, price float, volume long);
@sink(
type='kafka',
topic='topic_with_partitions',
partition.no='0',
bootstrap.servers='localhost:9092',
@map(type='xml'))
define stream BarStream (symbol string, price float, volume long);
@info(name = 'query1')
from FooStream select symbol, price, volume insert into BarStream;
This Kafka sink configuration publishes to the 0th partition of the topic named `topic_with_partitions`.
EXAMPLE 2
@App:name('TestExecutionPlan')
define stream FooStream (symbol string, price float, volume long);
@sink(
type='kafka',
topic='{{symbol}}',
partition.no='{{volume}}',
bootstrap.servers='localhost:9092',
@map(type='xml'))
define stream BarStream (symbol string, price float, volume long);
@info(name = 'query1')
from FooStream select symbol, price, volume insert into BarStream;
This query publishes to a dynamic topic and partition taken from the Siddhi event. The value for `partition.no` is taken from the `volume` attribute, and the topic value is taken from the `symbol` attribute.
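EXAMPLE 3
The optional sink parameters can be combined in a single definition. The following is a minimal, illustrative sketch rather than an example from the original documentation; the key value, sequence ID, and producer setting are assumptions:
@App:name('TestExecutionPlan')
define stream FooStream (symbol string, price float, volume long);
-- key, sequence.id, and optional.configuration values below are illustrative assumptions
@sink(
type='kafka',
topic='topic_with_partitions',
partition.no='0',
bootstrap.servers='localhost:9092',
key='stock_key',
sequence.id='kafka_sink_1',
is.synchronous='true',
optional.configuration='retries:3',
@map(type='xml'))
define stream BarStream (symbol string, price float, volume long);
@info(name = 'query1')
from FooStream select symbol, price, volume insert into BarStream;
With `is.synchronous` set to true, each publish call waits for the broker's acknowledgement, which favors delivery guarantees over throughput.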
kafkaMultiDC (Sink)
A Kafka sink publishes events processed by WSO2 SP to a topic with a partition for a Kafka cluster. The events can be published in the TEXT, XML, JSON, or Binary format.
If the topic is not already created in the Kafka cluster, the Kafka sink creates the default partition for the given topic. The publishing topic and partition can be dynamic values taken from the Siddhi event.
To configure a sink to publish events via the Kafka transport, using two Kafka brokers to publish events to the same topic, the `type` parameter must have `kafkaMultiDC` as its value.
@sink(type="kafkaMultiDC", bootstrap.servers="<STRING>", topic="<STRING>", sequence.id="<STRING>", key="<STRING>", partition.no="<INT>", is.binary.message="<BOOL>", optional.configuration="<STRING>", @map(...)))
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
bootstrap.servers | This parameter specifies the list of Kafka servers to which the Kafka sink must publish events. This list should be provided as a set of comma-separated values, and it must contain at least two servers, e.g., 'host1:9092,host2:9092'. | | STRING | No | No |
topic | The topic to which the Kafka sink needs to publish events. Only one topic must be specified. | | STRING | No | No |
sequence.id | A unique identifier to identify the messages published by this sink. This ID allows receivers to identify the sink that published a specific message. | null | STRING | Yes | No |
key | The key contains the values that are used to maintain ordering in a Kafka partition. | null | STRING | Yes | No |
partition.no | The partition number for the given topic. Only one partition ID can be defined. If no value is specified for this parameter, the Kafka sink publishes to the default partition of the topic (i.e., 0). | 0 | INT | Yes | No |
is.binary.message | To send binary events via the kafkaMultiDC sink, set this parameter to 'True'. | null | BOOL | No | No |
optional.configuration | This parameter contains all the other possible configurations with which the producer is created. | null | STRING | Yes | No |
Examples
EXAMPLE 1
@App:name('TestExecutionPlan')
define stream FooStream (symbol string, price float, volume long);
@sink(type='kafkaMultiDC',
topic='myTopic',
partition.no='0',
bootstrap.servers='host1:9092,host2:9092',
@map(type='xml'))
define stream BarStream (symbol string, price float, volume long);
@info(name = 'query1')
from FooStream select symbol, price, volume insert into BarStream;
This query publishes to the default (i.e., 0th) partition of the brokers in two data centers.
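EXAMPLE 2
A sink that feeds the kafkaMultiDC source described later in this document would typically also set `sequence.id` so that receivers can identify the publisher. This is an illustrative sketch; the broker addresses, topic, and sequence ID are assumptions:
@App:name('TestExecutionPlan')
define stream FooStream (symbol string, price float, volume long);
-- broker addresses, topic, and sequence ID are illustrative assumptions
@sink(type='kafkaMultiDC',
topic='myTopic',
sequence.id='dc1_publisher',
bootstrap.servers='host1:9092,host2:9092',
@map(type='xml'))
define stream BarStream (symbol string, price float, volume long);
@info(name = 'query1')
from FooStream select symbol, price, volume insert into BarStream;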
Source
kafka (Source)
A Kafka source receives events to be processed by WSO2 SP from a topic with a partition for a Kafka cluster. The events received can be in the TEXT, XML, JSON, or Binary format.
If the topic is not already created in the Kafka cluster, the Kafka source creates the default partition for the given topic.
@source(type="kafka", bootstrap.servers="<STRING>", topic.list="<STRING>", group.id="<STRING>", threading.option="<STRING>", partition.no.list="<STRING>", seq.enabled="<BOOL>", is.binary.message="<BOOL>", topic.offsets.map="<STRING>", enable.offsets.commit="<BOOL>", enable.async.commit="<BOOL>", optional.configuration="<STRING>", @map(...)))
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
bootstrap.servers | This specifies the list of Kafka servers to which the Kafka source must listen. This list can be provided as a set of comma-separated values. | | STRING | No | No |
topic.list | This specifies the list of topics to which the source must listen. This list can be provided as a set of comma-separated values. | | STRING | No | No |
group.id | This is an ID to identify the Kafka source group. The group ID ensures that sources with the same topic and partition that are in the same group do not receive the same event. | | STRING | No | No |
threading.option | This specifies whether the Kafka source is to be run on a single thread, or in multiple threads based on a condition. Possible values are 'single.thread' (run on a single thread), 'topic.wise' (use a separate thread per topic), and 'partition.wise' (use a separate thread per partition). | | STRING | No | No |
partition.no.list | The partition number list for the given topic. This is provided as a list of comma-separated values, e.g., '0,1'. | 0 | STRING | Yes | No |
seq.enabled | If this parameter is set to 'true', the sequence of the events received via the source is taken into account; each event should then carry a sequence number as an attribute value so that ordering can be preserved and duplicates filtered out. | false | BOOL | Yes | No |
is.binary.message | To receive binary events via the Kafka source, it is required to set this parameter to 'True'. | false | BOOL | Yes | No |
topic.offsets.map | This parameter specifies reading offsets for each topic and partition, in the format '<topic>=<offset>' as comma-separated values. | null | STRING | Yes | No |
enable.offsets.commit | This parameter specifies whether to commit offsets. | true | BOOL | Yes | No |
enable.async.commit | This parameter changes the commit type for the offsets returned on the last poll of the subscribed topics and partitions; when set to 'true', offsets are committed asynchronously. | true | BOOL | Yes | No |
optional.configuration | This parameter contains all the other possible configurations with which the consumer is created. | null | STRING | Yes | No |
Examples
EXAMPLE 1
@App:name('TestExecutionPlan')
define stream BarStream (symbol string, price float, volume long);
@source(
type='kafka',
topic.list='kafka_topic,kafka_topic2',
group.id='test',
threading.option='partition.wise',
bootstrap.servers='localhost:9092',
partition.no.list='0,1',
@map(type='xml'))
define stream FooStream (symbol string, price float, volume long);
@info(name = 'query1')
from FooStream select symbol, price, volume insert into BarStream;
This Kafka source configuration listens to the `kafka_topic` and `kafka_topic2` topics with partitions 0 and 1. A thread is created for each topic-partition combination. The events are received in the XML format, mapped to a Siddhi event, and sent to a stream named `FooStream`.
EXAMPLE 2
@App:name('TestExecutionPlan')
define stream BarStream (symbol string, price float, volume long);
@source(
type='kafka',
topic.list='kafka_topic',
group.id='test',
threading.option='single.thread',
bootstrap.servers='localhost:9092',
@map(type='xml'))
define stream FooStream (symbol string, price float, volume long);
@info(name = 'query1')
from FooStream select symbol, price, volume insert into BarStream;
This Kafka source configuration listens to the `kafka_topic` topic for the default partition because no `partition.no.list` is defined. Only one thread is created for the topic. The events are received in the XML format, mapped to a Siddhi event, and sent to a stream named `FooStream`.
EXAMPLE 3
@App:name('TestExecutionPlan')
@source(type='kafka',
topic.list='trp_topic',
partition.no.list='0',
threading.option='single.thread',
group.id='group',
bootstrap.servers='localhost:9092',
@map(type='xml', enclosing.element='//events', @attributes(symbol ='symbol', price = 'price', volume = 'volume', partition = 'trp:partition', topic = 'trp:topic', key = 'trp:key', recordTimestamp = 'trp:record.timestamp', eventTimestamp = 'trp:event.timestamp', checkSum = 'trp:check.sum', topicOffset = 'trp:offset')))
define stream FooStream (symbol string, price float, volume long, partition string, topic string, key string, recordTimestamp string, eventTimestamp string, checkSum string, topicOffset string);
from FooStream select * insert into BarStream;
This Kafka source configuration listens to partition 0 (the default partition) of the `trp_topic` topic, as specified by `partition.no.list`.
Since custom attribute mapping is enabled with transport (trp) properties, the Siddhi event is populated with the relevant transport values as well (partition, topic, key, record timestamp, event timestamp, checksum, and offset).
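EXAMPLE 4
The following sketch is not part of the original documentation; it illustrates `topic.wise` threading combined with `topic.offsets.map` (the topic name, group ID, and starting offset are assumptions), so that consumption starts from a given offset per topic:
@App:name('TestExecutionPlan')
define stream BarStream (symbol string, price float, volume long);
-- topic name, group ID, and starting offset are illustrative assumptions
@source(
type='kafka',
topic.list='kafka_topic',
group.id='offset_group',
threading.option='topic.wise',
bootstrap.servers='localhost:9092',
topic.offsets.map='kafka_topic=100',
@map(type='xml'))
define stream FooStream (symbol string, price float, volume long);
@info(name = 'query1')
from FooStream select symbol, price, volume insert into BarStream;
Here a single consumer thread handles `kafka_topic`, and reading is intended to begin from offset 100 rather than the last committed offset.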
kafkaMultiDC (Source)
The Kafka Multi-Datacenter (DC) source receives records from the same topic in brokers deployed in two different Kafka clusters. It filters out all the duplicate messages and ensures that the events are received in the correct order using sequential numbering. It receives events in the TEXT, XML, JSON, or Binary format. The Kafka source creates the default partition '0' for a given topic, if the topic has not yet been created in the Kafka cluster.
@source(type="kafkaMultiDC", bootstrap.servers="<STRING>", topic="<STRING>", partition.no="<INT>", is.binary.message="<BOOL>", optional.configuration="<STRING>", @map(...)))
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
bootstrap.servers | This contains the list of Kafka servers to which the Kafka source listens, given as comma-separated values, e.g., 'localhost:9092,localhost:9093'. | | STRING | No | No |
topic | This is the topic to which the source listens, e.g., 'topic_one'. | | STRING | No | No |
partition.no | This is the partition number of the given topic. | 0 | INT | Yes | No |
is.binary.message | To receive binary events via the Kafka Multi-DC source, the value of this parameter needs to be set to 'True'. | false | BOOL | Yes | No |
optional.configuration | This contains all the other possible configurations with which the consumer can be created, e.g., 'producer.type:async,batch.size:200'. | null | STRING | Yes | No |
Examples
EXAMPLE 1
@App:name('TestExecutionPlan')
define stream BarStream (symbol string, price float, volume long);
@source(type='kafkaMultiDC', topic='kafka_topic', bootstrap.servers='host1:9092,host1:9093', partition.no='1', @map(type='xml'))
define stream FooStream (symbol string, price float, volume long);
@info(name = 'query1')
from FooStream select symbol, price, volume insert into BarStream;
This query listens to the `kafka_topic` topic, deployed in the brokers host1:9092 and host1:9093, with partition 1. A thread is created for each broker. The received XML events are mapped to a Siddhi event and sent to the `FooStream` stream.
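EXAMPLE 2
To see how the two halves fit together, the following illustrative sketch (broker addresses and topic name are assumptions) consumes the topic published by the kafkaMultiDC sink example earlier in this document:
@App:name('MultiDCConsumerPlan')
define stream BarStream (symbol string, price float, volume long);
-- broker addresses and topic are illustrative assumptions matching the earlier sink example
@source(type='kafkaMultiDC', topic='myTopic', bootstrap.servers='host1:9092,host2:9092', partition.no='0', @map(type='xml'))
define stream FooStream (symbol string, price float, volume long);
@info(name = 'query1')
from FooStream select symbol, price, volume insert into BarStream;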