API Docs - v2.0.8
Sink
Tested Siddhi Core version: 5.0.0
It could also support other Siddhi Core minor versions.
http (Sink)
This extension publishes HTTP events using any of the HTTP methods POST, GET, PUT, and DELETE, over HTTP or HTTPS. It also supports basic authentication, and events sent over HTTPS can be published using a custom client truststore file. In addition, any number of headers, including the HTTP_METHOD header, can be set dynamically for each event.
The following content types are set by default according to the type of sink mapper used.
You can override them by setting new content types in the headers.
- TEXT : text/plain
- XML : application/xml
- JSON : application/json
- KEYVALUE : application/x-www-form-urlencoded
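As a rough illustration of overriding a default, the sketch below pairs a text mapper (which would otherwise produce text/plain) with an explicit Content-Type header. The URL, stream name, and attribute name are placeholders chosen for this example, not values defined by the extension.

```
-- Hypothetical endpoint and stream; only the 'headers' value matters here.
@sink(type='http', publisher.url='http://localhost:8080/endpoint',
      headers="'Content-Type:application/xml'",
      @map(type='text', @payload('{{payloadBody}}')))
define stream OverrideStream (payloadBody string);
```

With this definition the payload is still produced by the text mapper; only the Content-Type header sent with the request is expected to change.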
Syntax
@sink(type="http", publisher.url="<STRING>", basic.auth.username="<STRING>", basic.auth.password="<STRING>", https.truststore.file="<STRING>", https.truststore.password="<STRING>", headers="<STRING>", method="<STRING>", socket.idle.timeout="<INT>", chunk.disabled="<BOOL>", ssl.protocol="<STRING>", parameters="<STRING>", ciphers="<STRING>", ssl.enabled.protocols="<STRING>", client.enable.session.creation="<STRING>", follow.redirect="<BOOL>", max.redirect.count="<INT>", tls.store.type="<STRING>", proxy.host="<STRING>", proxy.port="<STRING>", proxy.username="<STRING>", proxy.password="<STRING>", client.bootstrap.configuration="<STRING>", client.bootstrap.nodelay="<BOOL>", client.bootstrap.keepalive="<BOOL>", client.bootstrap.sendbuffersize="<INT>", client.bootstrap.recievebuffersize="<INT>", client.bootstrap.connect.timeout="<INT>", client.bootstrap.socket.reuse="<BOOL>", client.bootstrap.socket.timeout="<STRING>", connection.pool.count="<INT>", max.pool.active.connections="<INT>", min.pool.idle.connections="<INT>", max.pool.idle.connections="<INT>", min.evictable.idle.time="<STRING>", time.between.eviction.runs="<STRING>", max.wait.time="<STRING>", test.on.borrow="<BOOL>", test.while.idle="<BOOL>", exhausted.action="<INT>", oauth.username="<STRING>", oauth.password="<STRING>", consumer.key="<STRING>", consumer.secret="<STRING>", refresh.token="<STRING>", token.url="<STRING>", hostname.verification.enabled="<BOOL>", @map(...))
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
publisher.url | The URL to which the outgoing events should be published via HTTP. This is a mandatory parameter; if it is not specified, an error is logged in the CLI. To enable SSL for the events, use https instead of http in the publisher.url, e.g., http://localhost:8080/endpoint , https://localhost:8080/endpoint | | STRING | No | No |
basic.auth.username | The username to be included in the authentication header of the basic authentication enabled events. Both the username and the password must be specified to enable basic authentication. If either parameter is missing, an error is logged in the CLI. | | STRING | Yes | No |
basic.auth.password | The password to include in the authentication header of the basic authentication enabled events. Both the username and the password must be specified to enable basic authentication. If either parameter is missing, an error is logged in the CLI. | | STRING | Yes | No |
https.truststore.file | The file path to the location of the truststore of the client that sends the HTTP events through 'https' protocol. A custom client-truststore can be specified if required. | ${carbon.home}/resources/security/client-truststore.jks | STRING | Yes | No |
https.truststore.password | The password for the client-truststore. A custom password can be specified if required. If no custom password is specified and the protocol of URL is 'https' then, the system uses default password. | wso2carbon | STRING | Yes | No |
headers | The headers that should be included as HTTP request headers. Any number of headers can be concatenated in the following format: "'header1:value1','header2:value2'". Include a Content-Type header to use a specific content type for the payload. Otherwise, the system decides the Content-Type from the type of sink mapper, as follows: - @map(xml): application/xml - @map(json): application/json - @map(text): text/plain. If no mapping type is specified, 'text/plain' is used as the default Content-Type header. Note that providing content-length as a header is not supported. The size of the payload is calculated automatically and included in the content-length header. | | STRING | Yes | No |
method | The HTTP method used for the request, which is included as the HTTP_METHOD header. If this parameter is not specified, the system uses 'POST' by default. | POST | STRING | Yes | No |
socket.idle.timeout | Socket timeout value in milliseconds. | 6000 | INT | Yes | No |
chunk.disabled | This parameter is used to disable/enable chunked transfer encoding | false | BOOL | Yes | No |
ssl.protocol | The SSL protocol version | TLS | STRING | Yes | No |
parameters | Parameters other than basics such as ciphers,sslEnabledProtocols,client.enable.session.creation. Expected format of these parameters is as follows: "'ciphers:xxx','sslEnabledProtocols,client.enable:xxx'" | null | STRING | Yes | No |
ciphers | List of ciphers to be used. This parameter should be included under parameters, e.g., 'ciphers:TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256' | null | STRING | Yes | No |
ssl.enabled.protocols | SSL/TLS protocols to be enabled. This parameter should be given in camel case (sslEnabledProtocols) under parameters, e.g., 'sslEnabledProtocols:true' | null | STRING | Yes | No |
client.enable.session.creation | Enable HTTP session creation. This parameter should be included under parameters, e.g., 'client.enable.session.creation:true' | null | STRING | Yes | No |
follow.redirect | Whether redirects should be followed. | true | BOOL | Yes | No |
max.redirect.count | Maximum redirect count. | 5 | INT | Yes | No |
tls.store.type | TLS store type to be used. | JKS | STRING | Yes | No |
proxy.host | Proxy server host | null | STRING | Yes | No |
proxy.port | Proxy server port | null | STRING | Yes | No |
proxy.username | Proxy server username | null | STRING | Yes | No |
proxy.password | Proxy server password | null | STRING | Yes | No |
client.bootstrap.configuration | Client bootstrap configurations. Expected format of these parameters is as follows: "'client.bootstrap.nodelay:xxx','client.bootstrap.keepalive:xxx'" | TODO | STRING | Yes | No |
client.bootstrap.nodelay | This is mapped to the TCP_NODELAY socket option, which disables Nagle's algorithm so that data is sent as soon as it is available. Setting this parameter to 'true' forces a socket to send the data in its buffer, whatever the packet size. | true | BOOL | Yes | No |
client.bootstrap.keepalive | This parameter defines whether the tcp connection should remain open for multiple HTTP requests/responses. If this is set to 'false', HTTP connections will be closed after each request. | true | BOOL | Yes | No |
client.bootstrap.sendbuffersize | Http client send buffer size. | 1048576 | INT | Yes | No |
client.bootstrap.recievebuffersize | Http client receive buffer size. | 1048576 | INT | Yes | No |
client.bootstrap.connect.timeout | Http client connection timeout. | 15000 | INT | Yes | No |
client.bootstrap.socket.reuse | To enable http socket reuse. | false | BOOL | Yes | No |
client.bootstrap.socket.timeout | Http client socket timeout. | 15 | STRING | Yes | No |
connection.pool.count | Number of connection pools that need to be created for the particular client. | 0 | INT | Yes | No |
max.pool.active.connections | Maximum possible number of active connections per pool for the client. | -1 | INT | Yes | No |
min.pool.idle.connections | Minimum number of idle connections that can exist in a pool of the client. | 0 | INT | Yes | No |
max.pool.idle.connections | Maximum number of idle connections that can exist in a pool of the client. | 100 | INT | Yes | No |
min.evictable.idle.time | Minimum amount of time (in milliseconds) a connection may sit idle in the pool before it is eligible for eviction. | 300000ms | STRING | Yes | No |
time.between.eviction.runs | Time between two eviction operations (in milliseconds) on the connection pool. | 30000ms | STRING | Yes | No |
max.wait.time | The maximum number of milliseconds that the pool will wait (when there are no available connections) for a connection to be returned. | 60000 | STRING | Yes | No |
test.on.borrow | The indication of whether objects will be validated before being borrowed from the pool. If the object validation is failed, it will be dropped from the pool, and will attempt to borrow another. | true | BOOL | Yes | No |
test.while.idle | The indication of whether objects will be validated by the idle object evictor (if any). If the object validation is failed, it will be dropped from the pool. | true | BOOL | Yes | No |
exhausted.action | The action to take when the maximum number of active connections is in use, indicated as an integer. Possible actions are: 0 - Fail the request when the pool is exhausted. 1 - Block the request when the pool is exhausted, until a connection returns to the pool. 2 - Grow the connection pool size when it is exhausted. | 1 (Block when exhausted) | INT | Yes | No |
oauth.username | The username to be included in the authentication header of the OAuth authentication enabled events. Both the username and the password must be specified to enable OAuth authentication. If either parameter is missing, an error is logged in the CLI. Applicable only to OAuth requests. | NONE | STRING | Yes | No |
oauth.password | The password to be included in the authentication header of the OAuth authentication enabled events. Both the username and the password must be specified to enable OAuth authentication. If either parameter is missing, an error is logged in the CLI. Applicable only to OAuth requests. | NONE | STRING | Yes | No |
consumer.key | Consumer key for the HTTP request. Applicable only to OAuth requests. | NONE | STRING | Yes | No |
consumer.secret | Consumer secret for the HTTP request. Applicable only to OAuth requests. | NONE | STRING | Yes | No |
refresh.token | Refresh token for the HTTP request. Applicable only to OAuth requests. | | STRING | Yes | No |
token.url | Token URL used to generate a new access token. Applicable only to OAuth requests. | | STRING | Yes | No |
hostname.verification.enabled | To enable hostname verification | true | BOOL | Yes | No |
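A minimal sketch of how the security-related parameters above might be combined is shown below: an HTTPS endpoint, basic authentication, and a custom client truststore. The endpoint, credentials, truststore path, truststore password, and stream definition are all hypothetical placeholders.

```
-- Hypothetical values throughout; replace them with the real endpoint,
-- credentials, and truststore details of your deployment.
@sink(type='http', publisher.url='https://localhost:8443/endpoint',
      basic.auth.username='admin', basic.auth.password='admin',
      https.truststore.file='/path/to/client-truststore.jks',
      https.truststore.password='changeit',
      @map(type='json'))
define stream SecureStream (symbol string, price float);
```

Both basic.auth.username and basic.auth.password are given because, per the table, specifying only one of them results in an error being logged.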
System Parameters
Name | Description | Default Value | Possible Parameters |
---|---|---|---|
clientBootstrapBossGroupSize | Property to configure the number of boss threads, which accept incoming connections until the ports are unbound. Once a connection is accepted successfully, the boss thread passes the accepted channel to one of the worker threads. | Number of available processors | Any integer |
clientBootstrapWorkerGroupSize | Property to configure the number of worker threads, which perform non-blocking reads and writes for one or more channels. | (Number of available processors) * 2 | Any integer |
clientBootstrapClientGroupSize | Property to configure the number of client threads, which perform non-blocking reads and writes for one or more channels. | (Number of available processors) * 2 | Any integer |
trustStoreLocation | The default truststore file path. | ${carbon.home}/resources/security/client-truststore.jks | Path to client-truststore.jks |
trustStorePassword | The default truststore password. | wso2carbon | Truststore password |
Examples
EXAMPLE 1
@sink(type='http',publisher.url='http://localhost:8009/foo', method='{{method}}',headers="'content-type:xml','content-length:94'", client.bootstrap.configuration="'client.bootstrap.socket.timeout:20', 'client.bootstrap.worker.group.size:10'", client.pool.configuration="'client.connection.pool.count:10','client.max.active.connections.per.pool:1'", @map(type='xml', @payload('{{payloadBody}}')))
define stream FooStream (payloadBody String, method string, headers string);
With XML mapping, the expected input for FooStream is in the following format:
{
<events>
<event>
<symbol>WSO2</symbol>
<price>55.6</price>
<volume>100</volume>
</event>
</events>,
POST,
Content-Length:24#Content-Location:USA#Retry-After:120
}
Above event will generate output as below.
~Output http event payload
<events>
<event>
<symbol>WSO2</symbol>
<price>55.6</price>
<volume>100</volume>
</event>
</events>
~Output http event headers
Content-Length:24,
Content-Location:'USA',
Retry-After:120,
Content-Type:'application/xml',
HTTP_METHOD:'POST',
~Output http event properties
HTTP_METHOD:'POST',
HOST:'localhost',
PORT:8009,
PROTOCOL:'http',
TO:'/foo'
http-request (Sink)
This extension publishes HTTP events using any of the HTTP methods POST, GET, PUT, and DELETE, over HTTP or HTTPS. It also supports basic authentication, and events sent over HTTPS can be published using a custom client truststore file. In addition, any number of headers, including the HTTP_METHOD header, can be set dynamically for each event.
The following content types are set by default according to the type of sink mapper used.
You can override them by setting new content types in the headers.
- TEXT : text/plain
- XML : application/xml
- JSON : application/json
- KEYVALUE : application/x-www-form-urlencoded
The HTTP request sink is correlated with the HTTP response source through a unique sink.id. It sends the request to the defined URL, and the response is received by the response source that has the same 'sink.id'.
Syntax
@sink(type="http-request", publisher.url="<STRING>", basic.auth.username="<STRING>", basic.auth.password="<STRING>", https.truststore.file="<STRING>", https.truststore.password="<STRING>", headers="<STRING>", method="<STRING>", socket.idle.timeout="<INT>", chunk.disabled="<BOOL>", ssl.protocol="<STRING>", parameters="<STRING>", ciphers="<STRING>", ssl.enabled.protocols="<STRING>", client.enable.session.creation="<STRING>", follow.redirect="<BOOL>", max.redirect.count="<INT>", tls.store.type="<STRING>", proxy.host="<STRING>", proxy.port="<STRING>", proxy.username="<STRING>", proxy.password="<STRING>", client.bootstrap.configuration="<STRING>", client.bootstrap.nodelay="<BOOL>", client.bootstrap.keepalive="<BOOL>", client.bootstrap.sendbuffersize="<INT>", client.bootstrap.recievebuffersize="<INT>", client.bootstrap.connect.timeout="<INT>", client.bootstrap.socket.reuse="<BOOL>", client.bootstrap.socket.timeout="<STRING>", client.threadpool.configurations="<STRING>", client.connection.pool.count="<INT>", client.max.active.connections.per.pool="<INT>", client.min.idle.connections.per.pool="<INT>", client.max.idle.connections.per.pool="<INT>", client.min.eviction.idle.time="<STRING>", sender.thread.count="<STRING>", event.group.executor.thread.size="<STRING>", max.wait.for.client.connection.pool="<STRING>", sink.id="<STRING>", downloading.enabled="<BOOL>", download.path="<STRING>", oauth.username="<STRING>", oauth.password="<STRING>", consumer.key="<STRING>", consumer.secret="<STRING>", refresh.token="<STRING>", blocking.io="<BOOL>", @map(...))
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
publisher.url | The URL to which the outgoing events should be published via HTTP. This is a mandatory parameter; if it is not specified, an error is logged in the CLI. To enable SSL for the events, use https instead of http in the publisher.url, e.g., http://localhost:8080/endpoint , https://localhost:8080/endpoint . This can be used as a dynamic parameter as well. | | STRING | No | Yes |
basic.auth.username | The username to be included in the authentication header of the basic authentication enabled events. Both the username and the password must be specified to enable basic authentication. If either parameter is missing, an error is logged in the CLI. | | STRING | Yes | No |
basic.auth.password | The password to include in the authentication header of the basic authentication enabled events. Both the username and the password must be specified to enable basic authentication. If either parameter is missing, an error is logged in the CLI. | | STRING | Yes | No |
https.truststore.file | The file path to the location of the truststore of the client that sends the HTTP events through 'https' protocol. A custom client-truststore can be specified if required. | ${carbon.home}/resources/security/client-truststore.jks | STRING | Yes | No |
https.truststore.password | The password for the client-truststore. A custom password can be specified if required. If no custom password is specified and the protocol of URL is 'https' then, the system uses default password. | wso2carbon | STRING | Yes | No |
headers | The headers that should be included as HTTP request headers. Any number of headers can be concatenated in the following format: "'header1:value1','header2:value2'". Include a Content-Type header to use a specific content type for the payload. Otherwise, the system decides the Content-Type from the type of sink mapper, as follows: - @map(xml): application/xml - @map(json): application/json - @map(text): text/plain. If no mapping type is specified, 'text/plain' is used as the default Content-Type header. Note that providing content-length as a header is not supported. The size of the payload is calculated automatically and included in the content-length header. | | STRING | Yes | No |
method | The HTTP method used for the request, which is included as the HTTP_METHOD header. If this parameter is not specified, the system uses 'POST' by default. | POST | STRING | Yes | No |
socket.idle.timeout | Socket timeout value in milliseconds. | 6000 | INT | Yes | No |
chunk.disabled | This parameter is used to disable/enable chunked transfer encoding. | false | BOOL | Yes | No |
ssl.protocol | The SSL protocol version | TLS | STRING | Yes | No |
parameters | Parameters other than basics such as ciphers,sslEnabledProtocols,client.enable.session.creation. Expected format of these parameters is as follows: "'ciphers:xxx','sslEnabledProtocols,client.enable:xxx'" | null | STRING | Yes | No |
ciphers | List of ciphers to be used. This parameter should be included under parameters, e.g., 'ciphers:TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256' | null | STRING | Yes | No |
ssl.enabled.protocols | SSL/TLS protocols to be enabled. This parameter should be given in camel case (sslEnabledProtocols) under parameters, e.g., 'sslEnabledProtocols:true' | null | STRING | Yes | No |
client.enable.session.creation | Enable HTTP session creation. This parameter should be included under parameters, e.g., 'client.enable.session.creation:true' | null | STRING | Yes | No |
follow.redirect | Whether redirects should be followed. | true | BOOL | Yes | No |
max.redirect.count | Maximum redirect count. | 5 | INT | Yes | No |
tls.store.type | TLS store type to be used. | JKS | STRING | Yes | No |
proxy.host | Proxy server host | null | STRING | Yes | No |
proxy.port | Proxy server port | null | STRING | Yes | No |
proxy.username | Proxy server username | null | STRING | Yes | No |
proxy.password | Proxy server password | null | STRING | Yes | No |
client.bootstrap.configuration | Client bootstrap configurations. Expected format of these parameters is as follows: "'client.bootstrap.nodelay:xxx','client.bootstrap.keepalive:xxx'" | TODO | STRING | Yes | No |
client.bootstrap.nodelay | Http client no delay. | true | BOOL | Yes | No |
client.bootstrap.keepalive | Http client keep alive. | true | BOOL | Yes | No |
client.bootstrap.sendbuffersize | Http client send buffer size. | 1048576 | INT | Yes | No |
client.bootstrap.recievebuffersize | Http client receive buffer size. | 1048576 | INT | Yes | No |
client.bootstrap.connect.timeout | Http client connection timeout. | 15000 | INT | Yes | No |
client.bootstrap.socket.reuse | To enable http socket reuse. | false | BOOL | Yes | No |
client.bootstrap.socket.timeout | Http client socket timeout. | 15 | STRING | Yes | No |
client.threadpool.configurations | Thread pool configuration. Expected format of these parameters is as follows: "'client.connection.pool.count:xxx','client.max.active.connections.per.pool:xxx'" | TODO | STRING | Yes | No |
client.connection.pool.count | Connection pool count. | 0 | INT | Yes | No |
client.max.active.connections.per.pool | Active connections per pool. | -1 | INT | Yes | No |
client.min.idle.connections.per.pool | Minimum idle connections per pool. | 0 | INT | Yes | No |
client.max.idle.connections.per.pool | Maximum idle connections per pool. | 100 | INT | Yes | No |
client.min.eviction.idle.time | Minimum eviction idle time. | 5 * 60 * 1000 | STRING | Yes | No |
sender.thread.count | Http sender thread count. | 20 | STRING | Yes | No |
event.group.executor.thread.size | Event group executor thread size. | 15 | STRING | Yes | No |
max.wait.for.client.connection.pool | Maximum wait for client connection pool. | 60000 | STRING | Yes | No |
sink.id | Identifier of the sink. This is used to correlate with the corresponding http-response source, which processes the response to the request sent by this sink. | | STRING | No | No |
downloading.enabled | If this is set to 'true' then the response received by the response source will be written to a file. If downloading is enabled, the download.path parameter is mandatory. | false | BOOL | Yes | No |
download.path | If downloading is enabled, the path of the file which is going to be downloaded should be specified using 'download.path' parameter. This should be an absolute path including the file name. | null | STRING | Yes | Yes |
oauth.username | The username to be included in the authentication header of the OAuth authentication enabled events. Both the username and the password must be specified to enable OAuth authentication. If either parameter is missing, an error is logged in the CLI. Applicable only to OAuth requests. | | STRING | Yes | No |
oauth.password | The password to be included in the authentication header of the OAuth authentication enabled events. Both the username and the password must be specified to enable OAuth authentication. If either parameter is missing, an error is logged in the CLI. Applicable only to OAuth requests. | | STRING | Yes | No |
consumer.key | Consumer key for the HTTP request. Applicable only to OAuth requests. | | STRING | Yes | No |
consumer.secret | Consumer secret for the HTTP request. Applicable only to OAuth requests. | | STRING | Yes | No |
refresh.token | Refresh token for the HTTP request. Applicable only to OAuth requests. | | STRING | Yes | No |
blocking.io | If this is set to 'true', after sending a request, http-request sink waits until it receives the response for that request, before sending any other request. | false | BOOL | Yes | No |
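To make the blocking.io behaviour concrete, here is a small sketch in which the sink waits for each response before sending the next request. The sink id, URL, stream names, and attributes are hypothetical; the http-response source parameters mirror those used in this section's examples.

```
-- Hypothetical sink id, endpoint, and streams.
-- With blocking.io='true' the sink sends one request at a time and waits
-- for its response before sending the next one.
@sink(type='http-request', sink.id='ordered',
      publisher.url='http://localhost:8009/foo',
      blocking.io='true',
      @map(type='json'))
define stream OrderedRequestStream (symbol string, price float);

-- Receives the 2xx responses for requests sent by the sink above.
@source(type='http-response', sink.id='ordered', http.status.code='2\\d+',
        @map(type='text', regex.A='((.|\n)*)', @attributes(body='A[1]')))
define stream OrderedResponseStream (body string);
```

Leaving blocking.io at its default of 'false' lets the sink send further requests without waiting for earlier responses.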
Examples
EXAMPLE 1
@sink(type='http-request', sink.id='foo', publisher.url='http://localhost:8009/foo', @map(type='xml', @payload('{{payloadBody}}')))
define stream FooStream (payloadBody String, method string, headers string);
@source(type='http-response', sink.id='foo', http.status.code='2\\d+',
@map(type='text', regex.A='((.|\n)*)', @attributes(headers='trp:headers', fileName='A[1]')))
define stream responseStream2xx(fileName string, headers string);
@source(type='http-response', sink.id='foo', http.status.code='4\\d+',
@map(type='text', regex.A='((.|\n)*)', @attributes(errorMsg='A[1]')))
define stream responseStream4xx(errorMsg string);
In the above example, the payload body for 'FooStream' is in the following format.
{
<events>
<event>
<symbol>WSO2</symbol>
<price>55.6</price>
<volume>100</volume>
</event>
</events>,
This message is sent as the body of a POST request with the content type 'application/xml' to the endpoint defined as the 'publisher.url'. To process the responses to these requests, a source of type 'http-response' with the same sink id 'foo' must be defined in the Siddhi app.
The responses with 2xx status codes will be received by the http-response source which has the http.status.code defined by the regex '2\\d+'.
If the response has a 4xx status code, it will be received by the http-response source which has the http.status.code defined by the regex '4\\d+'.
EXAMPLE 2
define stream FooStream (name String, id int, headers String, downloadPath string);
@sink(type='http-request',
downloading.enabled='true',
download.path='{{downloadPath}}',publisher.url='http://localhost:8005/files',
method='GET', headers='{{headers}}',sink.id='download-sink',
@map(type='json'))
define stream BarStream (name String, id int, headers String, downloadPath string);
@source(type='http-response', sink.id='download-sink', http.status.code='2\\d+',
@map(type='text', regex.A='((.|\n)*)', @attributes(headers='trp:headers', fileName='A[1]')))
define stream responseStream2xx(fileName string, headers string);
@source(type='http-response', sink.id='download-sink', http.status.code='4\\d+',
@map(type='text', regex.A='((.|\n)*)', @attributes(errorMsg='A[1]')))
define stream responseStream4xx(errorMsg string);
In the above example, the http-request sink sends a GET request to the publisher URL, and the requested file is received as the response by a corresponding http-response source.
If the HTTP status code of the response is a successful one (2xx), the response is received by the http-response source that has the http.status.code '2\\d+' and is downloaded as a local file. The event received by responseStream2xx then contains the headers included in the request and the downloaded file name.
If the HTTP status code of the response is a 4xx code, the response is received by the http-response source that has the http.status.code '4\\d+'. The event received by responseStream4xx then contains the response message body in text format.
http-response (Sink)
The HTTP response sink is correlated with the HTTP request source through a unique source.id, and it sends a response to the HTTP request source that has the same source.id. The response message can be formatted as text, XML, or JSON and can be sent with appropriate headers.
Syntax
@sink(type="http-response", source.id="<STRING>", message.id="<STRING>", headers="<STRING>", @map(...))
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
source.id | Identifier of the source. | | STRING | No | No |
message.id | Identifier of the message. | | STRING | No | Yes |
headers | The headers that should be included as HTTP response headers. Any number of headers can be concatenated in the following format: "'header1:value1','header2:value2'". Include a Content-Type header to use a specific content type for the payload. Otherwise, the system derives the Content-Type header from the mapping type (i.e., @map(xml): application/xml, @map(json): application/json, @map(text): text/plain); if no mapping type is specified, 'text/plain' is used as the default Content-Type header. If no Content-Length header is included, the system calculates the size of the payload in bytes and includes it as the content-length header. | | STRING | Yes | No |
Examples
EXAMPLE 1
@sink(type='http-response', source.id='sampleSourceId', message.id='{{messageId}}', headers="'content-type:json','content-length:94'", @map(type='json', @payload('{{payloadBody}}')))
define stream FooStream (payloadBody String, messageId string, headers string);
With JSON mapping, the expected input for FooStream is in the following format:
{
{"events":
{"event": {
"symbol":"WSO2",
"price":55.6,
"volume":100
}}
},
0cf708b1-7eae-440b-a93e-e72f801b486a,
Content-Length:24#Content-Location:USA
}
Above event will generate response for the matching source message as below.
~Output http event payload
{"events":
{"event": {
"symbol":"WSO2",
"price":55.6,
"volume":100
}}
}
~Output http event headers
Content-Length:24,
Content-Location:'USA',
Content-Type:'application/json'
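The correlation through source.id can be sketched end to end as follows. This is an illustration under assumptions rather than a reference configuration: the http-request source, its receiver.url parameter, the 'trp:messageId' transport property, and the JSON path are not described in this section and are assumed here, and the URL, stream names, and attributes are placeholders.

```
-- Assumption: an http-request source with the same source.id receives the
-- request and exposes its message id as the transport property 'trp:messageId'.
@source(type='http-request', source.id='sampleSourceId',
        receiver.url='http://localhost:9055/endpoints/RecPro',
        @map(type='json',
             @attributes(messageId='trp:messageId', symbol='$.event.symbol')))
define stream RequestStream (messageId string, symbol string);

-- The response sink uses the same source.id and the message id carried by
-- each event to answer the matching request.
@sink(type='http-response', source.id='sampleSourceId',
      message.id='{{messageId}}',
      @map(type='json'))
define stream ResponseStream (messageId string, symbol string);

-- Echo each request back as its own response.
from RequestStream
select messageId, symbol
insert into ResponseStream;
```

The key point is that message.id is dynamic, so every outgoing event carries the id of the request it answers.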
Source
http (Source)
The HTTP source receives POST requests via HTTP or HTTPS in formats such as text, XML, and JSON. In WSO2 SP, if required, you can enable basic authentication to ensure that events are received only from users who are authorized to access the service.
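A minimal sketch of such a source is shown below, using only the receiver.url and basic.auth.enabled parameters from this section. The URL, stream name, and attributes are hypothetical, and basic.auth.enabled is expected to take effect only in WSO2 SP.

```
-- Hypothetical receiver URL and stream definition.
-- basic.auth.enabled='true' asks the source to verify the credentials
-- sent with each incoming event (WSO2 SP only).
@source(type='http', receiver.url='http://localhost:9055/endpoints/RecPro',
        basic.auth.enabled='true',
        @map(type='json'))
define stream InputStream (symbol string, price float, volume long);
```

Omitting receiver.url makes the source fall back to the default URL format described in the parameter table.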
Syntax
@source(type="http", receiver.url="<STRING>", basic.auth.enabled="<STRING>", worker.count="<INT>", socket.idle.timeout="<INT>", ssl.verify.client="<STRING>", ssl.protocol="<STRING>", tls.store.type="<STRING>", parameters="<STRING>", ciphers="<STRING>", ssl.enabled.protocols="<STRING>", server.enable.session.creation="<STRING>", server.supported.snimatchers="<STRING>", server.suported.server.names="<STRING>", request.size.validation.configuration="<STRING>", request.size.validation="<STRING>", request.size.validation.maximum.value="<STRING>", request.size.validation.reject.status.code="<STRING>", request.size.validation.reject.message="<STRING>", request.size.validation.reject.message.content.type="<STRING>", header.size.validation="<STRING>", header.validation.maximum.request.line="<STRING>", header.validation.maximum.size="<STRING>", header.validation.maximum.chunk.size="<STRING>", header.validation.reject.status.code="<STRING>", header.validation.reject.message="<STRING>", header.validation.reject.message.content.type="<STRING>", server.bootstrap.configuration="<OBJECT>", server.bootstrap.nodelay="<BOOL>", server.bootstrap.keepalive="<BOOL>", server.bootstrap.sendbuffersize="<INT>", server.bootstrap.recievebuffersize="<INT>", server.bootstrap.connect.timeout="<INT>", server.bootstrap.socket.reuse="<BOOL>", server.bootstrap.socket.timeout="<BOOL>", server.bootstrap.socket.backlog="<BOOL>", trace.log.enabled="<BOOL>", @map(...))
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
receiver.url | The URL on which the events should be received. Any valid URL can be provided; if no URL is provided, the system uses the format http://0.0.0.0:9763/<appName>/<streamName>. To use SSL, the URL should be given in the format https://localhost:8080/<streamName> | http://0.0.0.0:9763/ | STRING | Yes | No |
basic.auth.enabled | This works only in WSO2 SP. If this is set to true, basic authentication is enabled for incoming events, and the credentials with which each event is sent are verified to ensure that the user is authorized to access the service. If basic authentication fails, the event is not authenticated and an authentication error is logged in the CLI. | false | STRING | Yes | No |
worker.count | The number of active worker threads to serve the incoming events. The value is 1 by default, which ensures that the events are directed to the event stream in the same order in which they arrive. Increasing this value might improve performance at the cost of losing event ordering. | 1 | INT | Yes | No |
socket.idle.timeout | Idle timeout for HTTP connection. | 120000 | INT | Yes | No |
ssl.verify.client | The type of client certificate verification. | null | STRING | Yes | No |
ssl.protocol | ssl/tls related options | TLS | STRING | Yes | No |
tls.store.type | TLS store type. | JKS | STRING | Yes | No |
parameters | Parameters other than the basic ones, such as ciphers, sslEnabledProtocols, and client.enable.session.creation. The expected format of these parameters is as follows: "'ciphers:xxx','sslEnabledProtocols,client.enable:xxx'" | null | STRING | Yes | No |
ciphers | List of ciphers to be used. This parameter should be included under parameters. Ex: 'ciphers:TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256' | null | STRING | Yes | No |
ssl.enabled.protocols | SSL/TLS protocols to be enabled. This parameter should be given in camel case (sslEnabledProtocols) under parameters. Ex: 'sslEnabledProtocols:true' | null | STRING | Yes | No |
server.enable.session.creation | Enables HTTP session creation. This parameter should be included under parameters. Ex: 'client.enable.session.creation:true' | null | STRING | Yes | No |
server.supported.snimatchers | HTTP SNIMatcher to be added. This parameter should be included under parameters. Ex: 'server.supported.snimatchers:SNIMatcher' | null | STRING | Yes | No |
server.suported.server.names | HTTP supported servers. This parameter should be included under parameters. Ex: 'server.suported.server.names:server' | null | STRING | Yes | No |
request.size.validation.configuration | Parameters that are responsible for validating the HTTP request and request headers. The expected format of these parameters is as follows: "'request.size.validation:xxx','request.size.validation.maximum.value:xxx'" | null | STRING | Yes | No |
request.size.validation | Enables request size validation. | false | STRING | Yes | No |
request.size.validation.maximum.value | The maximum request size allowed when request size validation is enabled. | Integer.MAX_VALUE | STRING | Yes | No |
request.size.validation.reject.status.code | The status code sent as the response when a request exceeds the maximum size and request.size.validation is enabled. | 401 | STRING | Yes | No |
request.size.validation.reject.message | The status message sent as the response when a request exceeds the maximum size and request.size.validation is enabled. | Message is bigger than the valid size | STRING | Yes | No |
request.size.validation.reject.message.content.type | The content type of the response sent when a request exceeds the maximum size and request.size.validation is enabled. | plain/text | STRING | Yes | No |
header.size.validation | Enables header size validation. | false | STRING | Yes | No |
header.validation.maximum.request.line | The maximum request line size allowed when header validation is enabled. | 4096 | STRING | Yes | No |
header.validation.maximum.size | The maximum expected header size when header validation is enabled. | 8192 | STRING | Yes | No |
header.validation.maximum.chunk.size | The maximum expected chunk size when header validation is enabled. | 8192 | STRING | Yes | No |
header.validation.reject.status.code | The status code sent as the response when a header exceeds the maximum size and header.size.validation is enabled. | 401 | STRING | Yes | No |
header.validation.reject.message | The message sent as the response when a header exceeds the maximum size and header.size.validation is enabled. | Message header is bigger than the valid size | STRING | Yes | No |
header.validation.reject.message.content.type | The content type of the response sent when a header exceeds the maximum size and header.size.validation is enabled. | plain/text | STRING | Yes | No |
server.bootstrap.configuration | Parameters for the bootstrap configuration of the server. The expected format of these parameters is as follows: "'ciphers:xxx','sslEnabledProtocols,client.enable:xxx'" | null | OBJECT | Yes | No |
server.bootstrap.nodelay | Http server no delay. | true | BOOL | Yes | No |
server.bootstrap.keepalive | Http server keep alive. | true | BOOL | Yes | No |
server.bootstrap.sendbuffersize | Http server send buffer size. | 1048576 | INT | Yes | No |
server.bootstrap.recievebuffersize | Http server receive buffer size. | 1048576 | INT | Yes | No |
server.bootstrap.connect.timeout | Http server connection timeout. | 15000 | INT | Yes | No |
server.bootstrap.socket.reuse | To enable http socket reuse. | false | BOOL | Yes | No |
server.bootstrap.socket.timeout | Http server socket timeout. | 15 | BOOL | Yes | No |
server.bootstrap.socket.backlog | Http server socket backlog. | 100 | BOOL | Yes | No |
trace.log.enabled | Http traffic monitoring. | false | BOOL | Yes | No |
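The parameters and request.size.validation.configuration values in the table above are written as a comma-separated list of single-quoted 'key:value' pairs. The following is a minimal Python sketch of a helper that assembles such a string from a dictionary. The function name format_parameters is hypothetical and is not part of the extension; the sketch only illustrates the quoting format described in the table.

```python
def format_parameters(params):
    """Join key/value pairs into the quoted, comma-separated 'key:value' form.

    Hypothetical helper: it only mirrors the format shown in the table
    ("'ciphers:xxx','sslEnabledProtocols:xxx'") and is not an extension API.
    """
    return ",".join("'{}:{}'".format(key, value) for key, value in params.items())


# Values taken from the examples in this document.
parameters = format_parameters({
    "ciphers": "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256",
    "sslEnabledProtocols": "TLSv1.1,TLSv1.2",
})
print(parameters)
```

The resulting string can then be placed inside double quotes as the value of the parameters option in the @source annotation.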
System Parameters
Name | Description | Default Value | Possible Parameters |
---|---|---|---|
serverBootstrapBossGroupSize | The number of boss threads, which accept incoming connections until the ports are unbound. Once a connection is accepted successfully, the boss thread passes the accepted channel to one of the worker threads. | Number of available processors | Any integer |
serverBootstrapWorkerGroupSize | The number of worker threads, which perform non-blocking reads and writes for one or more channels. | (Number of available processors)*2 | Any integer |
serverBootstrapClientGroupSize | The number of client threads, which perform non-blocking reads and writes for one or more channels. | (Number of available processors)*2 | Any integer |
defaultHost | The default host of the transport. | 0.0.0.0 | Any valid host |
defaultHttpPort | The default port if the default scheme is 'http'. | 8280 | Any valid port |
defaultHttpsPort | The default port if the default scheme is 'https'. | 8243 | Any valid port |
defaultScheme | The default protocol. | http | http https |
keyStoreLocation | The default keystore file path. | ${carbon.home}/resources/security/wso2carbon.jks | Path to wso2carbon.jks file |
keyStorePassword | The default keystore password. | wso2carbon | String of keystore password |
Examples
EXAMPLE 1
@source(type='http', receiver.url='http://localhost:9055/endpoints/RecPro', socket.idle.timeout='150000', parameters="'ciphers : TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256', 'sslEnabledProtocols:TLSv1.1,TLSv1.2'", request.size.validation.configuration="request.size.validation:true", server.bootstrap.configuration="server.bootstrap.socket.timeout:25", @map(type='xml'))
define stream FooStream (symbol string, price float, volume long);
The above source configuration performs a default XML input mapping. The expected input is as follows:
<events>
<event>
<symbol>WSO2</symbol>
<price>55.6</price>
<volume>100</volume>
</event>
</events>
If basic authentication is enabled via the basic.auth.enabled='true' setting, each input event is also expected to contain the Authorization: 'Basic encodeBase64(username:Password)' header.
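To illustrate the Authorization header described above, the following Python sketch builds a POST request that carries one XML event together with a basic authentication header. It uses only the standard library. The helper names and the admin/admin credentials are placeholders chosen for illustration, and the application/xml content type is an assumption; the request is constructed but not sent.

```python
import base64
import urllib.request


def basic_auth_header(username, password):
    """Return the Authorization header value: 'Basic ' + base64(username:password)."""
    token = base64.b64encode("{}:{}".format(username, password).encode("utf-8"))
    return "Basic " + token.decode("ascii")


def build_event_request(url, xml_body, username, password):
    """Build (but do not send) a POST request carrying one XML event."""
    return urllib.request.Request(
        url,
        data=xml_body.encode("utf-8"),
        headers={
            "Content-Type": "application/xml",  # assumed content type for the XML mapper
            "Authorization": basic_auth_header(username, password),
        },
        method="POST",
    )


xml_event = (
    "<events><event>"
    "<symbol>WSO2</symbol><price>55.6</price><volume>100</volume>"
    "</event></events>"
)

# Placeholder credentials; replace them with those of an authorized user.
request = build_event_request(
    "http://localhost:9055/endpoints/RecPro", xml_event, "admin", "admin"
)
# urllib.request.urlopen(request) would deliver the event to a running source.
```

Keeping the request construction separate from sending makes it easy to inspect the headers before pointing the client at a live receiver.url.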
http-request (Source)
The http-request source receives POST requests via HTTP or HTTPS in formats such as text, XML, and JSON, and sends the response for each request via the http-response sink. The source is correlated with the http-response sink through a unique source.id, and the individual request and response messages are correlated at the sink using the message.id of the events. If required, you can enable basic authentication at the source to ensure that events are received only from users who are authorized to access the service.
Syntax
@source(type="http-request", receiver.url="<STRING>", source.id="<STRING>", connection.timeout="<INT>", basic.auth.enabled="<STRING>", worker.count="<INT>", socket.idle.timeout="<INT>", ssl.verify.client="<STRING>", ssl.protocol="<STRING>", tls.store.type="<STRING>", parameters="<STRING>", ciphers="<STRING>", ssl.enabled.protocols="<STRING>", server.enable.session.creation="<STRING>", server.supported.snimatchers="<STRING>", server.suported.server.names="<STRING>", request.size.validation.configuration="<STRING>", request.size.validation="<STRING>", request.size.validation.maximum.value="<STRING>", request.size.validation.reject.status.code="<STRING>", request.size.validation.reject.message="<STRING>", request.size.validation.reject.message.content.type="<STRING>", header.size.validation="<STRING>", header.validation.maximum.request.line="<STRING>", header.validation.maximum.size="<STRING>", header.validation.maximum.chunk.size="<STRING>", header.validation.reject.status.code="<STRING>", header.validation.reject.message="<STRING>", header.validation.reject.message.content.type="<STRING>", server.bootstrap.configuration="<OBJECT>", server.bootstrap.nodelay="<BOOL>", server.bootstrap.keepalive="<BOOL>", server.bootstrap.sendbuffersize="<INT>", server.bootstrap.recievebuffersize="<INT>", server.bootstrap.connect.timeout="<INT>", server.bootstrap.socket.reuse="<BOOL>", server.bootstrap.socket.timeout="<BOOL>", server.bootstrap.socket.backlog="<BOOL>", trace.log.enabled="<BOOL>", @map(...)))
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
receiver.url | The URL on which events are received. Any valid URL can be provided. If no URL is provided, the system uses the format http://0.0.0.0:9763/<appName>/<streamName>. To use SSL, provide the URL in the format https://localhost:8080/<streamName>. | http://0.0.0.0:9763/ | STRING | Yes | No |
source.id | The identifier needed to map the source to the sink. | | STRING | No | No |
connection.timeout | Connection timeout in milliseconds. If the mapped http-response sink does not get a correlated message within this timeout, a timeout response is sent. | 120000 | INT | Yes | No |
basic.auth.enabled | If this is set to true, basic authentication is enabled for incoming events, and the credentials sent with each event are verified to ensure that the user is authorized to access the service. If basic authentication fails, the event is not authenticated and an authentication error is logged in the CLI. By default, this value is 'false'. | false | STRING | Yes | No |
worker.count | The number of active worker threads that serve incoming events. The default value of 1 ensures that events are directed to the event stream in the order in which they arrive. Increasing this value might improve performance at the cost of losing event ordering. | 1 | INT | Yes | No |
socket.idle.timeout | Idle timeout for HTTP connection. | 120000 | INT | Yes | No |
ssl.verify.client | The type of client certificate verification. | null | STRING | Yes | No |
ssl.protocol | ssl/tls related options | TLS | STRING | Yes | No |
tls.store.type | TLS store type. | JKS | STRING | Yes | No |
parameters | Parameters other than the basic ones, such as ciphers, sslEnabledProtocols, and client.enable.session.creation. The expected format of these parameters is as follows: "'ciphers:xxx','sslEnabledProtocols,client.enable:xxx'" | null | STRING | Yes | No |
ciphers | List of ciphers to be used. This parameter should be included under parameters. Ex: 'ciphers:TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256' | null | STRING | Yes | No |
ssl.enabled.protocols | SSL/TLS protocols to be enabled. This parameter should be given in camel case (sslEnabledProtocols) under parameters. Ex: 'sslEnabledProtocols:true' | null | STRING | Yes | No |
server.enable.session.creation | Enables HTTP session creation. This parameter should be included under parameters. Ex: 'client.enable.session.creation:true' | null | STRING | Yes | No |
server.supported.snimatchers | HTTP SNIMatcher to be added. This parameter should be included under parameters. Ex: 'server.supported.snimatchers:SNIMatcher' | null | STRING | Yes | No |
server.suported.server.names | HTTP supported servers. This parameter should be included under parameters. Ex: 'server.suported.server.names:server' | null | STRING | Yes | No |
request.size.validation.configuration | Parameters that are responsible for validating the HTTP request and request headers. The expected format of these parameters is as follows: "'request.size.validation:xxx','request.size.validation.maximum.value:xxx'" | null | STRING | Yes | No |
request.size.validation | Enables request size validation. | false | STRING | Yes | No |
request.size.validation.maximum.value | The maximum request size allowed when request size validation is enabled. | Integer.MAX_VALUE | STRING | Yes | No |
request.size.validation.reject.status.code | The status code sent as the response when a request exceeds the maximum size and request.size.validation is enabled. | 401 | STRING | Yes | No |
request.size.validation.reject.message | The status message sent as the response when a request exceeds the maximum size and request.size.validation is enabled. | Message is bigger than the valid size | STRING | Yes | No |
request.size.validation.reject.message.content.type | The content type of the response sent when a request exceeds the maximum size and request.size.validation is enabled. | plain/text | STRING | Yes | No |
header.size.validation | Enables header size validation. | false | STRING | Yes | No |
header.validation.maximum.request.line | The maximum request line size allowed when header validation is enabled. | 4096 | STRING | Yes | No |
header.validation.maximum.size | The maximum expected header size when header validation is enabled. | 8192 | STRING | Yes | No |
header.validation.maximum.chunk.size | The maximum expected chunk size when header validation is enabled. | 8192 | STRING | Yes | No |
header.validation.reject.status.code | The status code sent as the response when a header exceeds the maximum size and header.size.validation is enabled. | 401 | STRING | Yes | No |
header.validation.reject.message | The message sent as the response when a header exceeds the maximum size and header.size.validation is enabled. | Message header is bigger than the valid size | STRING | Yes | No |
header.validation.reject.message.content.type | The content type of the response sent when a header exceeds the maximum size and header.size.validation is enabled. | plain/text | STRING | Yes | No |
server.bootstrap.configuration | Parameters for the bootstrap configuration of the server. The expected format of these parameters is as follows: "'ciphers:xxx','sslEnabledProtocols,client.enable:xxx'" | null | OBJECT | Yes | No |
server.bootstrap.nodelay | Http server no delay. | true | BOOL | Yes | No |
server.bootstrap.keepalive | Http server keep alive. | true | BOOL | Yes | No |
server.bootstrap.sendbuffersize | Http server send buffer size. | 1048576 | INT | Yes | No |
server.bootstrap.recievebuffersize | Http server receive buffer size. | 1048576 | INT | Yes | No |
server.bootstrap.connect.timeout | Http server connection timeout. | 15000 | INT | Yes | No |
server.bootstrap.socket.reuse | To enable http socket reuse. | false | BOOL | Yes | No |
server.bootstrap.socket.timeout | Http server socket timeout. | 15 | BOOL | Yes | No |
server.bootstrap.socket.backlog | Http server socket backlog. | 100 | BOOL | Yes | No |
trace.log.enabled | Http traffic monitoring. | false | BOOL | Yes | No |
System Parameters
Name | Description | Default Value | Possible Parameters |
---|---|---|---|
serverBootstrapBossGroupSize | The number of boss threads, which accept incoming connections until the ports are unbound. Once a connection is accepted successfully, the boss thread passes the accepted channel to one of the worker threads. | Number of available processors | Any integer |
serverBootstrapWorkerGroupSize | The number of worker threads, which perform non-blocking reads and writes for one or more channels. | (Number of available processors)*2 | Any integer |
serverBootstrapClientGroupSize | The number of client threads, which perform non-blocking reads and writes for one or more channels. | (Number of available processors)*2 | Any integer |
defaultHost | The default host of the transport. | 0.0.0.0 | Any valid host |
defaultHttpPort | The default port if the default scheme is 'http'. | 8280 | Any valid port |
defaultHttpsPort | The default port if the default scheme is 'https'. | 8243 | Any valid port |
defaultScheme | The default protocol. | http | http https |
keyStoreLocation | The default keystore file path. | ${carbon.home}/resources/security/wso2carbon.jks | Path to wso2carbon.jks file |
keyStorePassword | The default keystore password. | wso2carbon | String of keystore password |
certPassword | The default cert password. | wso2carbon | String of cert password |
Examples
EXAMPLE 1
@source(type='http-request', source.id='sampleSourceId', receiver.url='http://localhost:9055/endpoints/RecPro', connection.timeout='150000', parameters="'ciphers : TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256', 'sslEnabledProtocols:TLSv1.1,TLSv1.2'", request.size.validation.configuration="request.size.validation:true", server.bootstrap.configuration="server.bootstrap.socket.timeout:25", @map(type='json', @attributes(messageId='trp:messageId', symbol='$.events.event.symbol', price='$.events.event.price', volume='$.events.event.volume')))
define stream FooStream (messageId string, symbol string, price float, volume long);
The expected input is as follows:
{"events":
{"event":
{
"symbol":"WSO2",
"price":55.6,
"volume":100
}
}
}
If basic authentication is enabled via the basic.auth.enabled='true' setting, each input event is also expected to contain the Authorization: 'Basic encodeBase64(username:Password)' header.
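The @attributes mapping in the example above reads each stream attribute from a path such as $.events.event.symbol. As a rough illustration of the payload nesting those paths imply, the Python sketch below builds a matching JSON payload and resolves the three paths with a simplified dotted-path lookup. The resolve helper is hypothetical and handles only plain dotted paths; it is not a full JSONPath implementation and does not represent how the Siddhi JSON mapper works internally.

```python
import json


def resolve(document, path):
    """Resolve a simple dotted path such as '$.events.event.symbol'.

    Hypothetical helper for illustration: supports only '$' followed by
    dot-separated object keys.
    """
    node = document
    for key in path.lstrip("$").strip(".").split("."):
        node = node[key]
    return node


# A payload nested the way the example mapping expects.
payload = json.dumps(
    {"events": {"event": {"symbol": "WSO2", "price": 55.6, "volume": 100}}}
)

# Paths copied from the @attributes mapping in the example.
attribute_paths = {
    "symbol": "$.events.event.symbol",
    "price": "$.events.event.price",
    "volume": "$.events.event.volume",
}

document = json.loads(payload)
event = {name: resolve(document, path) for name, path in attribute_paths.items()}
print(event)
```

A check like this can catch a mismatch between the payload nesting and the mapping paths before a request is sent to the source.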
http-response (Source)
The http-response source correlates with the http-request sink through the parameter 'sink.id'.
It receives responses for the requests sent by the http-request sink that has the same sink ID.
Response messages can be in formats such as TEXT, JSON and XML.
To handle responses with different HTTP status codes, you can define the acceptable response status code using the parameter 'http.status.code'.
Syntax
@source(type="http-response", sink.id="<STRING>", http.status.code="<STRING>", allow.streaming.responses="<BOOL>", @map(...)))
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
sink.id | This parameter is used to map the http-response source to an http-request sink. The source then accepts the response messages for the requests sent by the corresponding http-request sink. | | STRING | No | No |
http.status.code | The acceptable HTTP status code for the responses. This can be a complete string or a regex. Only responses whose status codes match the defined value are received by the http-response source. Eg: http.status.code='200', http.status.code='2\d+' | 200 | STRING | Yes | No |
allow.streaming.responses | If responses can be received multiple times for a single request, this option should be enabled. If this is not enabled, for every request, response will be extracted only once. | false | BOOL | Yes | No |
Examples
EXAMPLE 1
@sink(type='http-request',
downloading.enabled='true',
publisher.url='http://localhost:8005/registry/employee',
method='POST', headers='{{headers}}',sink.id='employee-info',
@map(type='json'))
define stream BarStream (name String, id int, headers String, downloadPath string);
@source(type='http-response' , sink.id='employee-info', http.status.code='2\\d+',
@map(type='text', regex.A='((.|\n)*)', @attributes(message='A[1]')))
define stream responseStream2xx(message string);
@source(type='http-response' , sink.id='employee-info', http.status.code='4\\d+' ,
@map(type='text', regex.A='((.|\n)*)', @attributes(message='A[1]')))
define stream responseStream4xx(message string);
In the above example, the defined http-request sink sends POST requests to the endpoint defined by 'publisher.url'.
For those requests, the http-response source with 'employee-info' as the 'sink.id' and '2\\d+' as the 'http.status.code' receives the responses with 2xx status codes.
The http-response source with 'employee-info' as the 'sink.id' and '4\\d+' as the 'http.status.code' receives all the responses with 4xx status codes.
The body of each response message is then extracted using the text mapper and converted into Siddhi events.
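The routing by status code in this example can be pictured with a short Python sketch. It assumes that the http.status.code value must match the whole status code, and that the doubled backslash in the Siddhi string stands for a single backslash in the regular expression; both are assumptions rather than documented behaviour. The stream names come from the example, while the matching_streams function is hypothetical.

```python
import re

# Patterns with a single backslash, assuming '2\\d+' in the Siddhi
# definition is an escaped form of the regex 2\d+.
STREAM_PATTERNS = {
    "responseStream2xx": r"2\d+",
    "responseStream4xx": r"4\d+",
}


def matching_streams(status_code):
    """Return the streams whose pattern matches the entire status code."""
    code = str(status_code)
    return [
        stream
        for stream, pattern in STREAM_PATTERNS.items()
        if re.fullmatch(pattern, code)
    ]


for status in (200, 201, 404, 500):
    print(status, matching_streams(status))
```

Under these assumptions, a response whose status code matches none of the configured patterns, such as 500 here, is not received by any of the defined http-response sources.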