API Docs - v2.3.0
Tested Siddhi Core version: 5.1.13
It could also support other Siddhi Core minor versions.
Sink
http (Sink)
HTTP sink publishes messages via HTTP or HTTPS protocols using methods such as POST, GET, PUT, and DELETE on formats text
, XML
and JSON
. It can also publish to endpoints protected by basic authentication or OAuth 2.0.
@sink(type="http", publisher.url="<STRING>", basic.auth.username="<STRING>", basic.auth.password="<STRING>", https.truststore.file="<STRING>", https.truststore.password="<STRING>", oauth.username="<STRING>", oauth.password="<STRING>", consumer.key="<STRING>", consumer.secret="<STRING>", token.url="<STRING>", refresh.token="<STRING>", headers="<STRING>", method="<STRING>", socket.idle.timeout="<INT>", chunk.disabled="<BOOL>", ssl.protocol="<STRING>", ssl.verification.disabled="<BOOL>", tls.store.type="<STRING>", ssl.configurations="<STRING>", proxy.host="<STRING>", proxy.port="<STRING>", proxy.username="<STRING>", proxy.password="<STRING>", client.bootstrap.configurations="<STRING>", max.pool.active.connections="<INT>", min.pool.idle.connections="<INT>", max.pool.idle.connections="<INT>", min.evictable.idle.time="<STRING>", time.between.eviction.runs="<STRING>", max.wait.time="<STRING>", test.on.borrow="<BOOL>", test.while.idle="<BOOL>", exhausted.action="<INT>", hostname.verification.enabled="<BOOL>", @map(...)))
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
publisher.url | The URL to which the outgoing events should be published. |
STRING | No | No | |
basic.auth.username | The username to be included in the authentication header when calling endpoints protected by basic authentication. |
- | STRING | Yes | No |
basic.auth.password | The password to be included in the authentication header when calling endpoints protected by basic authentication. |
- | STRING | Yes | No |
https.truststore.file | The file path of the client truststore when sending messages through |
${carbon.home}/resources/security/client-truststore.jks |
STRING | Yes | No |
https.truststore.password | The password for the client-truststore. |
wso2carbon | STRING | Yes | No |
oauth.username | The username to be included in the authentication header when calling endpoints protected by OAuth 2.0. |
- | STRING | Yes | No |
oauth.password | The password to be included in the authentication header when calling endpoints protected by OAuth 2.0. |
- | STRING | Yes | No |
consumer.key | Consumer key used for calling endpoints protected by OAuth 2.0 |
- | STRING | Yes | No |
consumer.secret | Consumer secret used for calling endpoints protected by OAuth 2.0 |
- | STRING | Yes | No |
token.url | Token URL to generate a new access tokens when calling endpoints protected by OAuth 2.0 |
- | STRING | Yes | No |
refresh.token | Refresh token used for generating new access tokens when calling endpoints protected by OAuth 2.0 |
- | STRING | Yes | No |
headers | HTTP request headers in format |
Content-Type and Content-Length headers | STRING | Yes | No |
method | The HTTP method used for calling the endpoint. |
POST | STRING | Yes | No |
socket.idle.timeout | Socket timeout in millis. |
6000 | INT | Yes | No |
chunk.disabled | Disable chunked transfer encoding. |
false | BOOL | Yes | No |
ssl.protocol | SSL/TLS protocol. |
TLS | STRING | Yes | No |
ssl.verification.disabled | Disable SSL verification. |
false | BOOL | Yes | No |
tls.store.type | TLS store type. |
JKS | STRING | Yes | No |
ssl.configurations | SSL/TSL configurations in format |
- | STRING | Yes | No |
proxy.host | Proxy server host |
- | STRING | Yes | No |
proxy.port | Proxy server port |
- | STRING | Yes | No |
proxy.username | Proxy server username |
- | STRING | Yes | No |
proxy.password | Proxy server password |
- | STRING | Yes | No |
client.bootstrap.configurations | Client bootstrap configurations in format |
- | STRING | Yes | No |
max.pool.active.connections | Maximum possible number of active connection per client pool. |
-1 | INT | Yes | No |
min.pool.idle.connections | Minimum number of idle connections that can exist per client pool. |
0 | INT | Yes | No |
max.pool.idle.connections | Maximum number of idle connections that can exist per client pool. |
100 | INT | Yes | No |
min.evictable.idle.time | Minimum time (in millis) a connection may sit idle in the client pool before it become eligible for eviction. |
300000 | STRING | Yes | No |
time.between.eviction.runs | Time between two eviction operations (in millis) on the client pool. |
30000 | STRING | Yes | No |
max.wait.time | The maximum time (in millis) the pool will wait (when there are no available connections) for a connection to be returned to the pool. |
60000 | STRING | Yes | No |
test.on.borrow | Enable connections to be validated before being borrowed from the client pool. |
true | BOOL | Yes | No |
test.while.idle | Enable connections to be validated during the eviction operation (if any). |
true | BOOL | Yes | No |
exhausted.action | Action that should be taken when the maximum number of active connections are being used. This action should be indicated as an int and possible action values are following. |
1 (Block when exhausted) | INT | Yes | No |
hostname.verification.enabled | Enable hostname verification. |
true | BOOL | Yes | No |
System Parameters
Name | Description | Default Value | Possible Parameters |
---|---|---|---|
clientBootstrapClientGroupSize | Number of client threads to perform non-blocking read and write to one or more channels. |
(Number of available processors) * 2 | Any positive integer |
clientBootstrapBossGroupSize | Number of boss threads to accept incoming connections. |
Number of available processors | Any positive integer |
clientBootstrapWorkerGroupSize | Number of worker threads to accept the connections from boss threads and perform non-blocking read and write from one or more channels. |
(Number of available processors) * 2 | Any positive integer |
trustStoreLocation | The default truststore file path. |
${carbon.home}/resources/security/client-truststore.jks |
Path to client truststore .jks file |
trustStorePassword | The default truststore password. |
wso2carbon | Truststore password as string |
Examples EXAMPLE 1
@sink(type = 'http', publisher.url = 'http://stocks.com/stocks',
@map(type = 'json'))
define stream StockStream (symbol string, price float, volume long);
Events arriving on the StockStream will be published to the HTTP endpoint http://stocks.com/stocks
using POST
method with Content-Type application/json
by converting those events to the default JSON format as following:
{ "event": { "symbol": "FB", "price": 24.5, "volume": 5000 } }
EXAMPLE 2
@sink(type='http', publisher.url = 'http://localhost:8009/foo',
client.bootstrap.configurations = "'client.bootstrap.socket.timeout:20'",
max.pool.active.connections = '1', headers = "{{headers}}",
@map(type='xml', @payload("""<stock>
{{payloadBody}}
</stock>""")))
define stream FooStream (payloadBody String, headers string);
Events arriving on FooStream will be published to the HTTP endpoint http://localhost:8009/foo
using POST
method with Content-Type application/xml
and setting payloadBody
and header
attribute values.
If the payloadBody
contains
<symbol>WSO2</symbol> <price>55.6</price> <volume>100</volume>
and header
contains 'topic:foobar'
values, then the system will generate an output with the body:
<stock> <symbol>WSO2</symbol> <price>55.6</price> <volume>100</volume> </stock>
and HTTP headers:Content-Length:xxx
,Content-Location:'xxx'
,Content-Type:'application/xml'
,HTTP_METHOD:'POST'
http-call (Sink)
The http-call sink publishes messages to endpoints via HTTP or HTTPS protocols using methods such as POST, GET, PUT, and DELETE on formats text
, XML
or JSON
and consume responses through its corresponding http-call-response source. It also supports calling endpoints protected with basic authentication or OAuth 2.0.
@sink(type="http-call", publisher.url="<STRING>", sink.id="<STRING>", basic.auth.username="<STRING>", basic.auth.password="<STRING>", https.truststore.file="<STRING>", https.truststore.password="<STRING>", oauth.username="<STRING>", oauth.password="<STRING>", consumer.key="<STRING>", consumer.secret="<STRING>", token.url="<STRING>", refresh.token="<STRING>", headers="<STRING>", method="<STRING>", downloading.enabled="<BOOL>", download.path="<STRING>", blocking.io="<BOOL>", socket.idle.timeout="<INT>", chunk.disabled="<BOOL>", ssl.protocol="<STRING>", ssl.verification.disabled="<BOOL>", ssl.configurations="<STRING>", proxy.host="<STRING>", proxy.port="<STRING>", proxy.username="<STRING>", proxy.password="<STRING>", client.bootstrap.configurations="<STRING>", max.pool.active.connections="<INT>", min.pool.idle.connections="<INT>", max.pool.idle.connections="<INT>", min.evictable.idle.time="<STRING>", time.between.eviction.runs="<STRING>", max.wait.time="<STRING>", test.on.borrow="<BOOL>", test.while.idle="<BOOL>", exhausted.action="<INT>", hostname.verification.enabled="<BOOL>", @map(...)))
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
publisher.url | The URL which should be called. |
STRING | No | No | |
sink.id | Identifier to correlate the http-call sink to its corresponding http-call-response sources to retrieved the responses. |
STRING | No | No | |
basic.auth.username | The username to be included in the authentication header when calling endpoints protected by basic authentication. |
- | STRING | Yes | No |
basic.auth.password | The password to be included in the authentication header when calling endpoints protected by basic authentication. |
- | STRING | Yes | No |
https.truststore.file | The file path of the client truststore when sending messages through |
${carbon.home}/resources/security/client-truststore.jks |
STRING | Yes | No |
https.truststore.password | The password for the client-truststore. |
wso2carbon | STRING | Yes | No |
oauth.username | The username to be included in the authentication header when calling endpoints protected by OAuth 2.0. |
- | STRING | Yes | No |
oauth.password | The password to be included in the authentication header when calling endpoints protected by OAuth 2.0. |
- | STRING | Yes | No |
consumer.key | Consumer key used for calling endpoints protected by OAuth 2.0 |
- | STRING | Yes | No |
consumer.secret | Consumer secret used for calling endpoints protected by OAuth 2.0 |
- | STRING | Yes | No |
token.url | Token URL to generate a new access tokens when calling endpoints protected by OAuth 2.0 |
- | STRING | Yes | No |
refresh.token | Refresh token used for generating new access tokens when calling endpoints protected by OAuth 2.0 |
- | STRING | Yes | No |
headers | HTTP request headers in format |
Content-Type and Content-Length headers | STRING | Yes | No |
method | The HTTP method used for calling the endpoint. |
POST | STRING | Yes | No |
downloading.enabled | Enable response received by the http-call-response source to be written to a file. When this is enabled the |
false | BOOL | Yes | No |
download.path | The absolute file path along with the file name where the downloads should be saved. |
- | STRING | Yes | Yes |
blocking.io | Blocks the request thread until a response it received from HTTP call-response source before sending any other request. |
false | BOOL | Yes | No |
socket.idle.timeout | Socket timeout in millis. |
6000 | INT | Yes | No |
chunk.disabled | Disable chunked transfer encoding. |
false | BOOL | Yes | No |
ssl.protocol | SSL/TLS protocol. |
TLS | STRING | Yes | No |
ssl.verification.disabled | Disable SSL verification. |
false | BOOL | Yes | No |
ssl.configurations | SSL/TSL configurations. |
- | STRING | Yes | No |
proxy.host | Proxy server host |
- | STRING | Yes | No |
proxy.port | Proxy server port |
- | STRING | Yes | No |
proxy.username | Proxy server username |
- | STRING | Yes | No |
proxy.password | Proxy server password |
- | STRING | Yes | No |
client.bootstrap.configurations | Client bootstrap configurations in format |
- | STRING | Yes | No |
max.pool.active.connections | Maximum possible number of active connection per client pool. |
-1 | INT | Yes | No |
min.pool.idle.connections | Minimum number of idle connections that can exist per client pool. |
0 | INT | Yes | No |
max.pool.idle.connections | Maximum number of idle connections that can exist per client pool. |
100 | INT | Yes | No |
min.evictable.idle.time | Minimum time (in millis) a connection may sit idle in the client pool before it become eligible for eviction. |
300000 | STRING | Yes | No |
time.between.eviction.runs | Time between two eviction operations (in millis) on the client pool. |
30000 | STRING | Yes | No |
max.wait.time | The maximum time (in millis) the pool will wait (when there are no available connections) for a connection to be returned to the pool. |
60000 | STRING | Yes | No |
test.on.borrow | Enable connections to be validated before being borrowed from the client pool. |
true | BOOL | Yes | No |
test.while.idle | Enable connections to be validated during the eviction operation (if any). |
true | BOOL | Yes | No |
exhausted.action | Action that should be taken when the maximum number of active connections are being used. This action should be indicated as an int and possible action values are following. |
1 (Block when exhausted) | INT | Yes | No |
hostname.verification.enabled | Enable hostname verification |
true | BOOL | Yes | No |
System Parameters
Name | Description | Default Value | Possible Parameters |
---|---|---|---|
clientBootstrapClientGroupSize | Number of client threads to perform non-blocking read and write to one or more channels. |
(Number of available processors) * 2 | Any positive integer |
clientBootstrapBossGroupSize | Number of boss threads to accept incoming connections. |
Number of available processors | Any positive integer |
clientBootstrapWorkerGroupSize | Number of worker threads to accept the connections from boss threads and perform non-blocking read and write from one or more channels. |
(Number of available processors) * 2 | Any positive integer |
trustStoreLocation | The default truststore file path. |
${carbon.home}/resources/security/client-truststore.jks |
Path to client truststore .jks file |
trustStorePassword | The default truststore password. |
wso2carbon | Truststore password as string |
Examples EXAMPLE 1
@sink(type='http-call', sink.id='foo',
publisher.url='http://localhost:8009/foo',
@map(type='xml', @payload('{{payloadBody}}')))
define stream FooStream (payloadBody string);
@source(type='http-call-response', sink.id='foo',
@map(type='text', regex.A='((.|\n)*)',
@attributes(headers='trp:headers', message='A[1]')))
define stream ResponseStream(message string, headers string);
When events arrive in FooStream
, http-call sink makes calls to endpoint on url http://localhost:8009/foo
with POST
method and Content-Type application/xml
.
If the event payloadBody
attribute contains following XML:
<item> <name>apple</name> <price>55</price> <quantity>5</quantity> </item>
the http-call sink maps that and sends it to the endpoint.
When endpoint sends a response it will be consumed by the corresponding http-call-response source correlated via the same sink.id
foo
and that will map the response message and send it via ResponseStream
steam by assigning the message body as message
attribute and response headers as headers
attribute of the event.
EXAMPLE 2
@sink(type='http-call', publisher.url='http://localhost:8005/files/{{name}}'
downloading.enabled='true', download.path='{{downloadPath}}{{name}}',
method='GET', sink.id='download', @map(type='json'))
define stream DownloadRequestStream(name String, id int, downloadPath string);
@source(type='http-call-response', sink.id='download',
http.status.code='2\\d+',
@map(type='text', regex.A='((.|\n)*)',
@attributes(name='trp:name', id='trp:id', file='A[1]')))
define stream ResponseStream2xx(name string, id string, file string);
@source(type='http-call-response', sink.id='download',
http.status.code='4\\d+',
@map(type='text', regex.A='((.|\n)*)', @attributes(errorMsg='A[1]')))
define stream ResponseStream4xx(errorMsg string);
When events arrive in DownloadRequestStream
with name
:foo.txt
, id
:75
and downloadPath
:/user/download/
the http-call sink sends a GET request to the url http://localhost:8005/files/foo.txt
to download the file to the given path /user/download/foo.txt
and capture the response via its corresponding http-call-response source based on the response status code.
If the response status code is in the range of 200 the message will be received by the http-call-response source associated with the ResponseStream2xx
stream which expects http.status.code
with regex 2\\d+
while downloading the file to the local file system on the path /user/download/foo.txt
and mapping the response message having the absolute file path to event's file
attribute.
If the response status code is in the range of 400 then the message will be received by the http-call-response source associated with the ResponseStream4xx
stream which expects http.status.code
with regex 4\\d+
while mapping the error response to the errorMsg
attribute of the event.
http-request (Sink)
Deprecated
(Use http-call sink instead).
The http-request sink publishes messages to endpoints via HTTP or HTTPS protocols using methods such as POST, GET, PUT, and DELETE on formats text
, XML
or JSON
and consume responses through its corresponding http-response source. It also supports calling endpoints protected with basic authentication or OAuth 2.0.
@sink(type="http-request", publisher.url="<STRING>", sink.id="<STRING>", basic.auth.username="<STRING>", basic.auth.password="<STRING>", https.truststore.file="<STRING>", https.truststore.password="<STRING>", oauth.username="<STRING>", oauth.password="<STRING>", consumer.key="<STRING>", consumer.secret="<STRING>", token.url="<STRING>", refresh.token="<STRING>", headers="<STRING>", method="<STRING>", downloading.enabled="<BOOL>", download.path="<STRING>", blocking.io="<BOOL>", socket.idle.timeout="<INT>", chunk.disabled="<BOOL>", ssl.protocol="<STRING>", ssl.verification.disabled="<BOOL>", ssl.configurations="<STRING>", proxy.host="<STRING>", proxy.port="<STRING>", proxy.username="<STRING>", proxy.password="<STRING>", client.bootstrap.configurations="<STRING>", max.pool.active.connections="<INT>", min.pool.idle.connections="<INT>", max.pool.idle.connections="<INT>", min.evictable.idle.time="<STRING>", time.between.eviction.runs="<STRING>", max.wait.time="<STRING>", test.on.borrow="<BOOL>", test.while.idle="<BOOL>", exhausted.action="<INT>", hostname.verification.enabled="<BOOL>", @map(...)))
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
publisher.url | The URL which should be called. |
STRING | No | No | |
sink.id | Identifier to correlate the http-request sink to its corresponding http-response sources to retrieved the responses. |
STRING | No | No | |
basic.auth.username | The username to be included in the authentication header when calling endpoints protected by basic authentication. |
- | STRING | Yes | No |
basic.auth.password | The password to be included in the authentication header when calling endpoints protected by basic authentication. |
- | STRING | Yes | No |
https.truststore.file | The file path of the client truststore when sending messages through |
${carbon.home}/resources/security/client-truststore.jks |
STRING | Yes | No |
https.truststore.password | The password for the client-truststore. |
wso2carbon | STRING | Yes | No |
oauth.username | The username to be included in the authentication header when calling endpoints protected by OAuth 2.0. |
- | STRING | Yes | No |
oauth.password | The password to be included in the authentication header when calling endpoints protected by OAuth 2.0. |
- | STRING | Yes | No |
consumer.key | Consumer key used for calling endpoints protected by OAuth 2.0 |
- | STRING | Yes | No |
consumer.secret | Consumer secret used for calling endpoints protected by OAuth 2.0 |
- | STRING | Yes | No |
token.url | Token URL to generate a new access tokens when calling endpoints protected by OAuth 2.0 |
- | STRING | Yes | No |
refresh.token | Refresh token used for generating new access tokens when calling endpoints protected by OAuth 2.0 |
- | STRING | Yes | No |
headers | HTTP request headers in format |
Content-Type and Content-Length headers | STRING | Yes | No |
method | The HTTP method used for calling the endpoint. |
POST | STRING | Yes | No |
downloading.enabled | Enable response received by the http-response source to be written to a file. When this is enabled the |
false | BOOL | Yes | No |
download.path | The absolute file path along with the file name where the downloads should be saved. |
- | STRING | Yes | Yes |
blocking.io | Blocks the request thread until a response it received from HTTP call-response source before sending any other request. |
false | BOOL | Yes | No |
socket.idle.timeout | Socket timeout in millis. |
6000 | INT | Yes | No |
chunk.disabled | Disable chunked transfer encoding. |
false | BOOL | Yes | No |
ssl.protocol | SSL/TLS protocol. |
TLS | STRING | Yes | No |
ssl.verification.disabled | Disable SSL verification. |
false | BOOL | Yes | No |
ssl.configurations | SSL/TSL configurations in format |
- | STRING | Yes | No |
proxy.host | Proxy server host |
- | STRING | Yes | No |
proxy.port | Proxy server port |
- | STRING | Yes | No |
proxy.username | Proxy server username |
- | STRING | Yes | No |
proxy.password | Proxy server password |
- | STRING | Yes | No |
client.bootstrap.configurations | Client bootstrap configurations in format |
- | STRING | Yes | No |
max.pool.active.connections | Maximum possible number of active connection per client pool. |
-1 | INT | Yes | No |
min.pool.idle.connections | Minimum number of idle connections that can exist per client pool. |
0 | INT | Yes | No |
max.pool.idle.connections | Maximum number of idle connections that can exist per client pool. |
100 | INT | Yes | No |
min.evictable.idle.time | Minimum time (in millis) a connection may sit idle in the client pool before it become eligible for eviction. |
300000 | STRING | Yes | No |
time.between.eviction.runs | Time between two eviction operations (in millis) on the client pool. |
30000 | STRING | Yes | No |
max.wait.time | The maximum time (in millis) the pool will wait (when there are no available connections) for a connection to be returned to the pool. |
60000 | STRING | Yes | No |
test.on.borrow | Enable connections to be validated before being borrowed from the client pool. |
true | BOOL | Yes | No |
test.while.idle | Enable connections to be validated during the eviction operation (if any). |
true | BOOL | Yes | No |
exhausted.action | Action that should be taken when the maximum number of active connections are being used. This action should be indicated as an int and possible action values are following. |
1 (Block when exhausted) | INT | Yes | No |
hostname.verification.enabled | Enable hostname verification |
true | BOOL | Yes | No |
System Parameters
Name | Description | Default Value | Possible Parameters |
---|---|---|---|
clientBootstrapClientGroupSize | Number of client threads to perform non-blocking read and write to one or more channels. |
(Number of available processors) * 2 | Any positive integer |
clientBootstrapBossGroupSize | Number of boss threads to accept incoming connections. |
Number of available processors | Any positive integer |
clientBootstrapWorkerGroupSize | Number of worker threads to accept the connections from boss threads and perform non-blocking read and write from one or more channels. |
(Number of available processors) * 2 | Any positive integer |
trustStoreLocation | The default truststore file path. |
${carbon.home}/resources/security/client-truststore.jks |
Path to client truststore .jks file |
trustStorePassword | The default truststore password. |
wso2carbon | Truststore password as string |
Examples EXAMPLE 1
@sink(type='http-request', sink.id='foo',
publisher.url='http://localhost:8009/foo',
@map(type='xml', @payload('{{payloadBody}}')))
define stream FooStream (payloadBody string);
@source(type='http-response', sink.id='foo',
@map(type='text', regex.A='((.|\n)*)',
@attributes(headers='trp:headers', message='A[1]')))
define stream ResponseStream(message string, headers string);
When events arrive in FooStream
, http-request sink makes calls to endpoint on url http://localhost:8009/foo
with POST
method and Content-Type application/xml
.
If the event payloadBody
attribute contains following XML:
<item> <name>apple</name> <price>55</price> <quantity>5</quantity> </item>
the http-request sink maps that and sends it to the endpoint.
When endpoint sends a response it will be consumed by the corresponding http-response source correlated via the same sink.id
foo
and that will map the response message and send it via ResponseStream
steam by assigning the message body as message
attribute and response headers as headers
attribute of the event.
EXAMPLE 2
@sink(type='http-request', publisher.url='http://localhost:8005/files/{{name}}'
downloading.enabled='true', download.path='{{downloadPath}}{{name}}',
method='GET', sink.id='download', @map(type='json'))
define stream DownloadRequestStream(name String, id int, downloadPath string);
@source(type='http-response', sink.id='download',
http.status.code='2\\d+',
@map(type='text', regex.A='((.|\n)*)',
@attributes(name='trp:name', id='trp:id', file='A[1]')))
define stream ResponseStream2xx(name string, id string, file string);
@source(type='http-response', sink.id='download',
http.status.code='4\\d+',
@map(type='text', regex.A='((.|\n)*)', @attributes(errorMsg='A[1]')))
define stream ResponseStream4xx(errorMsg string);
When events arrive in DownloadRequestStream
with name
:foo.txt
, id
:75
and downloadPath
:/user/download/
the http-request sink sends a GET request to the url http://localhost:8005/files/foo.txt
to download the file to the given path /user/download/foo.txt
and capture the response via its corresponding http-response source based on the response status code.
If the response status code is in the range of 200 the message will be received by the http-response source associated with the ResponseStream2xx
stream which expects http.status.code
with regex 2\\d+
while downloading the file to the local file system on the path /user/download/foo.txt
and mapping the response message having the absolute file path to event's file
attribute.
If the response status code is in the range of 400 then the message will be received by the http-response source associated with the ResponseStream4xx
stream which expects http.status.code
with regex 4\\d+
while mapping the error response to the errorMsg
attribute of the event.
http-response (Sink)
Deprecated
(Use http-service-response sink instead).
The http-response sink send responses of the requests consumed by its corresponding http-request source, by mapping the response messages to formats such as text
, XML
and JSON
.
@sink(type="http-response", source.id="<STRING>", message.id="<STRING>", headers="<STRING>", @map(...)))
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
source.id | Identifier to correlate the http-response sink to its corresponding http-request source which consumed the request. |
STRING | No | No | |
message.id | Identifier to correlate the response with the request received by http-request source. |
STRING | No | Yes | |
headers | HTTP request headers in format |
Content-Type and Content-Length headers | STRING | Yes | No |
Examples EXAMPLE 1
@source(type='http-request', receiver.url='http://localhost:5005/add',
source.id='adder',
@map(type='json, @attributes(messageId='trp:messageId',
value1='$.event.value1',
value2='$.event.value2')))
define stream AddStream (messageId string, value1 long, value2 long);
@sink(type='http-response', source.id='adder',
message.id='{{messageId}}', @map(type = 'json'))
define stream ResultStream (messageId string, results long);
@info(name = 'query1')
from AddStream
select messageId, value1 + value2 as results
insert into ResultStream;
The http-request source on stream AddStream
listens on url http://localhost:5005/stocks
for JSON messages with format:
{ "event": { "value1": 3, "value2": 4 } }
and when events arrive it maps to AddStream
events and pass them to query query1
for processing. The query results produced on ResultStream
are sent as a response via http-response sink with format:
{ "event": { "results": 7 } }
Here the request and response are correlated by passing the messageId
produced by the http-request to the respective http-response sink.
http-service-response (Sink)
The http-service-response sink send responses of the requests consumed by its corresponding http-service source, by mapping the response messages to formats such as text
, XML
and JSON
.
@sink(type="http-service-response", source.id="<STRING>", message.id="<STRING>", headers="<STRING>", @map(...)))
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
source.id | Identifier to correlate the http-service-response sink to its corresponding http-service source which consumed the request. |
STRING | No | No | |
message.id | Identifier to correlate the response with the request received by http-service source. |
STRING | No | Yes | |
headers | HTTP request headers in format |
Content-Type and Content-Length headers | STRING | Yes | No |
Examples EXAMPLE 1
@source(type='http-service', receiver.url='http://localhost:5005/add',
source.id='adder',
@map(type='json, @attributes(messageId='trp:messageId',
value1='$.event.value1',
value2='$.event.value2')))
define stream AddStream (messageId string, value1 long, value2 long);
@sink(type='http-service-response', source.id='adder',
message.id='{{messageId}}', @map(type = 'json'))
define stream ResultStream (messageId string, results long);
@info(name = 'query1')
from AddStream
select messageId, value1 + value2 as results
insert into ResultStream;
The http-service source on stream AddStream
listens on url http://localhost:5005/stocks
for JSON messages with format:
{ "event": { "value1": 3, "value2": 4 } }
and when events arrive it maps to AddStream
events and pass them to query query1
for processing. The query results produced on ResultStream
are sent as a response via http-service-response sink with format:
{ "event": { "results": 7 } }
Here the request and response are correlated by passing the messageId
produced by the http-service to the respective http-service-response sink.
Source
http (Source)
HTTP source receives POST requests via HTTP and HTTPS protocols in format such as text
, XML
and JSON
. It also supports basic authentication to ensure events are received from authorized users/systems.
The request headers and properties can be accessed via transport properties in the format trp:<header>
.
@source(type="http", receiver.url="<STRING>", basic.auth.enabled="<STRING>", worker.count="<INT>", socket.idle.timeout="<INT>", ssl.verify.client="<STRING>", ssl.protocol="<STRING>", tls.store.type="<STRING>", ssl.configurations="<STRING>", request.size.validation.configurations="<STRING>", header.validation.configurations="<STRING>", server.bootstrap.configurations="<STRING>", trace.log.enabled="<BOOL>", @map(...)))
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
receiver.url | The URL on which events should be received. To enable SSL use |
http://0.0.0.0:9763/<appNAme>/<streamName> |
STRING | Yes | No |
basic.auth.enabled | This only works in VM, Docker and Kubernetes. |
false | STRING | Yes | No |
worker.count | The number of active worker threads to serve the incoming events. By default the value is set to |
1 | INT | Yes | No |
socket.idle.timeout | Idle timeout for HTTP connection in millis. |
120000 | INT | Yes | No |
ssl.verify.client | The type of client certificate verification. Supported values are |
- | STRING | Yes | No |
ssl.protocol | SSL/TLS protocol. |
TLS | STRING | Yes | No |
tls.store.type | TLS store type. |
JKS | STRING | Yes | No |
ssl.configurations | SSL/TSL configurations in format |
- | STRING | Yes | No |
request.size.validation.configurations | Configurations to validate the HTTP request size. |
- | STRING | Yes | No |
header.validation.configurations | Configurations to validate HTTP headers. |
- | STRING | Yes | No |
server.bootstrap.configurations | Server bootstrap configurations in format |
- | STRING | Yes | No |
trace.log.enabled | Enable trace log for traffic monitoring. |
false | BOOL | Yes | No |
System Parameters
Name | Description | Default Value | Possible Parameters |
---|---|---|---|
serverBootstrapBossGroupSize | Number of boss threads to accept incoming connections. |
Number of available processors | Any positive integer |
serverBootstrapWorkerGroupSize | Number of worker threads to accept the connections from boss threads and perform non-blocking read and write from one or more channels. |
(Number of available processors) * 2 | Any positive integer |
serverBootstrapClientGroupSize | Number of client threads to perform non-blocking read and write to one or more channels. |
(Number of available processors) * 2 | Any positive integer |
defaultHost | The default host of the transport. |
0.0.0.0 | Any valid host |
defaultScheme | The default protocol. |
http | http https |
defaultHttpPort | The default HTTP port when default scheme is |
8280 | Any valid port |
defaultHttpsPort | The default HTTPS port when default scheme is |
8243 | Any valid port |
keyStoreLocation | The default keystore file path. |
${carbon.home}/resources/security/wso2carbon.jks |
Path to .jks file |
keyStorePassword | The default keystore password. |
wso2carbon | Keystore password as string |
Examples EXAMPLE 1
@app.name('StockProcessor')
@source(type='http', @map(type = 'json'))
define stream StockStream (symbol string, price float, volume long);
Above HTTP source listeners on url http://0.0.0.0:9763/StockProcessor/StockStream
for JSON messages on the format:
{ "event": { "symbol": "FB", "price": 24.5, "volume": 5000 } }
It maps the incoming messages and sends them to StockStream
for processing.
EXAMPLE 2
@source(type='http', receiver.url='http://localhost:5005/stocks',
@map(type = 'xml'))
define stream StockStream (symbol string, price float, volume long);
Above HTTP source listeners on url http://localhost:5005/stocks
for JSON messages on the format:
<events> <event> <symbol>Fb</symbol> <price>55.6</price> <volume>100</volume> </event> </events>
It maps the incoming messages and sends them to StockStream
for processing.
http-call-response (Source)
The http-call-response source receives the responses for the calls made by its corresponding http-call sink, and maps them from formats such as text
, XML
and JSON
.
To handle messages with different http status codes having different formats, multiple http-call-response sources are allowed to associate with a single http-call sink.
It allows accessing the attributes of the event that initiated the call, and the response headers and properties via transport properties in the format trp:<attribute name>
and trp:<header/property>
respectively.
@source(type="http-call-response", sink.id="<STRING>", http.status.code="<STRING>", allow.streaming.responses="<BOOL>", @map(...)))
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
sink.id | Identifier to correlate the http-call-response source with its corresponding http-call sink that published the messages. |
STRING | No | No | |
http.status.code | The matching http responses status code regex, that is used to filter the the messages which will be processed by the source.Eg: |
200 | STRING | Yes | No |
allow.streaming.responses | Enable consuming responses on a streaming manner. |
false | BOOL | Yes | No |
Examples EXAMPLE 1
@sink(type='http-call', method='POST',
publisher.url='http://localhost:8005/registry/employee',
sink.id='employee-info', @map(type='json'))
define stream EmployeeRequestStream (name string, id int);
@source(type='http-call-response', sink.id='employee-info',
http.status.code='2\\d+',
@map(type='json',
@attributes(name='trp:name', id='trp:id',
location='$.town', age='$.age')))
define stream EmployeeResponseStream(name string, id int,
location string, age int);
@source(type='http-call-response', sink.id='employee-info',
http.status.code='4\\d+',
@map(type='text', regex.A='((.|\n)*)',
@attributes(error='A[1]')))
define stream EmployeeErrorStream(error string);
When events arrive in EmployeeRequestStream
, http-call sink makes calls to endpoint on url http://localhost:8005/registry/employee
with POST
method and Content-Type application/json
.
If the arriving event has attributes name
:John
and id
:1423
it will send a message with default JSON mapping as follows:
{ "event": { "name": "John", "id": 1423 } }
When the endpoint responds with status code in the range of 200 the message will be received by the http-call-response source associated with the EmployeeResponseStream
stream, because it is correlated with the sink by the same sink.id
employee-info
and as that expects messages with http.status.code
in regex format 2\\d+
. If the response message is in the format
{ "town": "NY", "age": 24 }
the source maps the location
and age
attributes by executing JSON path on the message and maps the name
and id
attributes by extracting them from the request event via as transport properties.
If the response status code is in the range of 400 then the message will be received by the http-call-response source associated with the EmployeeErrorStream
stream, because it is correlated with the sink by the same sink.id
employee-info
and it expects messages with http.status.code
in regex format 4\\d+
, and maps the error response to the error
attribute of the event.
http-request (Source)
Deprecated
(Use http-service source instead).
The http-request source receives POST requests via HTTP and HTTPS protocols in format such as text
, XML
and JSON
and sends responses via its corresponding http-response sink correlated through a unique source.id
.
For request and response correlation, it generates a messageId
upon each incoming request and expose it via transport properties in the format trp:messageId
to correlate them with the responses at the http-response sink.
The request headers and properties can be accessed via transport properties in the format trp:<header>
.
It also supports basic authentication to ensure events are received from authorized users/systems.
@source(type="http-request", receiver.url="<STRING>", source.id="<STRING>", connection.timeout="<INT>", basic.auth.enabled="<STRING>", worker.count="<INT>", socket.idle.timeout="<INT>", ssl.verify.client="<STRING>", ssl.protocol="<STRING>", tls.store.type="<STRING>", ssl.configurations="<STRING>", request.size.validation.configurations="<STRING>", header.validation.configurations="<STRING>", server.bootstrap.configurations="<STRING>", trace.log.enabled="<BOOL>", @map(...)))
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
receiver.url | The URL on which events should be received. To enable SSL use |
http://0.0.0.0:9763/<appNAme>/<streamName> |
STRING | Yes | No |
source.id | Identifier to correlate the http-request source to its corresponding http-response sinks to send responses. |
STRING | No | No | |
connection.timeout | Connection timeout in millis. The system will send a timeout, if a corresponding response is not sent by an associated http-response sink within the given time. |
120000 | INT | Yes | No |
basic.auth.enabled | This only works in VM, Docker and Kubernetes. |
false | STRING | Yes | No |
worker.count | The number of active worker threads to serve the incoming events. By default the value is set to |
1 | INT | Yes | No |
socket.idle.timeout | Idle timeout for HTTP connection in millis. |
120000 | INT | Yes | No |
ssl.verify.client | The type of client certificate verification. Supported values are |
- | STRING | Yes | No |
ssl.protocol | SSL/TLS protocol. |
TLS | STRING | Yes | No |
tls.store.type | TLS store type. |
JKS | STRING | Yes | No |
ssl.configurations | SSL/TSL configurations in format |
- | STRING | Yes | No |
request.size.validation.configurations | Configurations to validate the HTTP request size. |
- | STRING | Yes | No |
header.validation.configurations | Configurations to validate HTTP headers. |
- | STRING | Yes | No |
server.bootstrap.configurations | Server bootstrap configurations in format |
- | STRING | Yes | No |
trace.log.enabled | Enable trace log for traffic monitoring. |
false | BOOL | Yes | No |
System Parameters
Name | Description | Default Value | Possible Parameters |
---|---|---|---|
serverBootstrapBossGroupSize | Number of boss threads to accept incoming connections. |
Number of available processors | Any positive integer |
serverBootstrapWorkerGroupSize | Number of worker threads to accept the connections from boss threads and perform non-blocking read and write from one or more channels. |
(Number of available processors) * 2 | Any positive integer |
serverBootstrapClientGroupSize | Number of client threads to perform non-blocking read and write to one or more channels. |
(Number of available processors) * 2 | Any positive integer |
defaultHost | The default host of the transport. |
0.0.0.0 | Any valid host |
defaultScheme | The default protocol. |
http | http https |
defaultHttpPort | The default HTTP port when default scheme is |
8280 | Any valid port |
defaultHttpsPort | The default HTTPS port when default scheme is |
8243 | Any valid port |
keyStoreLocation | The default keystore file path. |
${carbon.home}/resources/security/wso2carbon.jks |
Path to .jks file |
keyStorePassword | The default keystore password. |
wso2carbon | Keystore password as string |
Examples EXAMPLE 1
@source(type='http-request', receiver.url='http://localhost:5005/add',
source.id='adder',
@map(type='json, @attributes(messageId='trp:messageId',
value1='$.event.value1',
value2='$.event.value2')))
define stream AddStream (messageId string, value1 long, value2 long);
@sink(type='http-response', source.id='adder',
message.id='{{messageId}}', @map(type = 'json'))
define stream ResultStream (messageId string, results long);
@info(name = 'query1')
from AddStream
select messageId, value1 + value2 as results
insert into ResultStream;
Above sample listens events on http://localhost:5005/stocks
url for JSON messages on the format:
{ "event": { "value1": 3, "value2": 4 } }
Map the vents into AddStream, process the events through query query1
, and sends the results produced on ResultStream via http-response sink on the message format:
{ "event": { "results": 7 } }
http-response (Source)
Deprecated
(Use http-call-response source instead).
The http-response source receives the responses for the calls made by its corresponding http-request sink, and maps them from formats such as text
, XML
and JSON
.
To handle messages with different http status codes having different formats, multiple http-response sources are allowed to associate with a single http-request sink. It allows accessing the attributes of the event that initiated the call, and the response headers and properties via transport properties in the format trp:<attribute name>
and trp:<header/property>
respectively.
@source(type="http-response", sink.id="<STRING>", http.status.code="<STRING>", allow.streaming.responses="<BOOL>", @map(...)))
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
sink.id | Identifier to correlate the http-response source with its corresponding http-request sink that published the messages. |
STRING | No | No | |
http.status.code | The matching http responses status code regex, that is used to filter the the messages which will be processed by the source.Eg: |
200 | STRING | Yes | No |
allow.streaming.responses | Enable consuming responses on a streaming manner. |
false | BOOL | Yes | No |
Examples EXAMPLE 1
@sink(type='http-request', method='POST',
publisher.url='http://localhost:8005/registry/employee',
sink.id='employee-info', @map(type='json'))
define stream EmployeeRequestStream (name string, id int);
@source(type='http-response', sink.id='employee-info',
http.status.code='2\\d+',
@map(type='json',
@attributes(name='trp:name', id='trp:id',
location='$.town', age='$.age')))
define stream EmployeeResponseStream(name string, id int,
location string, age int);
@source(type='http-response', sink.id='employee-info',
http.status.code='4\\d+',
@map(type='text', regex.A='((.|\n)*)',
@attributes(error='A[1]')))
define stream EmployeeErrorStream(error string);
When events arrive in EmployeeRequestStream
, http-request sink makes calls to endpoint on url http://localhost:8005/registry/employee
with POST
method and Content-Type application/json
.
If the arriving event has attributes name
:John
and id
:1423
it will send a message with default JSON mapping as follows:
{ "event": { "name": "John", "id": 1423 } }
When the endpoint responds with status code in the range of 200 the message will be received by the http-response source associated with the EmployeeResponseStream
stream, because it is correlated with the sink by the same sink.id
employee-info
and as that expects messages with http.status.code
in regex format 2\\d+
. If the response message is in the format
{ "town": "NY", "age": 24 }
the source maps the location
and age
attributes by executing JSON path on the message and maps the name
and id
attributes by extracting them from the request event via as transport properties.
If the response status code is in the range of 400 then the message will be received by the http-response source associated with the EmployeeErrorStream
stream, because it is correlated with the sink by the same sink.id
employee-info
and it expects messages with http.status.code
in regex format 4\\d+
, and maps the error response to the error
attribute of the event.
http-service (Source)
The http-service source receives POST requests via HTTP and HTTPS protocols in format such as text
, XML
and JSON
and sends responses via its corresponding http-service-response sink correlated through a unique source.id
.
For request and response correlation, it generates a messageId
upon each incoming request and expose it via transport properties in the format trp:messageId
to correlate them with the responses at the http-service-response sink.
The request headers and properties can be accessed via transport properties in the format trp:<header>
.
It also supports basic authentication to ensure events are received from authorized users/systems.
@source(type="http-service", receiver.url="<STRING>", source.id="<STRING>", connection.timeout="<INT>", basic.auth.enabled="<STRING>", worker.count="<INT>", socket.idle.timeout="<INT>", ssl.verify.client="<STRING>", ssl.protocol="<STRING>", tls.store.type="<STRING>", ssl.configurations="<STRING>", request.size.validation.configurations="<STRING>", header.validation.configurations="<STRING>", server.bootstrap.configurations="<STRING>", trace.log.enabled="<BOOL>", @map(...)))
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
receiver.url | The URL on which events should be received. To enable SSL use |
http://0.0.0.0:9763/<appNAme>/<streamName> |
STRING | Yes | No |
source.id | Identifier to correlate the http-service source to its corresponding http-service-response sinks to send responses. |
STRING | No | No | |
connection.timeout | Connection timeout in millis. The system will send a timeout, if a corresponding response is not sent by an associated http-service-response sink within the given time. |
120000 | INT | Yes | No |
basic.auth.enabled | This only works in VM, Docker and Kubernetes. |
false | STRING | Yes | No |
worker.count | The number of active worker threads to serve the incoming events. By default the value is set to |
1 | INT | Yes | No |
socket.idle.timeout | Idle timeout for HTTP connection in millis. |
120000 | INT | Yes | No |
ssl.verify.client | The type of client certificate verification. Supported values are |
- | STRING | Yes | No |
ssl.protocol | SSL/TLS protocol. |
TLS | STRING | Yes | No |
tls.store.type | TLS store type. |
JKS | STRING | Yes | No |
ssl.configurations | SSL/TSL configurations in format |
- | STRING | Yes | No |
request.size.validation.configurations | Configurations to validate the HTTP request size. |
- | STRING | Yes | No |
header.validation.configurations | Configurations to validate HTTP headers. |
- | STRING | Yes | No |
server.bootstrap.configurations | Server bootstrap configurations in format |
- | STRING | Yes | No |
trace.log.enabled | Enable trace log for traffic monitoring. |
false | BOOL | Yes | No |
System Parameters
Name | Description | Default Value | Possible Parameters |
---|---|---|---|
serverBootstrapBossGroupSize | Number of boss threads to accept incoming connections. |
Number of available processors | Any positive integer |
serverBootstrapWorkerGroupSize | Number of worker threads to accept the connections from boss threads and perform non-blocking read and write from one or more channels. |
(Number of available processors) * 2 | Any positive integer |
serverBootstrapClientGroupSize | Number of client threads to perform non-blocking read and write to one or more channels. |
(Number of available processors) * 2 | Any positive integer |
defaultHost | The default host of the transport. |
0.0.0.0 | Any valid host |
defaultScheme | The default protocol. |
http | http https |
defaultHttpPort | The default HTTP port when default scheme is |
8280 | Any valid port |
defaultHttpsPort | The default HTTPS port when default scheme is |
8243 | Any valid port |
keyStoreLocation | The default keystore file path. |
${carbon.home}/resources/security/wso2carbon.jks |
Path to .jks file |
keyStorePassword | The default keystore password. |
wso2carbon | Keystore password as string |
Examples EXAMPLE 1
@source(type='http-service', receiver.url='http://localhost:5005/add',
source.id='adder',
@map(type='json, @attributes(messageId='trp:messageId',
value1='$.event.value1',
value2='$.event.value2')))
define stream AddStream (messageId string, value1 long, value2 long);
@sink(type='http-service-response', source.id='adder',
message.id='{{messageId}}', @map(type = 'json'))
define stream ResultStream (messageId string, results long);
@info(name = 'query1')
from AddStream
select messageId, value1 + value2 as results
insert into ResultStream;
Above sample listens events on http://localhost:5005/stocks
url for JSON messages on the format:
{ "event": { "value1": 3, "value2": 4 } }
Map the vents into AddStream, process the events through query query1
, and sends the results produced on ResultStream via http-service-response sink on the message format:
{ "event": { "results": 7 } }
sse (Source)
HTTP SSE source send a request to a given url and listen to the response stream.
Syntax@source(type="sse", event.source.url="<STRING>", method="<STRING>", basic.auth.username="<STRING>", basic.auth.password="<STRING>", worker.count="<INT>", headers="<STRING>", https.truststore.file="<STRING>", https.truststore.password="<STRING>", client.bootstrap.configurations="<STRING>", @map(...)))
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
event.source.url | The sse endpoint url which should be listened. |
STRING | No | No | |
method | The HTTP method used for calling the endpoint. |
GET | STRING | Yes | No |
basic.auth.username | The username to be included in the authentication header when calling endpoints protected by basic authentication. |
- | STRING | Yes | No |
basic.auth.password | The password to be included in the authentication header when calling endpoints protected by basic authentication. |
- | STRING | Yes | No |
worker.count | The number of active worker threads to serve the incoming events. By default the value is set to |
1 | INT | Yes | No |
headers | HTTP request headers in format |
Content-Type and Content-Length headers | STRING | Yes | No |
https.truststore.file | The file path of the client truststore when sending messages through |
${carbon.home}/resources/security/client-truststore.jks |
STRING | Yes | No |
https.truststore.password | The password for the client-truststore. |
wso2carbon | STRING | Yes | No |
client.bootstrap.configurations | Client bootstrap configurations in format |
- | STRING | Yes | No |
System Parameters
Name | Description | Default Value | Possible Parameters |
---|---|---|---|
defaultScheme | The default protocol. |
http | http https |
defaultHttpPort | The default HTTP port when default scheme is |
8280 | Any valid port |
defaultHost | The default host of the transport. |
0.0.0.0 | Any valid host |
Examples EXAMPLE 1
@Source(type='sse', event.source.url='http://localhost:8080/sse', @map(type='json')) define stream IncomingStream (param1 string);
This subscribes to the events which gets published by the SSE server at event.source.url