API Docs - v1.0.12
Tested Siddhi Core version: 5.1.21
It could also support other Siddhi Core minor versions.
Sink
grpc (Sink)
The gRPC sink publishes event data encoded into gRPC classes as defined in the user-supplied jar. This extension ships with default gRPC service classes; the default service is called EventService. This grpc sink is used for scenarios where we send a request and do not expect a response back (the server returns a google.protobuf.Empty response). Please find the default protobuf definition here. Please find the custom protobuf definition used in the examples here.
@sink(type="grpc", publisher.url="<STRING>", headers="<STRING>", idle.timeout="<LONG>", keep.alive.time="<LONG>", keep.alive.timeout="<LONG>", keep.alive.without.calls="<BOOL>", enable.retry="<BOOL>", max.retry.attempts="<INT>", retry.buffer.size="<LONG>", per.rpc.buffer.size="<LONG>", channel.termination.waiting.time="<LONG>", truststore.file="<STRING>", truststore.password="<STRING>", truststore.algorithm="<STRING>", tls.store.type="<STRING>", keystore.file="<STRING>", keystore.password="<STRING>", keystore.algorithm="<STRING>", enable.ssl="<BOOL>", mutual.auth.enabled="<BOOL>", @map(...))
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
publisher.url | The URL to which the outgoing events should be published via this extension. This URL should consist of the host, port, fully qualified service name, and method name in the following format. | | STRING | No | No |
headers | GRPC Request headers in format | - | STRING | Yes | No |
idle.timeout | Set the duration in seconds without ongoing RPCs before going to idle mode. | 1800 | LONG | Yes | No |
keep.alive.time | Sets the time in seconds without read activity before sending a keepalive ping. Keepalives can increase the load on services so must be used with caution. By default set to Long.MAX_VALUE which disables keep alive pinging. | Long.MAX_VALUE | LONG | Yes | No |
keep.alive.timeout | Sets the time in seconds waiting for read activity after sending a keepalive ping. | 20 | LONG | Yes | No |
keep.alive.without.calls | Sets whether keepalive will be performed when there are no outstanding RPC on a connection. | false | BOOL | Yes | No |
enable.retry | Enables the retry mechanism provided by the gRPC library. | false | BOOL | Yes | No |
max.retry.attempts | Sets max number of retry attempts. The total number of retry attempts for each RPC will not exceed this number even if service config may allow a higher number. | 5 | INT | Yes | No |
retry.buffer.size | Sets the retry buffer size in bytes. If the buffer limit is exceeded, no RPC could retry at the moment, and in hedging case all hedges but one of the same RPC will cancel. | 16777216 | LONG | Yes | No |
per.rpc.buffer.size | Sets the per RPC buffer limit in bytes used for retry. The RPC is not retriable if its buffer limit is exceeded. | 1048576 | LONG | Yes | No |
channel.termination.waiting.time | The time in seconds to wait for the channel to become terminated, giving up if the timeout is reached. | 5 | LONG | Yes | No |
truststore.file | the file path of truststore. If this is provided then server authentication is enabled | - | STRING | Yes | No |
truststore.password | the password of truststore. If this is provided then the integrity of the keystore is checked | - | STRING | Yes | No |
truststore.algorithm | the encryption algorithm to be used for server authentication | - | STRING | Yes | No |
tls.store.type | TLS store type | - | STRING | Yes | No |
keystore.file | the file path of keystore. If this is provided then client authentication is enabled | - | STRING | Yes | No |
keystore.password | the password of keystore | - | STRING | Yes | No |
keystore.algorithm | the encryption algorithm to be used for client authentication | - | STRING | Yes | No |
enable.ssl | to enable ssl. If set to true and truststore.file is not given then it will be set to default carbon jks by default | FALSE | BOOL | Yes | No |
mutual.auth.enabled | to enable mutual authentication. If set to true and truststore.file or keystore.file is not given then it will be set to default carbon jks by default | FALSE | BOOL | Yes | No |
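The byte-valued defaults in the table above are easier to read in binary units. The following Python sketch only does the arithmetic on the documented defaults; the ratio it prints is an illustrative upper bound on how many RPCs could sit at their per-RPC limit inside the shared retry buffer, not a description of how the gRPC library actually allocates memory.

```python
# Documented defaults, in bytes.
RETRY_BUFFER_SIZE = 16_777_216    # retry.buffer.size
PER_RPC_BUFFER_SIZE = 1_048_576   # per.rpc.buffer.size


def to_mib(size_in_bytes: int) -> float:
    """Convert a byte count to mebibytes (1 MiB = 1024 * 1024 bytes)."""
    return size_in_bytes / (1024 * 1024)


retry_mib = to_mib(RETRY_BUFFER_SIZE)
per_rpc_mib = to_mib(PER_RPC_BUFFER_SIZE)
# Illustrative bound only: RPCs at their per-RPC limit that fit in the buffer.
rpcs_at_limit = RETRY_BUFFER_SIZE // PER_RPC_BUFFER_SIZE

print(f"retry buffer: {retry_mib} MiB, per RPC: {per_rpc_mib} MiB")
print(f"RPCs at the per-RPC limit that fit in the retry buffer: {rpcs_at_limit}")
```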
Examples
EXAMPLE 1
@sink(type='grpc',
publisher.url = 'grpc://134.23.43.35:8080/org.wso2.grpc.EventService/consume',
@map(type='json'))
define stream FooStream (message String);
Here a stream named FooStream is defined with a grpc sink. A grpc server should be running at 134.23.43.35 listening to port 8080. Note that since we are using EventService/consume, the sink operates in default mode.
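To make the anatomy of the publisher.url in this example explicit, here is a small Python sketch that splits such a URL into host, port, fully qualified service name, and method name. The `GrpcTarget` type and `parse_grpc_url` function are hypothetical helpers written for illustration; they are not part of the extension.

```python
from typing import NamedTuple
from urllib.parse import urlsplit


class GrpcTarget(NamedTuple):
    host: str
    port: int
    service: str  # fully qualified service name
    method: str


def parse_grpc_url(url: str) -> GrpcTarget:
    """Split a grpc://host:port/service/method URL into its parts."""
    parts = urlsplit(url)
    if parts.scheme != "grpc":
        raise ValueError(f"expected grpc:// scheme, got {parts.scheme!r}")
    if parts.hostname is None or parts.port is None:
        raise ValueError("URL must contain both a host and a port")
    segments = [s for s in parts.path.split("/") if s]
    if len(segments) != 2:
        raise ValueError("path must look like /<service>/<method>")
    service, method = segments
    return GrpcTarget(parts.hostname, parts.port, service, method)


target = parse_grpc_url(
    "grpc://134.23.43.35:8080/org.wso2.grpc.EventService/consume"
)
print(target)
```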
EXAMPLE 2
@sink(type='grpc',
publisher.url = 'grpc://134.23.43.35:8080/org.wso2.grpc.EventService/consume',
headers='{{headers}}',
@map(type='json'),
@payload('{{message}}'))
define stream FooStream (message String, headers String);
A similar example to the one above, but with headers. The headers are also sent into the stream as data. In the sink, the dynamic headers property reads the value and sends it as metadata with the request.
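The value placed in the headers attribute has to be a single string. The Python sketch below assumes the quoted, comma-separated 'key:value' layout that the grpc source example uses ('Name:John', 'Age:23'); treat that layout, and the helper names `format_headers` and `parse_headers`, as assumptions for illustration. Values containing commas or quotes are not handled.

```python
from typing import Dict


def format_headers(headers: Dict[str, str]) -> str:
    """Build a header string such as 'Name:John','Age:23' (assumed layout)."""
    return ",".join(f"'{key}:{value}'" for key, value in headers.items())


def parse_headers(text: str) -> Dict[str, str]:
    """Parse the assumed 'key:value','key:value' layout back into a dict."""
    result: Dict[str, str] = {}
    for item in text.split(","):
        item = item.strip().strip("'")
        if not item:
            continue
        key, sep, value = item.partition(":")
        if not sep:
            raise ValueError(f"missing ':' in header item {item!r}")
        result[key.strip()] = value.strip()
    return result


header_string = format_headers({"Name": "John", "Age": "23"})
print(header_string)
print(parse_headers(header_string))
```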
EXAMPLE 3
@sink(type='grpc',
publisher.url = 'grpc://134.23.43.35:8080/org.wso2.grpc.MyService/send',
@map(type='protobuf'))
define stream FooStream (stringValue string, intValue int,longValue long,booleanValue bool,floatValue float,doubleValue double);
Here a stream named FooStream is defined with a grpc sink. A grpc server should be running at 134.23.43.35 listening to port 8080. Since no attribute mapping is provided in the mapper, the attributes of the stream definition should be the same as the attributes of the protobuf message definition.
EXAMPLE 4
@sink(type='grpc',
publisher.url = 'grpc://134.23.43.35:8080/org.wso2.grpc.MyService/testMap',
@map(type='protobuf'))
define stream FooStream (stringValue string, intValue int,map object);
Here a stream named FooStream is defined with grpc sink. A grpc server should be running at 134.23.43.35 listening to port 8080. The 'map object' in the stream definition defines that this stream is going to use Map object with grpc service. We can use any map object that extends 'java.util.AbstractMap' class.
EXAMPLE 5
@sink(type='grpc',
publisher.url = 'grpc://134.23.43.35:8080/org.wso2.grpc.MyService/testMap',
@map(type='protobuf',
@payload(stringValue='a',longValue='b',intValue='c',booleanValue='d',floatValue = 'e', doubleValue = 'f')))
define stream FooStream (a string, b long, c int,d bool,e float,f double);
Here a stream named FooStream is defined with a grpc sink. A grpc server should be running at 134.23.43.35 listening to port 8080. Since @payload is provided in this stream, we can use any names for the attributes in the stream definition, but those names must be mapped correctly to the protobuf message attributes. If we are planning to send metadata within a stream, we should use @payload so that the metadata attribute and the protobuf attributes are identified separately.
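The direction of the @payload mapping (protobuf field on the left, stream attribute on the right) is easy to get backwards. The following Python sketch models it with plain dictionaries; `apply_payload` is a hypothetical helper and the event values are made up for illustration, so this is a conceptual model rather than the mapper's actual implementation.

```python
from typing import Any, Dict


def apply_payload(payload: Dict[str, str], event: Dict[str, Any]) -> Dict[str, Any]:
    """Build protobuf-style fields from an event.

    payload maps protobuf field name -> stream attribute name,
    mirroring @payload(stringValue='a', longValue='b', ...).
    """
    missing = [attr for attr in payload.values() if attr not in event]
    if missing:
        raise KeyError(f"event is missing attributes: {missing}")
    return {field: event[attr] for field, attr in payload.items()}


# Same mapping as in the example above.
payload = {
    "stringValue": "a", "longValue": "b", "intValue": "c",
    "booleanValue": "d", "floatValue": "e", "doubleValue": "f",
}
# Hypothetical event on FooStream (a string, b long, c int, d bool, e float, f double).
event = {"a": "hello", "b": 10_000_000_000, "c": 7, "d": True, "e": 1.5, "f": 2.25}

message = apply_payload(payload, event)
print(message)
```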
EXAMPLE 6
@sink(type='grpc',
publisher.url = 'grpc://194.23.98.100:8888/org.wso2.grpc.test.StreamService/clientStream',
@map(type='protobuf'))
define stream FooStream (stringValue string, intValue int,longValue long,booleanValue bool,floatValue float,doubleValue double);
Here the grpc sink sends a stream of requests to the server that runs on 194.23.98.100 at port 8888. To send a stream of requests from the grpc sink, we have to define a client-streaming RPC method. Siddhi then identifies whether it is a unary method or a stream method and sends requests according to the method type.
grpc-call (Sink)
The grpc-call sink publishes event data encoded into gRPC classes as defined in the user-supplied jar. This extension ships with a default gRPC service classes jar; the default service is called EventService. This grpc-call sink is used for scenarios where we send a request out and expect a response back. In default mode it uses the EventService process method. A grpc-call-response source is used to receive the responses, and a unique sink.id correlates the sink with its corresponding source. Please find the default protobuf definition here. Please find the custom protobuf definition used in the examples here.
@sink(type="grpc-call", publisher.url="<STRING>", sink.id="<INT>", headers="<STRING>", idle.timeout="<LONG>", keep.alive.time="<LONG>", keep.alive.timeout="<LONG>", keep.alive.without.calls="<BOOL>", enable.retry="<BOOL>", max.retry.attempts="<INT>", retry.buffer.size="<LONG>", per.rpc.buffer.size="<LONG>", channel.termination.waiting.time="<LONG>", max.inbound.message.size="<LONG>", max.inbound.metadata.size="<LONG>", truststore.file="<STRING>", truststore.password="<STRING>", truststore.algorithm="<STRING>", tls.store.type="<STRING>", keystore.file="<STRING>", keystore.password="<STRING>", keystore.algorithm="<STRING>", enable.ssl="<BOOL>", mutual.auth.enabled="<BOOL>", @map(...))
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
publisher.url | The URL to which the outgoing events should be published via this extension. This URL should consist of the host, port, fully qualified service name, and method name in the following format. | | STRING | No | No |
sink.id | a unique ID that should be set for each grpc-call-sink. There is a 1:1 mapping between grpc-call sinks and grpc-call-response sources. Each sink has one particular source listening to the responses to requests published from that sink. So the same sink.id should be given when writing the source also. | | INT | No | No |
headers | GRPC Request headers in format | - | STRING | Yes | No |
idle.timeout | Set the duration in seconds without ongoing RPCs before going to idle mode. | 1800 | LONG | Yes | No |
keep.alive.time | Sets the time in seconds without read activity before sending a keepalive ping. Keepalives can increase the load on services so must be used with caution. By default set to Long.MAX_VALUE which disables keep alive pinging. | Long.MAX_VALUE | LONG | Yes | No |
keep.alive.timeout | Sets the time in seconds waiting for read activity after sending a keepalive ping. | 20 | LONG | Yes | No |
keep.alive.without.calls | Sets whether keepalive will be performed when there are no outstanding RPC on a connection. | false | BOOL | Yes | No |
enable.retry | Enables the retry and hedging mechanism provided by the gRPC library. | false | BOOL | Yes | No |
max.retry.attempts | Sets max number of retry attempts. The total number of retry attempts for each RPC will not exceed this number even if service config may allow a higher number. | 5 | INT | Yes | No |
retry.buffer.size | Sets the retry buffer size in bytes. If the buffer limit is exceeded, no RPC could retry at the moment, and in hedging case all hedges but one of the same RPC will cancel. | 16777216 | LONG | Yes | No |
per.rpc.buffer.size | Sets the per RPC buffer limit in bytes used for retry. The RPC is not retriable if its buffer limit is exceeded. | 1048576 | LONG | Yes | No |
channel.termination.waiting.time | The time in seconds to wait for the channel to become terminated, giving up if the timeout is reached. | 5 | LONG | Yes | No |
max.inbound.message.size | Sets the maximum message size allowed to be received on the channel in bytes | 4194304 | LONG | Yes | No |
max.inbound.metadata.size | Sets the maximum size of metadata allowed to be received in bytes | 8192 | LONG | Yes | No |
truststore.file | the file path of truststore. If this is provided then server authentication is enabled | - | STRING | Yes | No |
truststore.password | the password of truststore. If this is provided then the integrity of the keystore is checked | - | STRING | Yes | No |
truststore.algorithm | the encryption algorithm to be used for server authentication | - | STRING | Yes | No |
tls.store.type | TLS store type | - | STRING | Yes | No |
keystore.file | the file path of keystore. If this is provided then client authentication is enabled | - | STRING | Yes | No |
keystore.password | the password of keystore | - | STRING | Yes | No |
keystore.algorithm | the encryption algorithm to be used for client authentication | - | STRING | Yes | No |
enable.ssl | to enable ssl. If set to true and truststore.file is not given then it will be set to default carbon jks by default | FALSE | BOOL | Yes | No |
mutual.auth.enabled | to enable mutual authentication. If set to true and truststore.file or keystore.file is not given then it will be set to default carbon jks by default | FALSE | BOOL | Yes | No |
Examples
EXAMPLE 1
@sink(type='grpc-call',
publisher.url = 'grpc://194.23.98.100:8080/EventService/process',
sink.id= '1', @map(type='json'))
define stream FooStream (message String);
@source(type='grpc-call-response', sink.id= '1')
define stream BarStream (message String);
Here a stream named FooStream is defined with a grpc-call sink. A grpc server should be running at 194.23.98.100 listening to port 8080. sink.id is set to 1 here, so we can write a source with sink.id 1 that listens to responses for requests published from this stream. Note that since we are using EventService/process, the sink operates in default mode.
EXAMPLE 2
@sink(type='grpc-call',
publisher.url = 'grpc://194.23.98.100:8080/EventService/process',
sink.id= '1', @map(type='json'))
define stream FooStream (message String);
@source(type='grpc-call-response', sink.id= '1')
define stream BarStream (message String);
Here with the same FooStream definition we have added a BarStream which has a grpc-call-response source with the same sink.id 1. So the responses for calls sent from the FooStream will be added to BarStream.
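Because each grpc-call sink must be paired with exactly one grpc-call-response source through sink.id, it can help to check the pairing before deploying an app. The Python sketch below is a hypothetical standalone check over lists of ids collected by hand; `check_pairing` is not part of Siddhi or this extension.

```python
from collections import Counter
from typing import Iterable, List


def check_pairing(sink_ids: Iterable[int], source_ids: Iterable[int]) -> List[str]:
    """Report every sink.id that does not have exactly one sink and one source."""
    sink_counts = Counter(sink_ids)
    source_counts = Counter(source_ids)
    problems = []
    for sink_id in sorted(set(sink_counts) | set(source_counts)):
        sinks, sources = sink_counts[sink_id], source_counts[sink_id]
        if sinks != 1 or sources != 1:
            problems.append(
                f"sink.id {sink_id}: {sinks} sink(s), {sources} source(s)"
            )
    return problems


# FooStream (sink.id 1) paired with BarStream (sink.id 1): no problems.
print(check_pairing([1], [1]))
# A second sink with sink.id 2 and no matching source is reported.
print(check_pairing([1, 2], [1]))
```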
EXAMPLE 3
@sink(type='grpc-call',
publisher.url = 'grpc://194.23.98.100:8888/org.wso2.grpc.test.MyService/process',
sink.id= '1', @map(type='protobuf'))
define stream FooStream (stringValue string, intValue int,longValue long,booleanValue bool,floatValue float,doubleValue double);
@source(type='grpc-call-response', receiver.url = 'grpc://localhost:8888/org.wso2.grpc.MyService/process', sink.id= '1',
@map(type='protobuf'))
define stream BarStream (stringValue string, intValue int,longValue long,booleanValue bool,floatValue float,doubleValue double);
Here a stream named FooStream is defined with a grpc-call sink. A grpc server should be running at 194.23.98.100 listening to port 8888. We have added another stream called BarStream, which has a grpc-call-response source with the same sink.id 1 and the same definition as FooStream, so the responses for calls sent from FooStream are added to BarStream. Since no attribute mapping is given in the stream definitions, the attribute names should be the same as the attributes of the protobuf message definition. (The only reason we provide receiver.url in the grpc-call-response source is for the protobuf mapper to map the response into a Siddhi event. We can give any address and any port number in the URL, but we should provide the service name and the method name correctly.)
EXAMPLE 4
@sink(type='grpc-call',
publisher.url = 'grpc://194.23.98.100:8888/org.wso2.grpc.test.MyService/process',
sink.id= '1', @map(type='protobuf',
@payload(stringValue='a',longValue='c',intValue='b',booleanValue='d',floatValue = 'e', doubleValue = 'f')))
define stream FooStream (a string, b int,c long,d bool,e float,f double);
@source(type='grpc-call-response', receiver.url = 'grpc://localhost:8888/org.wso2.grpc.test.MyService/process', sink.id= '1',
@map(type='protobuf',@attributes(a = 'stringValue', b = 'intValue', c = 'longValue',d = 'booleanValue', e ='floatValue', f ='doubleValue')))
define stream BarStream (a string, b int,c long,d bool,e float,f double);
Here, with the same FooStream definition, we have added a BarStream which has a grpc-call-response source with the same sink.id 1, so the responses for calls sent from FooStream are added to BarStream. In this example we provide mappings for both the sink and the source, so we can use any names for the attributes in the stream definitions, but we have to map those attributes to the correct protobuf attributes. As with the grpc sink, if we are planning to use metadata we should map the attributes.
grpc-service-response (Sink)
This extension is used to send responses back to a gRPC client after receiving requests through a grpc-service source. It correlates with the particular source using a unique source.id.
Syntax
@sink(type="grpc-service-response", source.id="<INT>", @map(...))
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
source.id | A unique id to identify the correct source to which this sink is mapped. There is a 1:1 mapping between source and sink | | INT | No | No |
Examples
EXAMPLE 1
@sink(type='grpc-service-response',
source.id='1',
@map(type='json'))
define stream BarStream (messageId String, message String);
@source(type='grpc-service',
receiver.url='grpc://134.23.43.35:8080/org.wso2.grpc.EventService/process',
source.id='1',
@map(type='json',
@attributes(messageId='trp:messageId', message='message')))
define stream FooStream (messageId String, message String);
from FooStream
select *
insert into BarStream;
The grpc requests are received through the grpc-service source. Each received event is sent back through the grpc-service-response sink. This is just a passthrough through Siddhi, as we are selecting everything from FooStream and inserting it into BarStream.
Source
grpc (Source)
This extension starts a grpc server during initialization. The server listens to requests from grpc stubs. This source has a default mode of operation and a custom user-defined grpc service mode. By default this uses EventService. Please find the proto definition here. In the default mode this source will use the EventService consume method. Please find the custom protobuf definition used in the examples here. This method receives requests and injects them into the stream through a mapper.
@source(type="grpc", receiver.url="<STRING>", max.inbound.message.size="<INT>", max.inbound.metadata.size="<INT>", server.shutdown.waiting.time="<LONG>", truststore.file="<STRING>", truststore.password="<STRING>", truststore.algorithm="<STRING>", tls.store.type="<STRING>", keystore.file="<STRING>", keystore.password="<STRING>", keystore.algorithm="<STRING>", enable.ssl="<BOOL>", mutual.auth.enabled="<BOOL>", threadpool.size="<INT>", threadpool.buffer.size="<INT>", @map(...))
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
receiver.url | The URL which can be used by a client to access the grpc server in this extension. This URL should consist of the host, port, fully qualified service name, and method name in the following format. | | STRING | No | No |
max.inbound.message.size | Sets the maximum message size in bytes allowed to be received on the server. | 4194304 | INT | Yes | No |
max.inbound.metadata.size | Sets the maximum size of metadata in bytes allowed to be received. | 8192 | INT | Yes | No |
server.shutdown.waiting.time | The time in seconds to wait for the server to shutdown, giving up if the timeout is reached. | 5 | LONG | Yes | No |
truststore.file | the file path of truststore. If this is provided then server authentication is enabled | - | STRING | Yes | No |
truststore.password | the password of truststore. If this is provided then the integrity of the keystore is checked | - | STRING | Yes | No |
truststore.algorithm | the encryption algorithm to be used for server authentication | - | STRING | Yes | No |
tls.store.type | TLS store type | - | STRING | Yes | No |
keystore.file | the file path of keystore. If this is provided then client authentication is enabled | - | STRING | Yes | No |
keystore.password | the password of keystore | - | STRING | Yes | No |
keystore.algorithm | the encryption algorithm to be used for client authentication | - | STRING | Yes | No |
enable.ssl | to enable ssl. If set to true and keystore.file is not given then it will be set to default carbon jks by default | FALSE | BOOL | Yes | No |
mutual.auth.enabled | to enable mutual authentication. If set to true and keystore.file or truststore.file is not given then it will be set to default carbon jks by default | FALSE | BOOL | Yes | No |
threadpool.size | Sets the maximum size of threadpool dedicated to serve requests at the gRPC server | 100 | INT | Yes | No |
threadpool.buffer.size | Sets the maximum size of threadpool buffer server | 100 | INT | Yes | No |
System Parameters
Name | Description | Default Value | Possible Parameters |
---|---|---|---|
keyStoreFile | Path of the key store file | ${carbon.home}/resources/security/wso2carbon.jks | valid path for a key store file |
keyStorePassword | This is the password used with key store file | wso2carbon | valid password for the key store file |
keyStoreAlgorithm | The encryption algorithm to be used for client authentication | SunX509 | - |
trustStoreFile | This is the trust store file with the path | ${carbon.home}/resources/security/client-truststore.jks | - |
trustStorePassword | This is the password used with trust store file | wso2carbon | valid password for the trust store file |
trustStoreAlgorithm | the encryption algorithm to be used for server authentication | SunX509 | - |
Examples
EXAMPLE 1
@source(type='grpc',
receiver.url='grpc://localhost:8888/org.wso2.grpc.EventService/consume',
@map(type='json'))
define stream BarStream (message String);
Here the port is given as 8888. So a grpc server will be started on port 8888 and the server will expose EventService. This is the default service packed with the source. In EventService the consume method is
EXAMPLE 2
@source(type='grpc',
receiver.url='grpc://localhost:8888/org.wso2.grpc.EventService/consume',
@map(type='json', @attributes(name='trp:name', age='trp:age', message='message')))
define stream BarStream (message String, name String, age int);
Here we are getting headers sent with the request as transport properties and injecting them into the stream. With each request a header will be sent in MetaData in the following format: 'Name:John', 'Age:23'
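The way @attributes combines transport properties (the trp: prefix) with payload fields can be modeled with two dictionaries. The Python sketch below is a conceptual illustration only: `apply_attributes` is a hypothetical helper, the header keys are lowercased on the assumption that gRPC metadata keys are carried in lowercase, and type conversion (for example the string '23' to the int attribute age) is deliberately left out.

```python
from typing import Any, Dict

TRP_PREFIX = "trp:"


def apply_attributes(
    attributes: Dict[str, str],
    payload: Dict[str, Any],
    transport: Dict[str, str],
) -> Dict[str, Any]:
    """Build a stream event from payload fields and transport properties.

    attributes maps stream attribute -> source key, where a key starting
    with 'trp:' is looked up in the transport properties (request headers).
    """
    event = {}
    for attribute, key in attributes.items():
        if key.startswith(TRP_PREFIX):
            event[attribute] = transport[key[len(TRP_PREFIX):]]
        else:
            event[attribute] = payload[key]
    return event


# Mapping from the example above.
attributes = {"name": "trp:name", "age": "trp:age", "message": "message"}
# Headers 'Name:John', 'Age:23', lowercased (assumption, see lead-in).
transport = {k.lower(): v for k, v in {"Name": "John", "Age": "23"}.items()}
payload = {"message": "hello"}  # hypothetical request body

event = apply_attributes(attributes, payload, transport)
print(event)
```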
EXAMPLE 3
@source(type='grpc',
receiver.url='grpc://localhost:8888/org.wso2.grpc.MyService/send',
@map(type='protobuf'))
define stream BarStream (stringValue string, intValue int,longValue long,booleanValue bool,floatValue float,doubleValue double);
Here the port is given as 8888, so a grpc server will be started on port 8888 and the server will keep listening to the 'send' RPC method in the 'MyService' service.
EXAMPLE 4
@source(type='grpc',
receiver.url='grpc://localhost:8888/org.wso2.grpc.MyService/send',
@map(type='protobuf',
@attributes(a = 'stringValue', b = 'intValue', c = 'longValue',d = 'booleanValue', e ='floatValue', f ='doubleValue')))
define stream BarStream (a string ,c long,b int, d bool,e float,f double);
Here the port is given as 8888, so a grpc server will be started on port 8888 and the server will keep listening to the 'send' method in the 'MyService' service. Since we provide a mapping in the stream, we can use any names for the stream attributes, but we have to map those names to the correct protobuf message attribute names. If we want to send metadata, we should map the attributes.
EXAMPLE 5
@source(type='grpc',
receiver.url='grpc://localhost:8888/org.wso2.grpc.StreamService/clientStream',
@map(type='protobuf'))
define stream BarStream (stringValue string, intValue int,longValue long,booleanValue bool,floatValue float,doubleValue double);
Here the grpc source receives a stream of requests. Whenever we want to use streaming with the grpc source, we have to define the RPC method as a client streaming method (see the sample proto file provided in the resource folder [here](https://github.com/siddhi-io/siddhi-io-grpc/tree/master/component/src/main/resources)). When a stream method is defined, Siddhi identifies it as a stream RPC method and is ready to accept a stream of requests from the client.
grpc-call-response (Source)
This grpc source receives responses from the gRPC server for requests sent from a grpc-call sink. The source receives responses for the sink with the same sink.id. For example, if you have a grpc-call sink with sink.id 15, then the sink.id must be set to 15 in the source to receive its responses. Sinks and sources have a 1:1 mapping.
Syntax
@source(type="grpc-call-response", sink.id="<INT>", @map(...))
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
sink.id | a unique ID that should be set for each grpc-call source. There is a 1:1 mapping between grpc-call sinks and grpc-call-response sources. Each sink has one particular source listening to the responses to requests published from that sink. So the same sink.id should be given when writing the sink also. | | INT | No | No |
Examples
EXAMPLE 1
@source(type='grpc-call-response', sink.id= '1')
define stream BarStream (message String);
@sink(type='grpc-call',
publisher.url = 'grpc://194.23.98.100:8080/EventService/process',
sink.id= '1', @map(type='json'))
define stream FooStream (message String);
Here the source listens to responses for requests sent from the sink with sink.id 1. The results are injected into BarStream.
grpc-service (Source)
This extension implements a grpc server for receiving and responding to requests. During initialization a grpc server is started on the user-specified port, exposing the required service as given in the URL. This source also has a default mode and a user-defined grpc service mode. By default this uses EventService. Please find the proto definition here. In the default mode this will use the EventService process method. Please find the custom protobuf definition used in the examples here. This accepts the grpc message class Event as defined in the EventService proto. This uses the grpc-service-response sink to send responses back in the same Event message format.
@source(type="grpc-service", receiver.url="<STRING>", max.inbound.message.size="<INT>", max.inbound.metadata.size="<INT>", service.timeout="<INT>", server.shutdown.waiting.time="<LONG>", truststore.file="<STRING>", truststore.password="<STRING>", truststore.algorithm="<STRING>", tls.store.type="<STRING>", keystore.file="<STRING>", keystore.password="<STRING>", keystore.algorithm="<STRING>", enable.ssl="<BOOL>", mutual.auth.enabled="<BOOL>", threadpool.size="<INT>", threadpool.buffer.size="<INT>", @map(...))
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
receiver.url | The URL which can be used by a client to access the grpc server in this extension. This URL should consist of the host, port, fully qualified service name, and method name in the following format. | | STRING | No | No |
max.inbound.message.size | Sets the maximum message size in bytes allowed to be received on the server. | 4194304 | INT | Yes | No |
max.inbound.metadata.size | Sets the maximum size of metadata in bytes allowed to be received. | 8192 | INT | Yes | No |
service.timeout | The period of time in milliseconds to wait for siddhi to respond to a request received. After this time period of receiving a request it will be closed with an error message. | 10000 | INT | Yes | No |
server.shutdown.waiting.time | The time in seconds to wait for the server to shutdown, giving up if the timeout is reached. | 5 | LONG | Yes | No |
truststore.file | the file path of truststore. If this is provided then server authentication is enabled | - | STRING | Yes | No |
truststore.password | the password of truststore. If this is provided then the integrity of the keystore is checked | - | STRING | Yes | No |
truststore.algorithm | the encryption algorithm to be used for server authentication | - | STRING | Yes | No |
tls.store.type | TLS store type | - | STRING | Yes | No |
keystore.file | the file path of keystore. If this is provided then client authentication is enabled | - | STRING | Yes | No |
keystore.password | the password of keystore | - | STRING | Yes | No |
keystore.algorithm | the encryption algorithm to be used for client authentication | - | STRING | Yes | No |
enable.ssl | to enable ssl. If set to true and keystore.file is not given then it will be set to default carbon jks by default | FALSE | BOOL | Yes | No |
mutual.auth.enabled | to enable mutual authentication. If set to true and truststore.file or keystore.file is not given then it will be set to default carbon jks by default | FALSE | BOOL | Yes | No |
threadpool.size | Sets the maximum size of threadpool dedicated to serve requests at the gRPC server | 100 | INT | Yes | No |
threadpool.buffer.size | Sets the maximum size of threadpool buffer server | 100 | INT | Yes | No |
System Parameters
Name | Description | Default Value | Possible Parameters |
---|---|---|---|
keyStoreFile | This is the key store file with the path | ${carbon.home}/resources/security/wso2carbon.jks | valid path for a key store file |
keyStorePassword | This is the password used with key store file | wso2carbon | valid password for the key store file |
keyStoreAlgorithm | The encryption algorithm to be used for client authentication | SunX509 | - |
trustStoreFile | This is the trust store file with the path | ${carbon.home}/resources/security/client-truststore.jks | - |
trustStorePassword | This is the password used with trust store file | wso2carbon | valid password for the trust store file |
trustStoreAlgorithm | the encryption algorithm to be used for server authentication | SunX509 | - |
Examples
EXAMPLE 1
@source(type='grpc-service',
receiver.url='grpc://localhost:8888/org.wso2.grpc.EventService/process',
source.id='1',
@map(type='json', @attributes(messageId='trp:messageId', message='message')))
define stream FooStream (messageId String, message String);
Here a grpc server will be started at port 8888. The process method of EventService will be exposed for clients. source.id is set as 1. So a grpc-service-response sink with source.id = 1 will send responses back for requests received to this source. Note that it is required to specify the transport property messageId since we need to correlate the request message with the response.
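The role of messageId, together with the service.timeout parameter (10000 ms by default), can be pictured as a table of pending requests keyed by message id. The Python sketch below is a conceptual model under that assumption, not the extension's implementation; `PendingRequests` and its methods are hypothetical names, and a fake clock is used so the behaviour is deterministic.

```python
import time
from typing import Callable, Dict


class PendingRequests:
    """Track requests awaiting a response, keyed by messageId."""

    def __init__(self, timeout_ms: int = 10_000,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._timeout_s = timeout_ms / 1000
        self._clock = clock
        self._deadlines: Dict[str, float] = {}

    def register(self, message_id: str) -> None:
        """Record an incoming request and the deadline for its response."""
        self._deadlines[message_id] = self._clock() + self._timeout_s

    def respond(self, message_id: str) -> bool:
        """Return True if the response matches a request that is still open."""
        deadline = self._deadlines.pop(message_id, None)
        return deadline is not None and self._clock() <= deadline


now = [0.0]  # fake clock, in seconds
pending = PendingRequests(timeout_ms=10_000, clock=lambda: now[0])

pending.register("m1")
now[0] = 5.0
in_time = pending.respond("m1")    # answered after 5 s, within the timeout

pending.register("m2")
now[0] = 20.0
too_late = pending.respond("m2")   # answered 15 s after registration

print(in_time, too_late)
```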
EXAMPLE 2
@sink(type='grpc-service-response',
source.id='1',
message.id='{{messageId}}',
@map(type='json'))
define stream BarStream (messageId String, message String);
@source(type='grpc-service',
receiver.url='grpc://134.23.43.35:8080/org.wso2.grpc.EventService/process',
source.id='1',
@map(type='json', @attributes(messageId='trp:messageId', message='message')))
define stream FooStream (messageId String, message String);
from FooStream
select *
insert into BarStream;
The grpc requests are received through the grpc-service source. Each received event is sent back through the grpc-service-response sink. This is just a passthrough through Siddhi, as we are selecting everything from FooStream and inserting it into BarStream.
EXAMPLE 3
@source(type='grpc-service', source.id='1',
receiver.url='grpc://localhost:8888/org.wso2.grpc.EventService/consume',
@map(type='json', @attributes(name='trp:name', age='trp:age', message='message')))
define stream BarStream (message String, name String, age int);
Here we are getting headers sent with the request as transport properties and injecting them into the stream. With each request a header will be sent in MetaData in the following format: 'Name:John', 'Age:23'
EXAMPLE 4
@sink(type='grpc-service-response',
source.id='1',
message.id='{{messageId}}',
@map(type='protobuf',
@payload(stringValue='a',intValue='b',longValue='c',booleanValue='d',floatValue = 'e', doubleValue ='f')))
define stream BarStream (a string,messageId string, b int,c long,d bool,e float,f double);
@source(type='grpc-service',
receiver.url='grpc://134.23.43.35:8888/org.wso2.grpc.test.MyService/process',
source.id='1',
@map(type='protobuf', @attributes(messageId='trp:message.id', a = 'stringValue', b = 'intValue', c = 'longValue',d = 'booleanValue', e = 'floatValue', f ='doubleValue')))
define stream FooStream (a string,messageId string, b int,c long,d bool,e float,f double);
from FooStream
select *
insert into BarStream;
Here a grpc server will be started at port 8888. The process method of MyService will be exposed to clients. source.id is set to 1, so a grpc-service-response sink with source.id = 1 will send responses back for requests received by this source. Note that the transport property messageId must be specified, since the request message has to be correlated with the response. The stream attributes must also be mapped to the correct protobuf message attributes, even if they are defined with the same names as the protobuf message attributes.