Skip to content

API Docs - v2.0.15

Tested Siddhi Core version: 5.1.21

It could also support other Siddhi Core minor versions.

Sink

nats (Sink)

NATS Sink allows users to subscribe to a Nats or Nats streaming broker and publish messages.

Syntax

@sink(type="nats", destination="<STRING>", bootstrap.servers="<STRING>", server.urls="<STRING>", client.id="<STRING>", cluster.id="<STRING>", streaming.cluster.id="<STRING>", ack.wait="<LONG>", optional.configuration="<STRING>", auth.type="<STRING>", username="<STRING>", password="<STRING>", token="<STRING>", truststore.file="<STRING>", tls.store.type="<STRING>", truststore.password="<STRING>", truststore.algorithm="<STRING>", client.verify="<BOOL>", keystore.file="<STRING>", keystore.algorithm="<STRING>", keystore.password="<STRING>", @map(...)))

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
destination

Subject name which NATS sink should publish to.

STRING No Yes
bootstrap.servers

Deprecated, use server.urls instead, The NATS based urls of the NATS server. Can be provided multiple urls separated by commas(,).

nats://localhost:4222 STRING Yes No
server.urls

The NATS based urls of the NATS server. Can be provided multiple urls separated by commas(,).

nats://localhost:4222 STRING Yes No
client.id

The identifier of the client publishing/connecting to the NATS streaming broker. Should be unique for each client connecting to the server/cluster.(supported only for nats streaming connections).

None STRING Yes No
cluster.id

Deprecated, use streaming.cluster.id instead. The identifier of the NATS server/cluster. Should be provided when using nats streaming broker.

STRING No No
streaming.cluster.id

The identifier of the NATS server/cluster. Should be provided when using nats streaming broker

STRING No No
ack.wait

Ack timeout in seconds for nats publisher, Supported only with nats streaming broker.

LONG No No
optional.configuration

This parameter contains all the other possible configurations that the nats client can be created with.
 io.nats.client.reconnect.max:1, io.nats.client.timeout:1000

- STRING Yes No
auth.type

Set the authentication type. Should be provided when using secure connection. Supported authentication types: user, token, tls

- STRING Yes No
username

Set the username, should be provided if auth.type is set as user

- STRING Yes No
password

Set the password, should be provided if auth.type is set as user

- STRING Yes No
token

Set the token, should be provided if auth.type is set as token

- STRING Yes No
truststore.file

Configure the truststore file

${carbon.home}/resources/security/client-truststore.jks STRING Yes No
tls.store.type

TLS store type.

JKS STRING Yes No
truststore.password

The password for the client truststore

wso2carbon STRING Yes No
truststore.algorithm

The encryption algorithm of the truststore.

SunX509 STRING Yes No
client.verify

Enable the client verification, should be set to true if client needs to be verify by the server.

false BOOL Yes No
keystore.file

Configure the Keystore file, only if client verification is needed.

${carbon.home}/resources/security/wso2carbon.jks STRING Yes No
keystore.algorithm

The encryption algorithm of the keystore.

SunX509 STRING Yes No
keystore.password

The password for the keystore.

wso2carbon STRING Yes No

Examples EXAMPLE 1

@sink(type='nats', @map(type='xml'), destination='SP_NATS_OUTPUT_TEST', server.urls='nats://localhost:4222',client.id='nats_client',streaming.cluster.id='test-cluster')
define stream outputStream (name string, age int, country string);

This example shows how to publish events to a nats streaming broker with basic configurations. Here the nats sink will publish events into the SP_NATS_OUTPUT_TEST subject. Nats streaming server should be runs on the localhost:4222 address. streaming.cluster.id should be provided if wer want to publish events into a nats streaming broker.

EXAMPLE 2

@sink(type='nats', @map(type='xml'), destination='nats-test1', server.urls='nats://localhost:4222')
define stream inputStream (name string, age int, country string)

This example shows how to publish events into a nats broker with basic configurations. Nats server should be running on localhost:4222 and this sink will publish events to the nats-test1 subject.

EXAMPLE 3

@sink(type='nats',@map(type='protobuf', class='io.siddhi.extension.io.nats.utils.protobuf.Person'),
 destination='nats-test1', server.urls='nats://localhost:4222')
define stream inputStream (nic long, name string)

Above query shows how to use nats sink to publish protobuf messages into a nats broker.

Source

nats (Source)

NATS Source allows users to subscribe to a NATS broker and receive messages. It has the ability to receive all the message types supported by NATS.

Syntax

@source(type="nats", destination="<STRING>", bootstrap.servers="<STRING>", server.urls="<STRING>", client.id="<STRING>", cluster.id="<STRING>", streaming.cluster.id="<STRING>", queue.group.name="<STRING>", durable.name="<STRING>", subscription.sequence="<STRING>", optional.configuration="<STRING>", ack.wait="<LONG>", auth.type="<STRING>", username="<STRING>", password="<STRING>", token="<STRING>", truststore.file="<STRING>", tls.store.type="<STRING>", truststore.password="<STRING>", truststore.algorithm="<STRING>", client.verify="<BOOL>", keystore.file="<STRING>", keystore.algorithm="<STRING>", keystore.password="<STRING>", @map(...)))

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
destination

Subject name which NATS Source should subscribe to.

STRING No No
bootstrap.servers

Deprecated, use server.urls instead, The NATS based urls of the NATS server. Can be provided multiple urls separated by commas(,).

nats://localhost:4222 STRING Yes No
server.urls

The NATS based urls of the NATS server. Can be provided multiple urls separated by commas(,).

nats://localhost:4222 STRING Yes No
client.id

The identifier of the client subscribing/connecting to the NATS streaming broker. Should be unique for each client connecting to the server/cluster.(supported only for nats streaming connections).

None STRING Yes No
cluster.id

Deprecated, use streaming.cluster.id instead. The identifier of the NATS server/cluster. Should be provided when using nats streaming broker.

STRING No No
streaming.cluster.id

The identifier of the NATS server/cluster. Should be provided when using nats streaming broker

STRING No No
queue.group.name

This can be used when there is a requirement to share the load of a NATS subject. Clients belongs to the same queue group share the subscription load.

None STRING Yes No
durable.name

This can be used to subscribe to a subject from the last acknowledged message when a client or connection failure happens. The client can be uniquely identified using the tuple (client.id, durable.name).[supported only with nats streaming connections]

None STRING Yes No
subscription.sequence

This can be used to subscribe to a subject from a given number of message sequence. All the messages from the given point of sequence number will be passed to the client. If not provided then the either the persisted value or 0 will be used. [supported only with nats streaming connection]

None STRING Yes No
optional.configuration

This parameter contains all the other possible configurations that the nats client can be created with.
 io.nats.client.reconnect.max:8, io.nats.client.timeout:5000

- STRING Yes No
ack.wait

Add ack wait interval for nats subscriptions in seconds. Supported only with nats streaming brokers.

- LONG Yes No
auth.type

Set the authentication type. Should be provided when using secure connection. Supported authentication types: user, token, tls

- STRING Yes No
username

Set the username, should be provided if auth.type is set as user

- STRING Yes No
password

Set the password, should be provided if auth.type is set as user

- STRING Yes No
token

Set the token, should be provided if auth.type is set as token

- STRING Yes No
truststore.file

Configure the truststore file

${carbon.home}/resources/security/client-truststore.jks STRING Yes No
tls.store.type

TLS store type.

JKS STRING Yes No
truststore.password

The password for the client truststore

wso2carbon STRING Yes No
truststore.algorithm

The encryption algorithm of the truststore.

SunX509 STRING Yes No
client.verify

Enable the client verification, should be set to true if client needs to be verify by the server.

false BOOL Yes No
keystore.file

Configure the Keystore file, only if client verification is needed.

${carbon.home}/resources/security/wso2carbon.jks STRING Yes No
keystore.algorithm

The encryption algorithm of the keystore.

SunX509 STRING Yes No
keystore.password

The password for the keystore.

wso2carbon STRING Yes No

Examples EXAMPLE 1

@source(type='nats', @map(type='text'), destination='SP_NATS_INPUT_TEST', server.urls='nats://localhost:4222',client.id='nats_client',streaming.cluster.id='test-cluster',queue.group.name = 'group_nats',durable.name = 'nats-durable',subscription.sequence = '100')
define stream inputStream (name string, age int, country string);

This example shows how to subscribe to a NATS subject in nats streaming broker with some basic configurations.With the above configuration the source identified as 'nats-client' will subscribes to a subject named as 'SP_NATS_INPUT_TEST' which resides in a nats instance with a cluster id of 'test-cluster', running in localhost and listening to the port 4222 for client connection. This subscription will receive all the messages from 100th in the subject. Since this is using a nats streaming broker it's mandatory to provide the streaming.cluster.id parameter.

EXAMPLE 2

@source(type='nats', @map(type='xml'), destination='nats-test', server.urls='nats://localhost:4222')
define stream inputStream1 (name string, age int, country string);

This will subscribe to a Nats subject in nats broker with some basic configurations. Nats server should be running on the localhost:4222 address and this source will keep listening to messages which receives into the nats-test subject

EXAMPLE 3

@source(type='nats', @map(type='json', @attributes(name='$.name', age='$.age', country='$.country', sequenceNum='trp:sequenceNumber')), destination='SIDDHI_NATS_SOURCE_TEST_DEST', client.id='nats_client', server.urls='nats://localhost:4222', streaming.cluster.id='test-cluster')
define stream inputStream (name string, age int, country string, sequenceNum string);

This example shows how to pass Nats sequence number to the event.