
API Docs - v4.1.5

Sink

kafka (Sink)

A Kafka sink publishes events processed by WSO2 SP to a partition of a topic in a Kafka cluster. The events can be published in the TEXT, XML, JSON, or Binary format.
If the topic does not already exist in the Kafka cluster, the Kafka sink creates the default partition for the given topic. The publishing topic and partition can be dynamic values taken from the Siddhi event.
To configure a sink to use the Kafka transport, set the type parameter to kafka.

Syntax

@sink(type="kafka", bootstrap.servers="<STRING>", topic="<STRING>", partition.no="<INT>", sequence.id="<STRING>", key="<STRING>", is.binary.message="<BOOL>", optional.configuration="<STRING>", @map(...))

QUERY PARAMETERS

bootstrap.servers: The list of Kafka servers to which the Kafka sink publishes events, given as comma-separated values, e.g., localhost:9092,localhost:9093.
Default: none | Type: STRING | Optional: No | Dynamic: No

topic: The topic to which the Kafka sink publishes events. Only one topic can be specified.
Default: none | Type: STRING | Optional: No | Dynamic: No

partition.no: The partition number for the given topic. Only one partition ID can be defined. If no value is specified, the Kafka sink publishes to the default partition of the topic (i.e., 0).
Default: 0 | Type: INT | Optional: Yes | Dynamic: No

sequence.id: A unique identifier for the messages published by this sink. This ID allows receivers to identify the sink that published a specific message.
Default: null | Type: STRING | Optional: Yes | Dynamic: No

key: The message key, whose values are used to maintain ordering within a Kafka partition.
Default: null | Type: STRING | Optional: Yes | Dynamic: No

is.binary.message: Set this parameter to true to publish binary events via the Kafka sink.
Default: null | Type: BOOL | Optional: No | Dynamic: No

optional.configuration: Any other configurations with which the Kafka producer is created, given as comma-separated key:value pairs, e.g., producer.type:async,batch.size:200.
Default: null | Type: STRING | Optional: Yes | Dynamic: No
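The bootstrap.servers and optional.configuration values are plain strings. The Python sketch below shows one way to read them, assuming the comma-separated list and key:value formats shown in the parameter descriptions. It only illustrates the string format; it is not the parser the extension uses, and the function names are hypothetical.

```python
from __future__ import annotations


def parse_server_list(value: str) -> list[str]:
    """Split a comma-separated bootstrap.servers value into host:port entries."""
    return [s.strip() for s in value.split(",") if s.strip()]


def parse_optional_configuration(value: str) -> dict[str, str]:
    """Turn 'key:value,key:value' into a dict of client properties.

    Assumes keys and values contain no ',' and that the first ':' separates
    the key from the value. Hypothetical helper, not the extension's parser.
    """
    props: dict[str, str] = {}
    for pair in value.split(","):
        pair = pair.strip()
        if not pair:
            continue  # tolerate empty segments such as a trailing comma
        key, sep, val = pair.partition(":")
        if not sep:
            raise ValueError(f"expected key:value, got {pair!r}")
        props[key.strip()] = val.strip()
    return props


servers = parse_server_list("localhost:9092,localhost:9093")
props = parse_optional_configuration("producer.type:async,batch.size:200")
print(servers)  # ['localhost:9092', 'localhost:9093']
print(props)    # {'producer.type': 'async', 'batch.size': '200'}
```

Values stay strings here; converting batch.size to an integer is left to whatever client consumes the properties.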

Examples

EXAMPLE 1

@App:name('TestExecutionPlan')
define stream FooStream (symbol string, price float, volume long);
@sink(
    type='kafka',
    topic='topic_with_partitions',
    partition.no='0',
    bootstrap.servers='localhost:9092',
    @map(type='xml'))
define stream BarStream (symbol string, price float, volume long);
@info(name = 'query1')
from FooStream select symbol, price, volume insert into BarStream;

This Kafka sink configuration publishes the events of BarStream, mapped to XML, to the 0th partition of the topic named topic_with_partitions.

EXAMPLE 2

@App:name('TestExecutionPlan')
define stream FooStream (symbol string, price float, volume long);
@sink(
    type='kafka',
    topic='{{symbol}}',
    partition.no='{{volume}}',
    bootstrap.servers='localhost:9092',
    @map(type='xml'))
define stream BarStream (symbol string, price float, volume long);
@info(name = 'query1')
from FooStream select symbol, price, volume insert into BarStream;

This query publishes each event to a topic and partition taken from the Siddhi event itself: the topic is the value of the symbol attribute, and the partition number is the value of the volume attribute.
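The {{attribute}} placeholders in topic and partition.no are filled in per event. As a rough illustration, assuming each placeholder is replaced by the string value of the named attribute, the hypothetical Python helpers below resolve the destination of one event. This is not the Siddhi implementation.

```python
from __future__ import annotations

import re

# Matches placeholders such as {{symbol}} or {{ volume }}.
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def resolve_template(template: str, event: dict[str, object]) -> str:
    """Replace each {{name}} with str(event[name]); raises KeyError if absent."""
    return PLACEHOLDER.sub(lambda m: str(event[m.group(1)]), template)


def resolve_destination(topic_template: str, partition_template: str,
                        event: dict[str, object]) -> tuple[str, int]:
    """Resolve the per-event topic name and partition number."""
    topic = resolve_template(topic_template, event)
    partition = int(resolve_template(partition_template, event))
    return topic, partition


event = {"symbol": "WSO2", "price": 55.6, "volume": 2}
print(resolve_destination("{{symbol}}", "{{volume}}", event))  # ('WSO2', 2)
```

Because the partition comes from volume, every volume value in the stream has to name a partition that exists in the target topic; the sketch does not check this.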

kafkaMultiDC (Sink)

A Kafka sink publishes events processed by WSO2 SP to a partition of a topic in a Kafka cluster. The events can be published in the TEXT, XML, JSON, or Binary format.
If the topic does not already exist in the Kafka cluster, the Kafka sink creates the default partition for the given topic. The publishing topic and partition can be dynamic values taken from the Siddhi event.
To configure a sink that publishes events via the Kafka transport to the same topic on two Kafka brokers, set the type parameter to kafkaMultiDC.

Syntax

@sink(type="kafkaMultiDC", bootstrap.servers="<STRING>", topic="<STRING>", sequence.id="<STRING>", key="<STRING>", partition.no="<INT>", is.binary.message="<BOOL>", optional.configuration="<STRING>", @map(...))

QUERY PARAMETERS

bootstrap.servers: The list of Kafka servers to which the sink publishes events, given as comma-separated values. There must be at least two servers in this list, e.g., localhost:9092,localhost:9093.
Default: none | Type: STRING | Optional: No | Dynamic: No

topic: The topic to which the sink publishes events. Only one topic can be specified.
Default: none | Type: STRING | Optional: No | Dynamic: No

sequence.id: A unique identifier for the messages published by this sink. This ID allows receivers to identify the sink that published a specific message.
Default: null | Type: STRING | Optional: Yes | Dynamic: No

key: The message key, whose values are used to maintain ordering within a Kafka partition.
Default: null | Type: STRING | Optional: Yes | Dynamic: No

partition.no: The partition number for the given topic. Only one partition ID can be defined. If no value is specified, the sink publishes to the default partition of the topic (i.e., 0).
Default: 0 | Type: INT | Optional: Yes | Dynamic: No

is.binary.message: Set this parameter to true to publish binary events via the kafkaMultiDC sink.
Default: null | Type: BOOL | Optional: No | Dynamic: No

optional.configuration: Any other configurations with which the Kafka producer is created, given as comma-separated key:value pairs, e.g., producer.type:async,batch.size:200.
Default: null | Type: STRING | Optional: Yes | Dynamic: No
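The multi-DC sink publishes the same events to two brokers, and the kafkaMultiDC source later relies on sequence numbers to drop duplicates. The hypothetical Python class below sketches that idea: it checks that bootstrap.servers lists at least two servers and tags every copy of an event with the sink's sequence.id and a per-sink counter. How the extension actually encodes the sequence number in a Kafka record is not described on this page, so the TaggedMessage layout is an assumption, and no Kafka client is used.

```python
from __future__ import annotations

import itertools
from dataclasses import dataclass


@dataclass(frozen=True)
class TaggedMessage:
    sequence_id: str  # the sink's sequence.id value
    seq_no: int       # per-sink counter, identical in every copy of an event
    payload: str      # the mapped event, e.g. XML text


class MultiDCFanOut:
    """Hypothetical sketch of multi-DC publishing: one tagged copy per broker."""

    def __init__(self, bootstrap_servers: str, sequence_id: str) -> None:
        self.brokers = [s.strip() for s in bootstrap_servers.split(",") if s.strip()]
        if len(self.brokers) < 2:
            raise ValueError("kafkaMultiDC needs at least two servers in bootstrap.servers")
        self.sequence_id = sequence_id
        self._counter = itertools.count()  # yields 0, 1, 2, ...

    def publish(self, payload: str) -> dict[str, TaggedMessage]:
        """Return what each broker would receive; a real sink would send it."""
        message = TaggedMessage(self.sequence_id, next(self._counter), payload)
        return {broker: message for broker in self.brokers}


sink = MultiDCFanOut("host1:9092, host2:9092", sequence_id="sink-1")
first = sink.publish("<event>a</event>")
second = sink.publish("<event>b</event>")
print(sorted(first))                                            # ['host1:9092', 'host2:9092']
print(first["host1:9092"].seq_no, second["host2:9092"].seq_no)  # 0 1
```

Giving both copies the same number is what lets a receiver recognize the second copy as a duplicate.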

Examples

EXAMPLE 1

@App:name('TestExecutionPlan')
define stream FooStream (symbol string, price float, volume long);
@sink(
    type='kafkaMultiDC',
    topic='myTopic',
    partition.no='0',
    bootstrap.servers='host1:9092, host2:9092',
    @map(type='xml'))
define stream BarStream (symbol string, price float, volume long);
@info(name = 'query1')
from FooStream select symbol, price, volume insert into BarStream;

This query publishes to the default (i.e., 0th) partition of myTopic on the brokers in the two data centers.

Source

kafka (Source)

A Kafka source receives events to be processed by WSO2 SP from a partition of a topic in a Kafka cluster. The events received can be in the TEXT, XML, JSON, or Binary format.
If the topic does not already exist in the Kafka cluster, the Kafka source creates the default partition for the given topic.

Syntax

@source(type="kafka", bootstrap.servers="<STRING>", topic.list="<STRING>", group.id="<STRING>", threading.option="<STRING>", partition.no.list="<STRING>", seq.enabled="<BOOL>", is.binary.message="<BOOL>", optional.configuration="<STRING>", @map(...))

QUERY PARAMETERS

bootstrap.servers: The list of Kafka servers to which the Kafka source listens, given as comma-separated values, e.g., localhost:9092,localhost:9093.
Default: none | Type: STRING | Optional: No | Dynamic: No

topic.list: The list of topics to which the source listens, given as comma-separated values, e.g., topic_one,topic_two.
Default: none | Type: STRING | Optional: No | Dynamic: No

group.id: An ID that identifies the Kafka source group. Sources in the same group that listen to the same topic and partition do not receive the same event.
Default: none | Type: STRING | Optional: No | Dynamic: No

threading.option: Whether the Kafka source runs on a single thread or on multiple threads. Possible values are as follows:
  single.thread: Runs the Kafka source on a single thread.
  topic.wise: Uses a separate thread per topic.
  partition.wise: Uses a separate thread per partition.
Default: none | Type: STRING | Optional: No | Dynamic: No

partition.no.list: The list of partition numbers for the given topic, given as comma-separated values, e.g., 0,1,2.
Default: 0 | Type: STRING | Optional: Yes | Dynamic: No

seq.enabled: If set to true, the source takes the sequence of the received events into account. Each event must then contain a sequence number as an attribute value to indicate its position in the sequence.
Default: false | Type: BOOL | Optional: Yes | Dynamic: No

is.binary.message: Set this parameter to true to receive binary events via the Kafka source.
Default: false | Type: BOOL | Optional: Yes | Dynamic: No

optional.configuration: Any other configurations with which the Kafka consumer is created, e.g., ssl.keystore.type:JKS,batch.size:200.
Default: null | Type: STRING | Optional: Yes | Dynamic: No
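The threading.option values differ only in how the (topic, partition) pairs are split across threads. The hypothetical Python function below sketches that split as the parameter descriptions state it (single.thread: one thread; topic.wise: one per topic; partition.wise: one per topic and partition combination). It assumes every listed partition exists in every listed topic and does not reflect how the extension actually creates its consumers.

```python
from __future__ import annotations


def plan_consumer_threads(threading_option: str, topics: list[str],
                          partitions: list[int]) -> list[list[tuple[str, int]]]:
    """Group (topic, partition) pairs into one work list per consumer thread.

    Illustrative only: assumes every listed partition exists in every topic.
    """
    pairs = [(t, p) for t in topics for p in partitions]
    if threading_option == "single.thread":
        return [pairs]                                          # one thread, all pairs
    if threading_option == "topic.wise":
        return [[(t, p) for p in partitions] for t in topics]   # one thread per topic
    if threading_option == "partition.wise":
        return [[pair] for pair in pairs]                       # one per topic/partition
    raise ValueError(f"unknown threading.option: {threading_option!r}")


# Values from the first source example: topic.list and partition.no.list.
topics = "kafka_topic,kafka_topic2".split(",")
partitions = [int(p) for p in "0,1".split(",")]
for option in ("single.thread", "topic.wise", "partition.wise"):
    print(option, len(plan_consumer_threads(option, topics, partitions)))
# single.thread 1
# topic.wise 2
# partition.wise 4
```

With two topics and two partitions, partition.wise yields four work lists, one per topic and partition combination.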

Examples

EXAMPLE 1

@App:name('TestExecutionPlan')
define stream BarStream (symbol string, price float, volume long);
@source(
    type='kafka',
    topic.list='kafka_topic,kafka_topic2',
    group.id='test',
    threading.option='partition.wise',
    bootstrap.servers='localhost:9092',
    partition.no.list='0,1',
    @map(type='xml'))
define stream FooStream (symbol string, price float, volume long);
@info(name = 'query1')
from FooStream select symbol, price, volume insert into BarStream;

This Kafka source configuration listens to partitions 0 and 1 of the kafka_topic and kafka_topic2 topics. A thread is created for each topic and partition combination. The events are received in the XML format, mapped to Siddhi events, and sent to a stream named FooStream.

EXAMPLE 2

@App:name('TestExecutionPlan')
define stream BarStream (symbol string, price float, volume long);
@source(
    type='kafka',
    topic.list='kafka_topic',
    group.id='test',
    threading.option='single.thread',
    bootstrap.servers='localhost:9092',
    @map(type='xml'))
define stream FooStream (symbol string, price float, volume long);
@info(name = 'query1')
from FooStream select symbol, price, volume insert into BarStream;

This Kafka source configuration listens to the kafka_topic topic for the default partition because no partition.no.list is defined. Only one thread is created for the topic. The events are received in the XML format, mapped to a Siddhi event, and sent to a stream named FooStream.

kafkaMultiDC (Source)

The Kafka Multi Data Center (DC) source receives records from the same topic in brokers deployed in two different Kafka clusters. It filters out all duplicate messages and tries to ensure that the events are received in the correct order by using sequence numbers. Events can be received in the TEXT, XML, JSON, or Binary format. If the topic has not already been created in the Kafka cluster, the source creates the default partition '0' for the given topic.
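One way to drop duplicates and restore order from two copies of a stream is a reorder buffer keyed by sequence number. The hypothetical Python class below sketches that technique, assuming the publisher numbers events 0, 1, 2, ... and both clusters carry the same numbered events. The extension's own strategy may differ (for example, it may not buffer out-of-order events indefinitely).

```python
from __future__ import annotations

import heapq


class SequenceMerger:
    """Hypothetical sketch: merge two copies of one stream by sequence number.

    Duplicates and early arrivals are handled; a gap that is never filled
    blocks all later events.
    """

    def __init__(self) -> None:
        self.next_seq = 0                          # next number to release
        self._pending: list[tuple[int, str]] = []  # min-heap of early events
        self._pending_seqs: set[int] = set()       # numbers held in the heap

    def offer(self, seq: int, payload: str) -> list[str]:
        """Accept one event from either cluster; return events now in order."""
        if seq < self.next_seq or seq in self._pending_seqs:
            return []  # already released or already buffered: a duplicate
        heapq.heappush(self._pending, (seq, payload))
        self._pending_seqs.add(seq)
        released: list[str] = []
        while self._pending and self._pending[0][0] == self.next_seq:
            ready_seq, ready = heapq.heappop(self._pending)
            self._pending_seqs.discard(ready_seq)
            released.append(ready)
            self.next_seq += 1
        return released


# Events from two clusters, interleaved, with duplicates and one early arrival.
arrivals = [("dc1", 0, "e0"), ("dc2", 0, "e0"), ("dc2", 2, "e2"),
            ("dc1", 1, "e1"), ("dc1", 2, "e2")]
merger = SequenceMerger()
delivered: list[str] = []
for _cluster, seq, payload in arrivals:
    delivered.extend(merger.offer(seq, payload))
print(delivered)  # ['e0', 'e1', 'e2']
```

Event e2 arrives early from dc2 and is held until e1 arrives from dc1; the later copies of e0 and e2 are discarded.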

Syntax

@source(type="kafkaMultiDC", bootstrap.servers="<STRING>", topic="<STRING>", partition.no="<INT>", is.binary.message="<BOOL>", optional.configuration="<STRING>", @map(...))

QUERY PARAMETERS

bootstrap.servers: The list of Kafka servers to which the source listens, given as comma-separated values, e.g., localhost:9092,localhost:9093.
Default: none | Type: STRING | Optional: No | Dynamic: No

topic: The topic to which the source listens, e.g., topic_one.
Default: none | Type: STRING | Optional: No | Dynamic: No

partition.no: The partition number for the given topic.
Default: 0 | Type: INT | Optional: Yes | Dynamic: No

is.binary.message: Set this parameter to true to receive binary events via the kafkaMultiDC source.
Default: false | Type: BOOL | Optional: Yes | Dynamic: No

optional.configuration: Any other configurations with which the consumer is created, e.g., producer.type:async,batch.size:200.
Default: null | Type: STRING | Optional: Yes | Dynamic: No

Examples

EXAMPLE 1

@App:name('TestExecutionPlan')
define stream BarStream (symbol string, price float, volume long);
@source(
    type='kafkaMultiDC',
    topic='kafka_topic',
    bootstrap.servers='host1:9092,host1:9093',
    partition.no='1',
    @map(type='xml'))
define stream FooStream (symbol string, price float, volume long);
@info(name = 'query1')
from FooStream select symbol, price, volume insert into BarStream;

This query listens to partition 1 of the kafka_topic topic deployed in the brokers host1:9092 and host1:9093. A thread is created for each broker. The received XML events are mapped to Siddhi events and sent to FooStream.