Skip to content

API Docs - v1.0.8

Tested Siddhi Core version: 5.1.5

It could also support other Siddhi Core minor versions.

Sink

grpc (Sink)

This extension publishes event data encoded into GRPC Classes as defined in the user input jar. This extension has a default gRPC service classes added. The default service is called "EventService". Please find the protobuf definition here. If we want to use our custom gRPC services, we have to pack auto-generated gRPC service classes and protobuf classes into a jar file and add it into the project classpath (or to the jars folder in the siddhi-tooling folder if we use it with siddhi-tooling). Please find the custom protobuf definition that uses in examples here. This grpc sink is used for scenarios where we send a request and don't expect a response back. I.e getting a google.protobuf.Empty response back.

Syntax

@sink(type="grpc", publisher.url="<STRING>", headers="<STRING>", idle.timeout="<LONG>", keep.alive.time="<LONG>", keep.alive.timeout="<LONG>", keep.alive.without.calls="<BOOL>", enable.retry="<BOOL>", max.retry.attempts="<INT>", retry.buffer.size="<LONG>", per.rpc.buffer.size="<LONG>", channel.termination.waiting.time="<LONG>", truststore.file="<STRING>", truststore.password="<STRING>", truststore.algorithm="<STRING>", tls.store.type="<STRING>", keystore.file="<STRING>", keystore.password="<STRING>", keystore.algorithm="<STRING>", enable.ssl="<BOOL>", mutual.auth.enabled="<BOOL>", @map(...)))

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
publisher.url

The url to which the outgoing events should be published via this extension. This url should consist the host hostPort, port, fully qualified service name, method name in the following format. grpc://0.0.0.0:9763/<serviceName>/<methodName>
For example:
grpc://0.0.0.0:9763/org.wso2.grpc.EventService/consume

STRING No No
headers

GRPC Request headers in format "'<key>:<value>','<key>:<value>'". If header parameter is not provided just the payload is sent

- STRING Yes No
idle.timeout

Set the duration in seconds without ongoing RPCs before going to idle mode.

1800 LONG Yes No
keep.alive.time

Sets the time in seconds without read activity before sending a keepalive ping. Keepalives can increase the load on services so must be used with caution. By default set to Long.MAX_VALUE which disables keep alive pinging.

Long.MAX_VALUE LONG Yes No
keep.alive.timeout

Sets the time in seconds waiting for read activity after sending a keepalive ping.

20 LONG Yes No
keep.alive.without.calls

Sets whether keepalive will be performed when there are no outstanding RPC on a connection.

false BOOL Yes No
enable.retry

Enables the retry mechanism provided by the gRPC library.

false BOOL Yes No
max.retry.attempts

Sets max number of retry attempts. The total number of retry attempts for each RPC will not exceed this number even if service config may allow a higher number.

5 INT Yes No
retry.buffer.size

Sets the retry buffer size in bytes. If the buffer limit is exceeded, no RPC could retry at the moment, and in hedging case all hedges but one of the same RPC will cancel.

16777216 LONG Yes No
per.rpc.buffer.size

Sets the per RPC buffer limit in bytes used for retry. The RPC is not retriable if its buffer limit is exceeded.

1048576 LONG Yes No
channel.termination.waiting.time

The time in seconds to wait for the channel to become terminated, giving up if the timeout is reached.

5 LONG Yes No
truststore.file

the file path of truststore. If this is provided then server authentication is enabled

- STRING Yes No
truststore.password

the password of truststore. If this is provided then the integrity of the keystore is checked

- STRING Yes No
truststore.algorithm

the encryption algorithm to be used for server authentication

- STRING Yes No
tls.store.type

TLS store type

- STRING Yes No
keystore.file

the file path of keystore. If this is provided then client authentication is enabled

- STRING Yes No
keystore.password

the password of keystore

- STRING Yes No
keystore.algorithm

the encryption algorithm to be used for client authentication

- STRING Yes No
enable.ssl

to enable ssl. If set to true and truststore.file is not given then it will be set to default carbon jks by default

FALSE BOOL Yes No
mutual.auth.enabled

to enable mutual authentication. If set to true and truststore.file or keystore.file is not given then it will be set to default carbon jks by default

FALSE BOOL Yes No

Examples EXAMPLE 1

@sink(type='grpc',
      publisher.url = 'grpc://134.23.43.35:8080/org.wso2.grpc.EventService/consume',
      @map(type='json'))
define stream FooStream (message String);

Here a stream named FooStream is defined with grpc sink. A grpc server should be running at 194.23.98.100 listening to port 8080. sink.id is set to 1 here. So we can write a source with sink.id 1 so that it will listen to responses for requests published from this stream. Note that since we are using EventService/consume the sink will be operating in default mode

EXAMPLE 2

@sink(type='grpc',
      publisher.url = 'grpc://134.23.43.35:8080/org.wso2.grpc.EventService/consume',
      headers='{{headers}}',
      @map(type='json'),
           @payload('{{message}}'))
define stream FooStream (message String, headers String);

A similar example to above but with headers. Headers are also send into the stream as a data. In the sink headers dynamic property reads the value and sends it as MetaData with the request

EXAMPLE 3

@sink(type='grpc',
      publisher.url = 'grpc://134.23.43.35:8080/org.wso2.grpc.MyService/send',
      @map(type='protobuf'),
define stream FooStream (stringValue string, intValue int,longValue long,booleanValue bool,floatValue float,doubleValue double);

Here a stream named FooStream is defined with grpc sink. A grpc server should be running at 134.23.43.35 listening to port 8080 since there is no mapper provided, attributes of stream definition should be as same as the attributes of protobuf message definition.

EXAMPLE 4

@sink(type='grpc',
      publisher.url = 'grpc://134.23.43.35:8080/org.wso2.grpc.MyService/testMap',
      @map(type='protobuf'),
define stream FooStream (stringValue string, intValue int,map object);

Here a stream named FooStream is defined with grpc sink. A grpc server should be running at 134.23.43.35 listening to port 8080. The 'map object' in the stream definition defines that this stream is going to use Map object with grpc service. We can use any map object that extends 'java.util.AbstractMap' class.

EXAMPLE 5

@sink(type='grpc',
      publisher.url = 'grpc://134.23.43.35:8080/org.wso2.grpc.MyService/testMap',
      @map(type='protobuf', 
@payload(stringValue='a',longValue='b',intValue='c',booleanValue='d',floatValue = 'e', doubleValue = 'f'))) 
define stream FooStream (a string, b long, c int,d bool,e float,f double);

Here a stream named FooStream is defined with grpc sink. A grpc server should be running at 194.23.98.100 listening to port 8080. @payload is provided in this stream, therefore we can use any name for the attributes in the stream definition, but we should correctly map those names with protobuf message attributes. If we are planning to send metadata within a stream we should use @payload to map attributes to identify the metadata attribute and the protobuf attributes separately.

EXAMPLE 6

@sink(type='grpc',
      publisher.url = 'grpc://194.23.98.100:8888/org.wso2.grpc.test.StreamService/clientStream',
      @map(type='protobuf')) 
define stream FooStream (stringValue string, intValue int,longValue long,booleanValue bool,floatValue float,doubleValue double);

Here in the grpc sink, we are sending a stream of requests to the server that runs on 194.23.98.100 and port 8888. When we need to send a stream of requests from the grpc sink we have to define a client stream RPC method.Then the siddhi will identify whether it's a unary method or a stream method and send requests according to the method type.

grpc-call (Sink)

This extension publishes event data encoded into GRPC Classes as defined in the user input jar. This extension has a default gRPC service classes jar added. The default service is called "EventService". Please find the protobuf definition here. If we want to use our custom gRPC services, we have to pack auto-generated gRPC service classes and protobuf classes into a jar file and add it into the project classpath (or to the jars folder in the siddhi-tooling folder if we use it with siddhi-tooling). Please find the custom protobuf definition that uses in examples here. This grpc-call sink is used for scenarios where we send a request out and expect a response back. In default mode this will use EventService process method. grpc-call-response source is used to receive the responses. A unique sink.id is used to correlate between the sink and its corresponding source.

Syntax

@sink(type="grpc-call", publisher.url="<STRING>", sink.id="<INT>", headers="<STRING>", idle.timeout="<LONG>", keep.alive.time="<LONG>", keep.alive.timeout="<LONG>", keep.alive.without.calls="<BOOL>", enable.retry="<BOOL>", max.retry.attempts="<INT>", retry.buffer.size="<LONG>", per.rpc.buffer.size="<LONG>", channel.termination.waiting.time="<LONG>", max.inbound.message.size="<LONG>", max.inbound.metadata.size="<LONG>", truststore.file="<STRING>", truststore.password="<STRING>", truststore.algorithm="<STRING>", tls.store.type="<STRING>", keystore.file="<STRING>", keystore.password="<STRING>", keystore.algorithm="<STRING>", enable.ssl="<BOOL>", mutual.auth.enabled="<BOOL>", @map(...)))

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
publisher.url

The url to which the outgoing events should be published via this extension. This url should consist the host hostPort, port, fully qualified service name, method name in the following format. grpc://0.0.0.0:9763/<serviceName>/<methodName>
For example:
grpc://0.0.0.0:9763/org.wso2.grpc.EventService/consume

STRING No No
sink.id

a unique ID that should be set for each grpc-call-sink. There is a 1:1 mapping between grpc-call sinks and grpc-call-response sources. Each sink has one particular source listening to the responses to requests published from that sink. So the same sink.id should be given when writing the source also.

INT No No
headers

GRPC Request headers in format "'<key>:<value>','<key>:<value>'". If header parameter is not provided just the payload is sent

- STRING Yes No
idle.timeout

Set the duration in seconds without ongoing RPCs before going to idle mode.

1800 LONG Yes No
keep.alive.time

Sets the time in seconds without read activity before sending a keepalive ping. Keepalives can increase the load on services so must be used with caution. By default set to Long.MAX_VALUE which disables keep alive pinging.

Long.MAX_VALUE LONG Yes No
keep.alive.timeout

Sets the time in seconds waiting for read activity after sending a keepalive ping.

20 LONG Yes No
keep.alive.without.calls

Sets whether keepalive will be performed when there are no outstanding RPC on a connection.

false BOOL Yes No
enable.retry

Enables the retry and hedging mechanism provided by the gRPC library.

false BOOL Yes No
max.retry.attempts

Sets max number of retry attempts. The total number of retry attempts for each RPC will not exceed this number even if service config may allow a higher number.

5 INT Yes No
retry.buffer.size

Sets the retry buffer size in bytes. If the buffer limit is exceeded, no RPC could retry at the moment, and in hedging case all hedges but one of the same RPC will cancel.

16777216 LONG Yes No
per.rpc.buffer.size

Sets the per RPC buffer limit in bytes used for retry. The RPC is not retriable if its buffer limit is exceeded.

1048576 LONG Yes No
channel.termination.waiting.time

The time in seconds to wait for the channel to become terminated, giving up if the timeout is reached.

5 LONG Yes No
max.inbound.message.size

Sets the maximum message size allowed to be received on the channel in bytes

4194304 LONG Yes No
max.inbound.metadata.size

Sets the maximum size of metadata allowed to be received in bytes

8192 LONG Yes No
truststore.file

the file path of truststore. If this is provided then server authentication is enabled

- STRING Yes No
truststore.password

the password of truststore. If this is provided then the integrity of the keystore is checked

- STRING Yes No
truststore.algorithm

the encryption algorithm to be used for server authentication

- STRING Yes No
tls.store.type

TLS store type

- STRING Yes No
keystore.file

the file path of keystore. If this is provided then client authentication is enabled

- STRING Yes No
keystore.password

the password of keystore

- STRING Yes No
keystore.algorithm

the encryption algorithm to be used for client authentication

- STRING Yes No
enable.ssl

to enable ssl. If set to true and truststore.file is not given then it will be set to default carbon jks by default

FALSE BOOL Yes No
mutual.auth.enabled

to enable mutual authentication. If set to true and truststore.file or keystore.file is not given then it will be set to default carbon jks by default

FALSE BOOL Yes No

Examples EXAMPLE 1

@sink(type='grpc-call',
      publisher.url = 'grpc://194.23.98.100:8080/EventService/process',
      sink.id= '1', @map(type='json'))
define stream FooStream (message String);
@source(type='grpc-call-response', sink.id= '1')
define stream BarStream (message String);

Here a stream named FooStream is defined with grpc sink. A grpc server should be running at 194.23.98.100 listening to port 8080. sink.id is set to 1 here. So we can write a source with sink.id 1 so that it will listen to responses for requests published from this stream. Note that since we are using EventService/process the sink will be operating in default mode

EXAMPLE 2

@sink(type='grpc-call',
      publisher.url = 'grpc://194.23.98.100:8080/EventService/process',
      sink.id= '1', @map(type='json'))
define stream FooStream (message String);

@source(type='grpc-call-response', sink.id= '1')
define stream BarStream (message String);

Here with the same FooStream definition we have added a BarStream which has a grpc-call-response source with the same sink.id 1. So the responses for calls sent from the FooStream will be added to BarStream.

EXAMPLE 3

@sink(type='grpc-call',
      publisher.url = 'grpc://194.23.98.100:8888/org.wso2.grpc.test.MyService/process',
      sink.id= '1', @map(type='protobuf'))
define stream FooStream (stringValue string, intValue int,longValue long,booleanValue bool,floatValue float,doubleValue double);

@source(type='grpc-call-response', receiver.url = 'grpc://localhost:8888/org.wso2.grpc.MyService/process', sink.id= '1', 
@map(type='protobuf'))define stream FooStream (stringValue string, intValue int,longValue long,booleanValue bool,floatValue float,doubleValue double);

Here a stream named FooStream is defined with grpc sink. A grpc server should be running at 194.23.98.100 listening to port 8080. We have added another stream called BarStream which is a grpc-call-response source with the same sink.id 1 and as same as FooStream definition. So the responses for calls sent from the FooStream will be added to BarStream. Since there is no mapping available in the stream definition attributes names should be as same as the attributes of the protobuf message definition. (Here the only reason we provide receiver.url in the grpc-call-response source is for protobuf mapper to map Response into a siddhi event, we can give any address and any port number in the URL, but we should provide the service name and the method name correctly)

EXAMPLE 4

@sink(type='grpc-call',
      publisher.url = 'grpc://194.23.98.100:8888/org.wso2.grpc.test.MyService/process',
      sink.id= '1', @map(type='protobuf', 
@payload(stringValue='a',longValue='c',intValue='b',booleanValue='d',floatValue = 'e', doubleValue = 'f')))define stream FooStream (a string, b int,c long,d bool,e float,f double);

@source(type='grpc-call-response', receiver.url = 'grpc://localhost:8888/org.wso2.grpc.test.MyService/process', sink.id= '1', 
@map(type='protobuf',@attributes(a = 'stringValue', b = 'intValue', c = 'longValue',d = 'booleanValue', e ='floatValue', f ='doubleValue')))define stream FooStream (a string, b int,c long,d bool,e float,f double);

Here with the same FooStream definition we have added a BarStream which has a grpc-call-response source with the same sink.id 1. So the responses for calls sent from the FooStream will be added to BarStream. In this stream we provided mapping for both the sink and the source. so we can use any name for the attributes in the stream definition, but we have to map those attributes with correct protobuf attributes. As same as the grpc-sink, if we are planning to use metadata we should map the attributes.

grpc-service-response (Sink)

This extension is used to send responses back to a gRPC client after receiving requests through grpc-service source. This correlates with the particular source using a unique source.id

Syntax

@sink(type="grpc-service-response", source.id="<INT>", @map(...)))

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
source.id

A unique id to identify the correct source to which this sink is mapped. There is a 1:1 mapping between source and sink

INT No No

Examples EXAMPLE 1

@sink(type='grpc-service-response',
      source.id='1',
      @map(type='json'))
define stream BarStream (messageId String, message String);

@source(type='grpc-service',
        url='grpc://134.23.43.35:8080/org.wso2.grpc.EventService/process',
        source.id='1',
        @map(type='json',
             @attributes(messageId='trp:messageId', message='message')))
define stream FooStream (messageId String, message String);
from FooStream
select * 
insert into BarStream;

The grpc requests are received through the grpc-service sink. Each received event is sent back through grpc-service-source. This is just a passthrough through Siddhi as we are selecting everything from FooStream and inserting into BarStream.

Source

grpc (Source)

This extension starts a grpc server during initialization time. The server listens to requests from grpc stubs. This source has a default mode of operation and custom user defined grpc service mode. By default this uses EventService. Please find the proto definition here. In the default mode this source will use EventService consume method. If we want to use our custom gRPC services, we have to pack auto-generated gRPC service classes and protobuf classes into a jar file and add it into the project classpath (or to the jars folder in the siddhi-tooling folder if we use it with siddhi-tooling). Please find the custom protobuf definition that uses in examples here. This method will receive requests and injects them into stream through a mapper.

Syntax

@source(type="grpc", receiver.url="<STRING>", max.inbound.message.size="<INT>", max.inbound.metadata.size="<INT>", server.shutdown.waiting.time="<LONG>", truststore.file="<STRING>", truststore.password="<STRING>", truststore.algorithm="<STRING>", tls.store.type="<STRING>", keystore.file="<STRING>", keystore.password="<STRING>", keystore.algorithm="<STRING>", enable.ssl="<BOOL>", mutual.auth.enabled="<BOOL>", threadpool.size="<INT>", threadpool.buffer.size="<INT>", @map(...)))

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
receiver.url

The url which can be used by a client to access the grpc server in this extension. This url should consist the host hostPort, port, fully qualified service name, method name in the following format. grpc://0.0.0.0:9763/<serviceName>/<methodName>
For example:
grpc://0.0.0.0:9763/org.wso2.grpc.EventService/consume

STRING No No
max.inbound.message.size

Sets the maximum message size in bytes allowed to be received on the server.

4194304 INT Yes No
max.inbound.metadata.size

Sets the maximum size of metadata in bytes allowed to be received.

8192 INT Yes No
server.shutdown.waiting.time

The time in seconds to wait for the server to shutdown, giving up if the timeout is reached.

5 LONG Yes No
truststore.file

the file path of truststore. If this is provided then server authentication is enabled

- STRING Yes No
truststore.password

the password of truststore. If this is provided then the integrity of the keystore is checked

- STRING Yes No
truststore.algorithm

the encryption algorithm to be used for server authentication

- STRING Yes No
tls.store.type

TLS store type

- STRING Yes No
keystore.file

the file path of keystore. If this is provided then client authentication is enabled

- STRING Yes No
keystore.password

the password of keystore

- STRING Yes No
keystore.algorithm

the encryption algorithm to be used for client authentication

- STRING Yes No
enable.ssl

to enable ssl. If set to true and keystore.file is not given then it will be set to default carbon jks by default

FALSE BOOL Yes No
mutual.auth.enabled

to enable mutual authentication. If set to true and keystore.file or truststore.file is not given then it will be set to default carbon jks by default

FALSE BOOL Yes No
threadpool.size

Sets the maximum size of threadpool dedicated to serve requests at the gRPC server

100 INT Yes No
threadpool.buffer.size

Sets the maximum size of threadpool buffer server

100 INT Yes No

System Parameters

Name Description Default Value Possible Parameters
keyStoreFile

Path of the key store file

${carbon.home}/resources/security/wso2carbon.jks valid path for a key store file
keyStorePassword

This is the password used with key store file

wso2carbon valid password for the key store file
keyStoreAlgorithm

The encryption algorithm to be used for client authentication

SunX509 -
trustStoreFile

This is the trust store file with the path

${carbon.home}/resources/security/client-truststore.jks -
trustStorePassword

This is the password used with trust store file

wso2carbon valid password for the trust store file
trustStoreAlgorithm

the encryption algorithm to be used for server authentication

SunX509 -

Examples EXAMPLE 1

@source(type='grpc',
       receiver.url='grpc://localhost:8888/org.wso2.grpc.EventService/consume',
       @map(type='json'))
define stream BarStream (message String);

Here the port is given as 8888. So a grpc server will be started on port 8888 and the server will expose EventService. This is the default service packed with the source. In EventService the consume method is

EXAMPLE 2

@source(type='grpc',
       receiver.url='grpc://localhost:8888/org.wso2.grpc.EventService/consume',
       @map(type='json', @attributes(name='trp:name', age='trp:age', message='message')))
define stream BarStream (message String, name String, age int);

Here we are getting headers sent with the request as transport properties and injecting them into the stream. With each request a header will be sent in MetaData in the following format: 'Name:John', 'Age:23'

EXAMPLE 3

@source(type='grpc',
       receiver.url='grpc://localhost:8888/org.wso2.grpc.MyService/send',
       @map(type='protobuf'))
define stream BarStream (stringValue string, intValue int,longValue long,booleanValue bool,floatValue float,doubleValue double);

Here the port is given as 8888. So a grpc server will be started on port 8888 and sever will keep listening to the 'send' RPC method in the 'MyService' service.

EXAMPLE 4

@source(type='grpc',
       receiver.url='grpc://localhost:8888/org.wso2.grpc.MyService/send',
       @map(type='protobuf', 
@attributes(a = 'stringValue', b = 'intValue', c = 'longValue',d = 'booleanValue', e ='floatValue', f ='doubleValue'))) 
define stream BarStream (a string ,c long,b int, d bool,e float,f double);

Here the port is given as 8888. So a grpc server will be started on port 8888 and sever will keep listening to the 'send' method in the 'MyService' service. Since we provide mapping in the stream we can use any names for stream attributes, but we have to map those names with correct protobuf message attributes' names. If we want to send metadata, we should map the attributes.

EXAMPLE 5

@source(type='grpc',
       receiver.url='grpc://localhost:8888/org.wso2.grpc.StreamService/clientStream',
       @map(type='protobuf')) 
define stream BarStream (stringValue string, intValue int,longValue long,booleanValue bool,floatValue float,doubleValue double);

Here we receive a stream of requests to the grpc source. Whenever we want to use streaming with grpc source, we have to define the RPC method as client streaming method (look at the sample proto file provided in the resource folder[here](https://github.com/siddhi-io/siddhi-io-grpc/tree/master/component/src/main/resources)), when we define a stream method siddhi will identify it as a stream RPC method and ready to accept stream of request from the client.

grpc-call-response (Source)

This grpc source receives responses received from gRPC server for requests sent from a grpc-call sink. The source will receive responses for sink with the same sink.id. For example if you have a gRPC sink with sink.id 15 then we need to set the sink.id as 15 in the source to receives responses. Sinks and sources have 1:1 mapping

Syntax

@source(type="grpc-call-response", sink.id="<INT>", @map(...)))

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
sink.id

a unique ID that should be set for each grpc-call source. There is a 1:1 mapping between grpc-call sinks and grpc-call-response sources. Each sink has one particular source listening to the responses to requests published from that sink. So the same sink.id should be given when writing the sink also.

INT No No

Examples EXAMPLE 1

@source(type='grpc-call-response', sink.id= '1')
define stream BarStream (message String);@sink(type='grpc-call',
      publisher.url = 'grpc://194.23.98.100:8080/EventService/process',
      sink.id= '1', @map(type='json'))
define stream FooStream (message String);

Here we are listening to responses for requests sent from the sink with sink.id 1 will be received here. The results will be injected into BarStream

grpc-service (Source)

This extension implements a grpc server for receiving and responding to requests. During initialization time a grpc server is started on the user specified port exposing the required service as given in the url. This source also has a default mode and a user defined grpc service mode. By default this uses EventService. Please find the proto definition here In the default mode this will use the EventService process method. If we want to use our custom gRPC services, we have to pack auto-generated gRPC service classes and protobuf classes into a jar file and add it into the project classpath (or to the jars folder in the siddhi-tooling folder if we use it with siddhi-tooling). Please find the custom protobuf definition that uses in examples here. This accepts grpc message class Event as defined in the EventService proto. This uses GrpcServiceResponse sink to send reponses back in the same Event message format.

Syntax

@source(type="grpc-service", receiver.url="<STRING>", max.inbound.message.size="<INT>", max.inbound.metadata.size="<INT>", service.timeout="<INT>", server.shutdown.waiting.time="<LONG>", truststore.file="<STRING>", truststore.password="<STRING>", truststore.algorithm="<STRING>", tls.store.type="<STRING>", keystore.file="<STRING>", keystore.password="<STRING>", keystore.algorithm="<STRING>", enable.ssl="<BOOL>", mutual.auth.enabled="<BOOL>", threadpool.size="<INT>", threadpool.buffer.size="<INT>", @map(...)))

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
receiver.url

The url which can be used by a client to access the grpc server in this extension. This url should consist the host hostPort, port, fully qualified service name, method name in the following format. grpc://0.0.0.0:9763/<serviceName>/<methodName>
For example:
grpc://0.0.0.0:9763/org.wso2.grpc.EventService/consume

STRING No No
max.inbound.message.size

Sets the maximum message size in bytes allowed to be received on the server.

4194304 INT Yes No
max.inbound.metadata.size

Sets the maximum size of metadata in bytes allowed to be received.

8192 INT Yes No
service.timeout

The period of time in milliseconds to wait for siddhi to respond to a request received. After this time period of receiving a request it will be closed with an error message.

10000 INT Yes No
server.shutdown.waiting.time

The time in seconds to wait for the server to shutdown, giving up if the timeout is reached.

5 LONG Yes No
truststore.file

the file path of truststore. If this is provided then server authentication is enabled

- STRING Yes No
truststore.password

the password of truststore. If this is provided then the integrity of the keystore is checked

- STRING Yes No
truststore.algorithm

the encryption algorithm to be used for server authentication

- STRING Yes No
tls.store.type

TLS store type

- STRING Yes No
keystore.file

the file path of keystore. If this is provided then client authentication is enabled

- STRING Yes No
keystore.password

the password of keystore

- STRING Yes No
keystore.algorithm

the encryption algorithm to be used for client authentication

- STRING Yes No
enable.ssl

to enable ssl. If set to true and keystore.file is not given then it will be set to default carbon jks by default

FALSE BOOL Yes No
mutual.auth.enabled

to enable mutual authentication. If set to true and truststore.file or keystore.file is not given then it will be set to default carbon jks by default

FALSE BOOL Yes No
threadpool.size

Sets the maximum size of threadpool dedicated to serve requests at the gRPC server

100 INT Yes No
threadpool.buffer.size

Sets the maximum size of threadpool buffer server

100 INT Yes No

System Parameters

Name Description Default Value Possible Parameters
keyStoreFile

This is the key store file with the path

${carbon.home}/resources/security/wso2carbon.jks valid path for a key store file
keyStorePassword

This is the password used with key store file

wso2carbon valid password for the key store file
keyStoreAlgorithm

The encryption algorithm to be used for client authentication

SunX509 -
trustStoreFile

This is the trust store file with the path

${carbon.home}/resources/security/client-truststore.jks -
trustStorePassword

This is the password used with trust store file

wso2carbon valid password for the trust store file
trustStoreAlgorithm

the encryption algorithm to be used for server authentication

SunX509 -

Examples EXAMPLE 1

@source(type='grpc-service',
       receiver.url='grpc://localhost:8888/org.wso2.grpc.EventService/process',
       source.id='1',
       @map(type='json', @attributes(messageId='trp:messageId', message='message')))
define stream FooStream (messageId String, message String);

Here a grpc server will be started at port 8888. The process method of EventService will be exposed for clients. source.id is set as 1. So a grpc-service-response sink with source.id = 1 will send responses back for requests received to this source. Note that it is required to specify the transport property messageId since we need to correlate the request message with the response.

EXAMPLE 2

@sink(type='grpc-service-response',
      source.id='1',
      @map(type='json'))
define stream BarStream (messageId String, message String);

@source(type='grpc-service',
       receiver.url='grpc://134.23.43.35:8080/org.wso2.grpc.EventService/process',
       source.id='1',
       @map(type='json', @attributes(messageId='trp:messageId', message='message')))
define stream FooStream (messageId String, message String);

from FooStream
select * 
insert into BarStream;

The grpc requests are received through the grpc-service sink. Each received event is sent back through grpc-service-source. This is just a passthrough through Siddhi as we are selecting everything from FooStream and inserting into BarStream.

EXAMPLE 3

@source(type='grpc-service', source.id='1' 
       receiver.url='grpc://locanhost:8888/org.wso2.grpc.EventService/consume', 
       @map(type='json', @attributes(name='trp:name', age='trp:age', message='message'))) define stream BarStream (message String, name String, age int);

Here we are getting headers sent with the request as transport properties and injecting them into the stream. With each request a header will be sent in MetaData in the following format: 'Name:John', 'Age:23'

EXAMPLE 4

@sink(type='grpc-service-response',
      source.id='1',
      message.id='{{messageId}}',
      @map(type='protobuf',
@payload(stringValue='a',intValue='b',longValue='c',booleanValue='d',floatValue = 'e', doubleValue ='f')))
define stream BarStream (a string,messageId string, b int,c long,d bool,e float,f double);

@source(type='grpc-service',
       receiver.url='grpc://134.23.43.35:8888/org.wso2.grpc.test.MyService/process',
       source.id='1',
       @map(type='protobuf', @attributes(messageId='trp:message.id', a = 'stringValue', b = 'intValue', c = 'longValue',d = 'booleanValue', e = 'floatValue', f ='doubleValue')))
define stream FooStream (a string,messageId string, b int,c long,d bool,e float,f double);

from FooStream
select * 
insert into BarStream;

Here a grpc server will be started at port 8888. The process method of the MyService will be exposed to the clients. 'source.id' is set as 1. So a grpc-service-response sink with source.id = 1 will send responses back for requests received to this source. Note that it is required to specify the transport property messageId since we need to correlate the request message with the response and also we should map stream attributes with correct protobuf message attributes even they define using the same name as protobuf message attributes.