Skip to content

API Docs - v1.0.0-beta

Tested Siddhi Core version: 5.1.3

It could also support other Siddhi Core minor versions.

Sink

grpc (Sink)

This extension publishes event data encoded into GRPC Classes as defined in the user input jar. This extension has a default gRPC service classes added. The default service is called "EventService". Please find the protobuf definition here. This grpc sink is used for scenarios where we send a request and don't expect a response back. I.e getting a google.protobuf.Empty response back.

Syntax

@sink(type="grpc", publisher.url="<STRING>", headers="<STRING>", idle.timeout="<LONG>", keep.alive.time="<LONG>", keep.alive.timeout="<LONG>", keep.alive.without.calls="<BOOL>", enable.retry="<BOOL>", max.retry.attempts="<INT>", retry.buffer.size="<LONG>", per.rpc.buffer.size="<LONG>", channel.termination.waiting.time="<LONG>", truststore.file="<STRING>", truststore.password="<STRING>", truststore.algorithm="<STRING>", tls.store.type="<STRING>", keystore.file="<STRING>", keystore.password="<STRING>", keystore.algorithm="<STRING>", @map(...)))

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
publisher.url

The url to which the outgoing events should be published via this extension. This url should consist the host address, port, service name, method name in the following format. grpc://0.0.0.0:9763/<serviceName>/<methodName>

STRING No No
headers

GRPC Request headers in format "'<key>:<value>','<key>:<value>'". If header parameter is not provided just the payload is sent

- STRING Yes No
idle.timeout

Set the duration in seconds without ongoing RPCs before going to idle mode.

1800 LONG Yes No
keep.alive.time

Sets the time in seconds without read activity before sending a keepalive ping. Keepalives can increase the load on services so must be used with caution. By default set to Long.MAX_VALUE which disables keep alive pinging.

Long.MAX_VALUE LONG Yes No
keep.alive.timeout

Sets the time in seconds waiting for read activity after sending a keepalive ping.

20 LONG Yes No
keep.alive.without.calls

Sets whether keepalive will be performed when there are no outstanding RPC on a connection.

false BOOL Yes No
enable.retry

Enables the retry mechanism provided by the gRPC library.

false BOOL Yes No
max.retry.attempts

Sets max number of retry attempts. The total number of retry attempts for each RPC will not exceed this number even if service config may allow a higher number.

5 INT Yes No
retry.buffer.size

Sets the retry buffer size in bytes. If the buffer limit is exceeded, no RPC could retry at the moment, and in hedging case all hedges but one of the same RPC will cancel.

16777216 LONG Yes No
per.rpc.buffer.size

Sets the per RPC buffer limit in bytes used for retry. The RPC is not retriable if its buffer limit is exceeded.

1048576 LONG Yes No
channel.termination.waiting.time

The time in seconds to wait for the channel to become terminated, giving up if the timeout is reached.

5 LONG Yes No
truststore.file

the file path of truststore. If this is provided then server authentication is enabled

- STRING Yes No
truststore.password

the password of truststore. If this is provided then the integrity of the keystore is checked

- STRING Yes No
truststore.algorithm

the encryption algorithm to be used for server authentication

- STRING Yes No
tls.store.type

TLS store type

- STRING Yes No
keystore.file

the file path of keystore. If this is provided then client authentication is enabled

- STRING Yes No
keystore.password

the password of keystore

- STRING Yes No
keystore.algorithm

the encryption algorithm to be used for client authentication

- STRING Yes No

Examples EXAMPLE 1

@sink(type='grpc',
      publisher.url = 'grpc://134.23.43.35:8080/org.wso2.grpc.EventService/consume',
      @map(type='json'))
define stream FooStream (message String);

Here a stream named FooStream is defined with grpc sink. A grpc server should be running at 194.23.98.100 listening to port 8080. sink.id is set to 1 here. So we can write a source with sink.id 1 so that it will listen to responses for requests published from this stream. Note that since we are using EventService/consume the sink will be operating in default mode

EXAMPLE 2

@sink(type='grpc',
      publisher.url = 'grpc://134.23.43.35:8080/org.wso2.grpc.EventService/consume',
      headers='{{headers}}',
      @map(type='json'),
           @payload('{{message}}'))
define stream FooStream (message String, headers String);

A similar example to above but with headers. Headers are also send into the stream as a data. In the sink headers dynamic property reads the value and sends it as MetaData with the request

EXAMPLE 3

@sink(type='grpc',
      publisher.url = 'grpc://134.23.43.35:8080/org.wso2.grpc.test.MyService/send',
      @map(type='protobuf'),
define stream FooStream (stringValue string, intValue int,longValue long,booleanValue bool,floatValue float,doubleValue double);

Here a stream named FooStream is defined with grpc sink.A grpc servershould be running at 194.23.98.100 listening to port 8080, since there is no mapper provided, attributes of stream definition should be same as the attributes of protobuf message definition

grpc-call (Sink)

This extension publishes event data encoded into GRPC Classes as defined in the user input jar. This extension has a default gRPC service classes jar added. The default service is called "EventService". Please find the protobuf definition here. This grpc-call sink is used for scenarios where we send a request out and expect a response back. In default mode this will use EventService process method. grpc-call-response source is used to receive the responses. A unique sink.id is used to correlate between the sink and its corresponding source.

Syntax

@sink(type="grpc-call", publisher.url="<STRING>", sink.id="<INT>", headers="<STRING>", idle.timeout="<LONG>", keep.alive.time="<LONG>", keep.alive.timeout="<LONG>", keep.alive.without.calls="<BOOL>", enable.retry="<BOOL>", max.retry.attempts="<INT>", retry.buffer.size="<LONG>", per.rpc.buffer.size="<LONG>", channel.termination.waiting.time="<LONG>", max.inbound.message.size="<LONG>", max.inbound.metadata.size="<LONG>", truststore.file="<STRING>", truststore.password="<STRING>", truststore.algorithm="<STRING>", tls.store.type="<STRING>", keystore.file="<STRING>", keystore.password="<STRING>", keystore.algorithm="<STRING>", @map(...)))

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
publisher.url

The url to which the outgoing events should be published via this extension. This url should consist the host address, port, service name, method name in the following format. grpc://0.0.0.0:9763/<serviceName>/<methodName>

STRING No No
sink.id

a unique ID that should be set for each grpc-call-sink. There is a 1:1 mapping between grpc-call sinks and grpc-call-response sources. Each sink has one particular source listening to the responses to requests published from that sink. So the same sink.id should be given when writing the source also.

INT No No
headers

GRPC Request headers in format "'<key>:<value>','<key>:<value>'". If header parameter is not provided just the payload is sent

- STRING Yes No
idle.timeout

Set the duration in seconds without ongoing RPCs before going to idle mode.

1800 LONG Yes No
keep.alive.time

Sets the time in seconds without read activity before sending a keepalive ping. Keepalives can increase the load on services so must be used with caution. By default set to Long.MAX_VALUE which disables keep alive pinging.

Long.MAX_VALUE LONG Yes No
keep.alive.timeout

Sets the time in seconds waiting for read activity after sending a keepalive ping.

20 LONG Yes No
keep.alive.without.calls

Sets whether keepalive will be performed when there are no outstanding RPC on a connection.

false BOOL Yes No
enable.retry

Enables the retry and hedging mechanism provided by the gRPC library.

false BOOL Yes No
max.retry.attempts

Sets max number of retry attempts. The total number of retry attempts for each RPC will not exceed this number even if service config may allow a higher number.

5 INT Yes No
retry.buffer.size

Sets the retry buffer size in bytes. If the buffer limit is exceeded, no RPC could retry at the moment, and in hedging case all hedges but one of the same RPC will cancel.

16777216 LONG Yes No
per.rpc.buffer.size

Sets the per RPC buffer limit in bytes used for retry. The RPC is not retriable if its buffer limit is exceeded.

1048576 LONG Yes No
channel.termination.waiting.time

The time in seconds to wait for the channel to become terminated, giving up if the timeout is reached.

5 LONG Yes No
max.inbound.message.size

Sets the maximum message size allowed to be received on the channel in bytes

4194304 LONG Yes No
max.inbound.metadata.size

Sets the maximum size of metadata allowed to be received in bytes

8192 LONG Yes No
truststore.file

the file path of truststore. If this is provided then server authentication is enabled

- STRING Yes No
truststore.password

the password of truststore. If this is provided then the integrity of the keystore is checked

- STRING Yes No
truststore.algorithm

the encryption algorithm to be used for server authentication

- STRING Yes No
tls.store.type

TLS store type

- STRING Yes No
keystore.file

the file path of keystore. If this is provided then client authentication is enabled

- STRING Yes No
keystore.password

the password of keystore

- STRING Yes No
keystore.algorithm

the encryption algorithm to be used for client authentication

- STRING Yes No

Examples EXAMPLE 1

@sink(type='grpc-call',
      publisher.url = 'grpc://194.23.98.100:8080/EventService/process',
      sink.id= '1', @map(type='json'))
define stream FooStream (message String);
@source(type='grpc-call-response', sink.id= '1')
define stream BarStream (message String);

Here a stream named FooStream is defined with grpc sink. A grpc server should be running at 194.23.98.100 listening to port 8080. sink.id is set to 1 here. So we can write a source with sink.id 1 so that it will listen to responses for requests published from this stream. Note that since we are using EventService/process the sink will be operating in default mode

EXAMPLE 2

@sink(type='grpc-call',
      publisher.url = 'grpc://194.23.98.100:8080/EventService/process',
      sink.id= '1', @map(type='json'))
define stream FooStream (message String);

@source(type='grpc-call-response', sink.id= '1')
define stream BarStream (message String);

Here with the same FooStream definition we have added a BarStream which has a grpc-call-response source with the same sink.id 1. So the responses for calls sent from the FooStream will be added to BarStream.

EXAMPLE 3

@sink(type='grpc-call',
      publisher.url = 'grpc://194.23.98.100:8888/org.wso2.grpc.test.MyService/process',
      sink.id= '1', @map(type='protobuf'))
define stream FooStream (stringValue string, intValue int,longValue long,booleanValue bool,floatValue float,doubleValue double);

@source(type='grpc-call-response', sink.id= '1')
define stream FooStream (stringValue string, intValue int,longValue long,booleanValue bool,floatValue float,doubleValue double);

Here with the same FooStream definition we have added a BarStream which has a grpc-call-response source with the same sink.id 1. So the responses for calls sent from the FooStream will be added to BarStream. since there is no mapping available stream definition attributes names should be as same as the attributes of the protobuf message definition

grpc-service-response (Sink)

This extension is used to send responses back to a gRPC client after receiving requests through grpc-service source. This correlates with the particular source using a unique source.id

Syntax

@sink(type="grpc-service-response", source.id="<INT>", @map(...)))

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
source.id

A unique id to identify the correct source to which this sink is mapped. There is a 1:1 mapping between source and sink

INT No No

Examples EXAMPLE 1

@sink(type='grpc-service-response',
      source.id='1',
      @map(type='json'))
define stream BarStream (messageId String, message String);

@source(type='grpc-service',
        url='grpc://134.23.43.35:8080/org.wso2.grpc.EventService/process',
        source.id='1',
        @map(type='json',
             @attributes(messageId='trp:messageId', message='message')))
define stream FooStream (messageId String, message String);
from FooStream
select * 
insert into BarStream;

The grpc requests are received through the grpc-service sink. Each received event is sent back through grpc-service-source. This is just a passthrough through Siddhi as we are selecting everything from FooStream and inserting into BarStream.

Source

grpc (Source)

This extension starts a grpc server during initialization time. The server listens to requests from grpc stubs. This source has a default mode of operation and custom user defined grpc service mode. By default this uses EventService. Please find the proto definition here In the default mode this source will use EventService consume method. This method will receive requests and injects them into stream through a mapper.

Syntax

@source(type="grpc", receiver.url="<STRING>", max.inbound.message.size="<INT>", max.inbound.metadata.size="<INT>", server.shutdown.waiting.time="<LONG>", truststore.file="<STRING>", truststore.password="<STRING>", truststore.algorithm="<STRING>", tls.store.type="<STRING>", keystore.file="<STRING>", keystore.password="<STRING>", keystore.algorithm="<STRING>", @map(...)))

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
receiver.url

The url which can be used by a client to access the grpc server in this extension. This url should consist the host address, port, service name, method name in the following format. grpc://0.0.0.0:9763/<serviceName>/<methodName>

STRING No No
max.inbound.message.size

Sets the maximum message size in bytes allowed to be received on the server.

4194304 INT Yes No
max.inbound.metadata.size

Sets the maximum size of metadata in bytes allowed to be received.

8192 INT Yes No
server.shutdown.waiting.time

The time in seconds to wait for the server to shutdown, giving up if the timeout is reached.

5 LONG Yes No
truststore.file

the file path of truststore. If this is provided then server authentication is enabled

- STRING Yes No
truststore.password

the password of truststore. If this is provided then the integrity of the keystore is checked

- STRING Yes No
truststore.algorithm

the encryption algorithm to be used for server authentication

- STRING Yes No
tls.store.type

TLS store type

- STRING Yes No
keystore.file

the file path of keystore. If this is provided then client authentication is enabled

- STRING Yes No
keystore.password

the password of keystore

- STRING Yes No
keystore.algorithm

the encryption algorithm to be used for client authentication

- STRING Yes No

Examples EXAMPLE 1

@source(type='grpc',
       receiver.url='grpc://194.23.98.100:8888/org.wso2.grpc.EventService/consume',
       @map(type='json'))
define stream BarStream (message String);

Here the port is given as 8888. So a grpc server will be started on port 8888 and the server will expose EventService. This is the default service packed with the source. In EventService the consume method is

EXAMPLE 2

@source(type='grpc',
       receiver.url='grpc://194.23.98.100:8888/org.wso2.grpc.EventService/consume',
       @map(type='json', @attributes(name='trp:name', age='trp:age', message='message')))
define stream BarStream (message String, name String, age int);

Here we are getting headers sent with the request as transport properties and injecting them into the stream. With each request a header will be sent in MetaData in the following format: 'Name:John', 'Age:23'

EXAMPLE 3

@source(type='grpc',
       receiver.url='grpc://194.23.98.100:8888/org.wso2.grpc.test.MyService/send',
       @map(type='protobuf', ))
define stream BarStream (stringValue string, intValue int,longValue long,booleanValue bool,floatValue float,doubleValue double);

Here the port is give as 8888. So a grpc server will be started on port 8888 and sever will be keep listening to the 'send method in the 'MyService' service.

grpc-call-response (Source)

This grpc source receives responses received from gRPC server for requests sent from a grpc-call sink. The source will receive responses for sink with the same sink.id. For example if you have a gRPC sink with sink.id 15 then we need to set the sink.id as 15 in the source to receives responses. Sinks and sources have 1:1 mapping

Syntax

@source(type="grpc-call-response", sink.id="<INT>", @map(...)))

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
sink.id

a unique ID that should be set for each grpc-call source. There is a 1:1 mapping between grpc-call sinks and grpc-call-response sources. Each sink has one particular source listening to the responses to requests published from that sink. So the same sink.id should be given when writing the sink also.

INT No No

Examples EXAMPLE 1

@source(type='grpc-call-response', sink.id= '1')
define stream BarStream (message String);

Here we are listening to responses for requests sent from the sink with sink.id 1 will be received here. The results will be injected into BarStream

grpc-service (Source)

This extension implements a grpc server for receiving and responding to requests. During initialization time a grpc server is started on the user specified port exposing the required service as given in the url. This source also has a default mode and a user defined grpc service mode. By default this uses EventService. Please find the proto definition here In the default mode this will use the EventService process method. This accepts grpc message class Event as defined in the EventService proto. This uses GrpcServiceResponse sink to send reponses back in the same Event message format.

Syntax

@source(type="grpc-service", receiver.url="<STRING>", max.inbound.message.size="<INT>", max.inbound.metadata.size="<INT>", service.timeout="<INT>", server.shutdown.waiting.time="<LONG>", truststore.file="<STRING>", truststore.password="<STRING>", truststore.algorithm="<STRING>", tls.store.type="<STRING>", keystore.file="<STRING>", keystore.password="<STRING>", keystore.algorithm="<STRING>", @map(...)))

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
receiver.url

The url which can be used by a client to access the grpc server in this extension. This url should consist the host address, port, service name, method name in the following format. grpc://0.0.0.0:9763/<serviceName>/<methodName>

STRING No No
max.inbound.message.size

Sets the maximum message size in bytes allowed to be received on the server.

4194304 INT Yes No
max.inbound.metadata.size

Sets the maximum size of metadata in bytes allowed to be received.

8192 INT Yes No
service.timeout

The period of time in milliseconds to wait for siddhi to respond to a request received. After this time period of receiving a request it will be closed with an error message.

10000 INT Yes No
server.shutdown.waiting.time

The time in seconds to wait for the server to shutdown, giving up if the timeout is reached.

5 LONG Yes No
truststore.file

the file path of truststore. If this is provided then server authentication is enabled

- STRING Yes No
truststore.password

the password of truststore. If this is provided then the integrity of the keystore is checked

- STRING Yes No
truststore.algorithm

the encryption algorithm to be used for server authentication

- STRING Yes No
tls.store.type

TLS store type

- STRING Yes No
keystore.file

the file path of keystore. If this is provided then client authentication is enabled

- STRING Yes No
keystore.password

the password of keystore

- STRING Yes No
keystore.algorithm

the encryption algorithm to be used for client authentication

- STRING Yes No

Examples EXAMPLE 1

@source(type='grpc-service',
       receiver.url='grpc://localhost:8888/org.wso2.grpc.EventService/process',
       source.id='1',
       @map(type='json', @attributes(messageId='trp:messageId', message='message')))
define stream FooStream (messageId String, message String);

Here a grpc server will be started at port 8888. The process method of EventService will be exposed for clients. source.id is set as 1. So a grpc-service-response sink with source.id = 1 will send responses back for requests received to this source. Note that it is required to specify the transport property messageId since we need to correlate the request message with the response.

EXAMPLE 2

@sink(type='grpc-service-response',
      source.id='1',
      @map(type='json'))
define stream BarStream (messageId String, message String);

@source(type='grpc-service',
       receiver.url='grpc://134.23.43.35:8080/org.wso2.grpc.EventService/process',
       source.id='1',
       @map(type='json', @attributes(messageId='trp:messageId', message='message')))
define stream FooStream (messageId String, message String);

from FooStream
select * 
insert into BarStream;

The grpc requests are received through the grpc-service sink. Each received event is sent back through grpc-service-source. This is just a passthrough through Siddhi as we are selecting everything from FooStream and inserting into BarStream.

EXAMPLE 3

@source(type='grpc-service', source.id='1'        receiver.url='grpc://locanhost:8888/org.wso2.grpc.EventService/consume',        @map(type='json', @attributes(name='trp:name', age='trp:age', message='message'))) define stream BarStream (message String, name String, age int);

Here we are getting headers sent with the request as transport properties and injecting them into the stream. With each request a header will be sent in MetaData in the following format: 'Name:John', 'Age:23'

EXAMPLE 4

@sink(type='grpc-service-response',
      source.id='1',
      message.id='{{messageId}}',
      @map(type='protobuf',
@payload(stringValue='a',intValue='b',longValue='c',booleanValue='d',floatValue = 'e', doubleValue ='f')))
define stream BarStream (a string,messageId string, b int,c long,d bool,e float,f double);

@source(type='grpc-service',
       receiver.url='grpc://134.23.43.35:8888/org.wso2.grpc.test.MyService/process',
       source.id='1',
       @map(type='protobuf', @attributes(messageId='trp:message.id', a = 'stringValue', b = 'intValue', c = 'longValue',d = 'booleanValue', e = 'floatValue', f ='doubleValue')))
define stream FooStream (a string,messageId string, b int,c long,d bool,e float,f double);

from FooStream
select * 
insert into BarStream;

Here a grpc server will be started at port 8888. The process method of the MyService will be exposed to the clients.source.id is set as 1.So a grpc-service-response sink with source.id = 1 will send responses back for requests received to this source. Note that it is required to specify the transport property messageId since we need to correlate the request message with the response.