API Docs - v2.2.4

Tested Siddhi Core version: 5.1.13

It may also work with other Siddhi Core minor versions.

Sink

http (Sink)

The http sink publishes messages over HTTP or HTTPS using methods such as POST, GET, PUT, and DELETE, with payloads in text, XML, or JSON format. It can also publish to endpoints protected by basic authentication or OAuth 2.0.

Syntax

@sink(type="http", publisher.url="<STRING>", basic.auth.username="<STRING>", basic.auth.password="<STRING>", https.truststore.file="<STRING>", https.truststore.password="<STRING>", oauth.username="<STRING>", oauth.password="<STRING>", consumer.key="<STRING>", consumer.secret="<STRING>", token.url="<STRING>", refresh.token="<STRING>", headers="<STRING>", method="<STRING>", socket.idle.timeout="<INT>", chunk.disabled="<BOOL>", ssl.protocol="<STRING>", ssl.verification.disabled="<BOOL>", tls.store.type="<STRING>", ssl.configurations="<STRING>", proxy.host="<STRING>", proxy.port="<STRING>", proxy.username="<STRING>", proxy.password="<STRING>", client.bootstrap.configurations="<STRING>", max.pool.active.connections="<INT>", min.pool.idle.connections="<INT>", max.pool.idle.connections="<INT>", min.evictable.idle.time="<STRING>", time.between.eviction.runs="<STRING>", max.wait.time="<STRING>", test.on.borrow="<BOOL>", test.while.idle="<BOOL>", exhausted.action="<INT>", hostname.verification.enabled="<BOOL>", @map(...))

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
publisher.url

The URL to which the outgoing events should be published.
Examples:
http://localhost:8080/endpoint,
https://localhost:8080/endpoint

STRING No No
basic.auth.username

The username to be included in the authentication header when calling endpoints protected by basic authentication. The basic.auth.password property should also be set when using this property.

- STRING Yes No
basic.auth.password

The password to be included in the authentication header when calling endpoints protected by basic authentication. The basic.auth.username property should also be set when using this property.

- STRING Yes No
https.truststore.file

The file path of the client truststore used when sending messages over HTTPS.

${carbon.home}/resources/security/client-truststore.jks STRING Yes No
https.truststore.password

The password for the client-truststore.

wso2carbon STRING Yes No
oauth.username

The username to be included in the authentication header when calling endpoints protected by OAuth 2.0. The oauth.password property should also be set when using this property.

- STRING Yes No
oauth.password

The password to be included in the authentication header when calling endpoints protected by OAuth 2.0. The oauth.username property should also be set when using this property.

- STRING Yes No
consumer.key

Consumer key used for calling endpoints protected by OAuth 2.0

- STRING Yes No
consumer.secret

Consumer secret used for calling endpoints protected by OAuth 2.0

- STRING Yes No
token.url

Token URL used to generate new access tokens when calling endpoints protected by OAuth 2.0

- STRING Yes No
refresh.token

Refresh token used for generating new access tokens when calling endpoints protected by OAuth 2.0

- STRING Yes No
headers

HTTP request headers in the format "'<key>:<value>','<key>:<value>'".
When the Content-Type header is not provided, the system derives the Content-Type from the configured sink mapper as follows:
 - @map(type='xml'): application/xml
 - @map(type='json'): application/json
 - @map(type='text'): text/plain
 - @map(type='keyvalue'): application/x-www-form-urlencoded
 - For all other cases the system defaults to text/plain
The Content-Length header need not be provided either, as the system sets it automatically from the size of the payload.

Content-Type and Content-Length headers STRING Yes No
method

The HTTP method used for calling the endpoint.

POST STRING Yes No
socket.idle.timeout

Socket timeout in millis.

6000 INT Yes No
chunk.disabled

Disable chunked transfer encoding.

false BOOL Yes No
ssl.protocol

SSL/TLS protocol.

TLS STRING Yes No
ssl.verification.disabled

Disable SSL verification.

false BOOL Yes No
tls.store.type

TLS store type.

JKS STRING Yes No
ssl.configurations

SSL/TLS configurations in the format "'<key>:<value>','<key>:<value>'".
Some supported parameters:
 - SSL/TLS protocols: 'sslEnabledProtocols:TLSv1.1,TLSv1.2'
 - List of ciphers: 'ciphers:TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256'
 - Enable session creation: 'client.enable.session.creation:true'
 - Supported server names: 'server.suported.server.names:server'
 - Add HTTP SNIMatcher: 'server.supported.snimatchers:SNIMatcher'

- STRING Yes No
proxy.host

Proxy server host

- STRING Yes No
proxy.port

Proxy server port

- STRING Yes No
proxy.username

Proxy server username

- STRING Yes No
proxy.password

Proxy server password

- STRING Yes No
client.bootstrap.configurations

Client bootstrap configurations in the format "'<key>:<value>','<key>:<value>'".
Some supported configurations:
 - Client connect timeout in millis: 'client.bootstrap.connect.timeout:15000'
 - Client socket timeout in seconds: 'client.bootstrap.socket.timeout:15'
 - Client socket reuse: 'client.bootstrap.socket.reuse:true'
 - Enable TCP no delay: 'client.bootstrap.nodelay:true'
 - Enable client keep alive: 'client.bootstrap.keepalive:true'
 - Send buffer size: 'client.bootstrap.sendbuffersize:1048576'
 - Receive buffer size: 'client.bootstrap.recievebuffersize:1048576'

- STRING Yes No
max.pool.active.connections

Maximum number of active connections per client pool.

-1 INT Yes No
min.pool.idle.connections

Minimum number of idle connections that can exist per client pool.

0 INT Yes No
max.pool.idle.connections

Maximum number of idle connections that can exist per client pool.

100 INT Yes No
min.evictable.idle.time

Minimum time (in millis) a connection may sit idle in the client pool before it becomes eligible for eviction.

300000 STRING Yes No
time.between.eviction.runs

Time between two eviction operations (in millis) on the client pool.

30000 STRING Yes No
max.wait.time

The maximum time (in millis) the pool will wait (when there are no available connections) for a connection to be returned to the pool.

60000 STRING Yes No
test.on.borrow

Enable connections to be validated before being borrowed from the client pool.

true BOOL Yes No
test.while.idle

Enable connections to be validated during the eviction operation (if any).

true BOOL Yes No
exhausted.action

Action to take when the maximum number of active connections is in use. Specify the action as an int; the possible values are as follows.
0 - Fail the request.
1 - Block the request, until a connection returns to the pool.
2 - Grow the connection pool size.

1 (Block when exhausted) INT Yes No
hostname.verification.enabled

Enable hostname verification.

true BOOL Yes No
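
As an illustration of how several of these parameters fit together, the following sketch configures an http sink that sends PUT requests with basic authentication and two custom headers. The URL, credentials, header names, and stream definition are hypothetical; only the parameter names are taken from the table above.

```siddhi
-- Hypothetical endpoint, credentials, headers, and stream; parameter names are from the table above.
@sink(type='http', publisher.url='http://localhost:8080/endpoint',
      method='PUT',
      basic.auth.username='admin', basic.auth.password='admin',
      headers="'X-Request-Source:siddhi','Accept:application/json'",
      @map(type='json'))
define stream OrderStream (orderId string, amount double);
```

Because no Content-Type header is listed in headers, the Content-Type would be derived from the json mapper as described for the headers parameter.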

System Parameters

Name Description Default Value Possible Parameters
clientBootstrapClientGroupSize

Number of client threads to perform non-blocking read and write to one or more channels.

(Number of available processors) * 2 Any positive integer
clientBootstrapBossGroupSize

Number of boss threads to accept incoming connections.

Number of available processors Any positive integer
clientBootstrapWorkerGroupSize

Number of worker threads to accept the connections from boss threads and perform non-blocking read and write from one or more channels.

(Number of available processors) * 2 Any positive integer
trustStoreLocation

The default truststore file path.

${carbon.home}/resources/security/client-truststore.jks Path to client truststore .jks file
trustStorePassword

The default truststore password.

wso2carbon Truststore password as string

Examples

EXAMPLE 1

@sink(type = 'http', publisher.url = 'http://stocks.com/stocks',
      @map(type = 'json'))
define stream StockStream (symbol string, price float, volume long);

Events arriving on the StockStream will be published to the HTTP endpoint http://stocks.com/stocks using the POST method with Content-Type application/json, after being converted to the default JSON format as follows:

{
  "event": {
    "symbol": "FB",
    "price": 24.5,
    "volume": 5000
  }
}

EXAMPLE 2

@sink(type='http', publisher.url = 'http://localhost:8009/foo',
      client.bootstrap.configurations = "'client.bootstrap.socket.timeout:20'",
      max.pool.active.connections = '1', headers = "{{headers}}",
      @map(type='xml', @payload("""<stock>
{{payloadBody}}
</stock>""")))
define stream FooStream (payloadBody String, headers string);

Events arriving on FooStream will be published to the HTTP endpoint http://localhost:8009/foo using the POST method with Content-Type application/xml, with the payload and headers populated from the payloadBody and headers attribute values.
If payloadBody contains

<symbol>WSO2</symbol>
<price>55.6</price>
<volume>100</volume>

and headers contains 'topic:foobar', then the system will generate an output with the body:

<stock>
<symbol>WSO2</symbol>
<price>55.6</price>
<volume>100</volume>
</stock>

and HTTP headers:
Content-Length:xxx,
Content-Location:'xxx',
Content-Type:'application/xml',
HTTP_METHOD:'POST'
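
The connection pool parameters can be set in the same way as max.pool.active.connections in EXAMPLE 2. The sketch below uses hypothetical values and a hypothetical stream: it caps the pool at 10 active connections and blocks a request when the pool is exhausted. It assumes that max.wait.time bounds that wait when exhausted.action is 1, as the parameter descriptions suggest.

```siddhi
-- Hypothetical stream and values; parameter names are from the http sink parameter table.
@sink(type='http', publisher.url='http://localhost:8080/endpoint',
      max.pool.active.connections='10',
      -- 1 = block the request until a connection returns to the pool
      exhausted.action='1',
      -- assumed to limit how long a blocked request waits (in millis)
      max.wait.time='5000',
      @map(type='json'))
define stream ReadingStream (sensorId string, reading double);
```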

http-call (Sink)

The http-call sink publishes messages to endpoints over HTTP or HTTPS using methods such as POST, GET, PUT, and DELETE, with payloads in text, XML, or JSON format, and consumes responses through its corresponding http-call-response source. It also supports calling endpoints protected by basic authentication or OAuth 2.0.

Syntax

@sink(type="http-call", publisher.url="<STRING>", sink.id="<STRING>", basic.auth.username="<STRING>", basic.auth.password="<STRING>", https.truststore.file="<STRING>", https.truststore.password="<STRING>", oauth.username="<STRING>", oauth.password="<STRING>", consumer.key="<STRING>", consumer.secret="<STRING>", token.url="<STRING>", refresh.token="<STRING>", headers="<STRING>", method="<STRING>", downloading.enabled="<BOOL>", download.path="<STRING>", blocking.io="<BOOL>", socket.idle.timeout="<INT>", chunk.disabled="<BOOL>", ssl.protocol="<STRING>", ssl.verification.disabled="<BOOL>", ssl.configurations="<STRING>", proxy.host="<STRING>", proxy.port="<STRING>", proxy.username="<STRING>", proxy.password="<STRING>", client.bootstrap.configurations="<STRING>", max.pool.active.connections="<INT>", min.pool.idle.connections="<INT>", max.pool.idle.connections="<INT>", min.evictable.idle.time="<STRING>", time.between.eviction.runs="<STRING>", max.wait.time="<STRING>", test.on.borrow="<BOOL>", test.while.idle="<BOOL>", exhausted.action="<INT>", hostname.verification.enabled="<BOOL>", @map(...))

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
publisher.url

The URL which should be called.
Examples:
http://localhost:8080/endpoint,
https://localhost:8080/endpoint

STRING No No
sink.id

Identifier that correlates the http-call sink with its corresponding http-call-response sources, which retrieve the responses.

STRING No No
basic.auth.username

The username to be included in the authentication header when calling endpoints protected by basic authentication. The basic.auth.password property should also be set when using this property.

- STRING Yes No
basic.auth.password

The password to be included in the authentication header when calling endpoints protected by basic authentication. The basic.auth.username property should also be set when using this property.

- STRING Yes No
https.truststore.file

The file path of the client truststore used when sending messages over HTTPS.

${carbon.home}/resources/security/client-truststore.jks STRING Yes No
https.truststore.password

The password for the client-truststore.

wso2carbon STRING Yes No
oauth.username

The username to be included in the authentication header when calling endpoints protected by OAuth 2.0. The oauth.password property should also be set when using this property.

- STRING Yes No
oauth.password

The password to be included in the authentication header when calling endpoints protected by OAuth 2.0. The oauth.username property should also be set when using this property.

- STRING Yes No
consumer.key

Consumer key used for calling endpoints protected by OAuth 2.0

- STRING Yes No
consumer.secret

Consumer secret used for calling endpoints protected by OAuth 2.0

- STRING Yes No
token.url

Token URL used to generate new access tokens when calling endpoints protected by OAuth 2.0

- STRING Yes No
refresh.token

Refresh token used for generating new access tokens when calling endpoints protected by OAuth 2.0

- STRING Yes No
headers

HTTP request headers in the format "'<key>:<value>','<key>:<value>'".
When the Content-Type header is not provided, the system derives the Content-Type from the configured sink mapper as follows:
 - @map(type='xml'): application/xml
 - @map(type='json'): application/json
 - @map(type='text'): text/plain
 - @map(type='keyvalue'): application/x-www-form-urlencoded
 - For all other cases the system defaults to text/plain
The Content-Length header need not be provided either, as the system sets it automatically from the size of the payload.

Content-Type and Content-Length headers STRING Yes No
method

The HTTP method used for calling the endpoint.

POST STRING Yes No
downloading.enabled

Enables the response received by the http-call-response source to be written to a file. When this is enabled, the download.path property should also be set.

false BOOL Yes No
download.path

The absolute file path along with the file name where the downloads should be saved.

- STRING Yes Yes
blocking.io

Blocks the request thread until a response is received from the http-call-response source before sending any other request.

false BOOL Yes No
socket.idle.timeout

Socket timeout in millis.

6000 INT Yes No
chunk.disabled

Disable chunked transfer encoding.

false BOOL Yes No
ssl.protocol

SSL/TLS protocol.

TLS STRING Yes No
ssl.verification.disabled

Disable SSL verification.

false BOOL Yes No
ssl.configurations

SSL/TLS configurations.
Expected format "'<key>:<value>','<key>:<value>'".
Some supported parameters:
 - SSL/TLS protocols: 'sslEnabledProtocols:TLSv1.1,TLSv1.2'
 - List of ciphers: 'ciphers:TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256'
 - Enable session creation: 'client.enable.session.creation:true'
 - Supported server names: 'server.suported.server.names:server'
 - Add HTTP SNIMatcher: 'server.supported.snimatchers:SNIMatcher'

- STRING Yes No
proxy.host

Proxy server host

- STRING Yes No
proxy.port

Proxy server port

- STRING Yes No
proxy.username

Proxy server username

- STRING Yes No
proxy.password

Proxy server password

- STRING Yes No
client.bootstrap.configurations

Client bootstrap configurations in the format "'<key>:<value>','<key>:<value>'".
Some supported configurations:
 - Client connect timeout in millis: 'client.bootstrap.connect.timeout:15000'
 - Client socket timeout in seconds: 'client.bootstrap.socket.timeout:15'
 - Client socket reuse: 'client.bootstrap.socket.reuse:true'
 - Enable TCP no delay: 'client.bootstrap.nodelay:true'
 - Enable client keep alive: 'client.bootstrap.keepalive:true'
 - Send buffer size: 'client.bootstrap.sendbuffersize:1048576'
 - Receive buffer size: 'client.bootstrap.recievebuffersize:1048576'

- STRING Yes No
max.pool.active.connections

Maximum number of active connections per client pool.

-1 INT Yes No
min.pool.idle.connections

Minimum number of idle connections that can exist per client pool.

0 INT Yes No
max.pool.idle.connections

Maximum number of idle connections that can exist per client pool.

100 INT Yes No
min.evictable.idle.time

Minimum time (in millis) a connection may sit idle in the client pool before it becomes eligible for eviction.

300000 STRING Yes No
time.between.eviction.runs

Time between two eviction operations (in millis) on the client pool.

30000 STRING Yes No
max.wait.time

The maximum time (in millis) the pool will wait (when there are no available connections) for a connection to be returned to the pool.

60000 STRING Yes No
test.on.borrow

Enable connections to be validated before being borrowed from the client pool.

true BOOL Yes No
test.while.idle

Enable connections to be validated during the eviction operation (if any).

true BOOL Yes No
exhausted.action

Action to take when the maximum number of active connections is in use. Specify the action as an int; the possible values are as follows.
0 - Fail the request.
1 - Block the request, until a connection returns to the pool.
2 - Grow the connection pool size.

1 (Block when exhausted) INT Yes No
hostname.verification.enabled

Enable hostname verification.

true BOOL Yes No
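
The OAuth 2.0 parameters in the table above might be combined as in the following sketch. The endpoint, token URL, key, secret, and stream names are placeholders. The sketch also assumes that consumer.key, consumer.secret, and token.url are sufficient on their own, which the table does not confirm; a given token endpoint may additionally need oauth.username and oauth.password, or refresh.token.

```siddhi
-- Placeholder values throughout; only the parameter names come from the table above.
@sink(type='http-call', sink.id='secured',
      publisher.url='https://localhost:8080/endpoint',
      consumer.key='<consumer-key>', consumer.secret='<consumer-secret>',
      token.url='https://localhost:8080/token',
      @map(type='json'))
define stream SecuredRequestStream (symbol string, price float);

-- Responses are consumed by the source that shares the same sink.id.
@source(type='http-call-response', sink.id='secured',
        @map(type='text', regex.A='((.|\n)*)',
             @attributes(message='A[1]')))
define stream SecuredResponseStream (message string);
```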

System Parameters

Name Description Default Value Possible Parameters
clientBootstrapClientGroupSize

Number of client threads to perform non-blocking read and write to one or more channels.

(Number of available processors) * 2 Any positive integer
clientBootstrapBossGroupSize

Number of boss threads to accept incoming connections.

Number of available processors Any positive integer
clientBootstrapWorkerGroupSize

Number of worker threads to accept the connections from boss threads and perform non-blocking read and write from one or more channels.

(Number of available processors) * 2 Any positive integer
trustStoreLocation

The default truststore file path.

${carbon.home}/resources/security/client-truststore.jks Path to client truststore .jks file
trustStorePassword

The default truststore password.

wso2carbon Truststore password as string

Examples

EXAMPLE 1

@sink(type='http-call', sink.id='foo',
      publisher.url='http://localhost:8009/foo',
      @map(type='xml', @payload('{{payloadBody}}')))
define stream FooStream (payloadBody string);

@source(type='http-call-response', sink.id='foo',
        @map(type='text', regex.A='((.|\n)*)',
             @attributes(headers='trp:headers', message='A[1]')))
define stream ResponseStream(message string, headers string);

When events arrive in FooStream, the http-call sink calls the endpoint at http://localhost:8009/foo using the POST method with Content-Type application/xml.
If the event's payloadBody attribute contains the following XML:

<item>
    <name>apple</name>
    <price>55</price>
    <quantity>5</quantity>
</item>

the http-call sink maps it and sends it to the endpoint.
When the endpoint sends a response, it is consumed by the corresponding http-call-response source, correlated via the same sink.id foo. The source maps the response and sends it via the ResponseStream stream, assigning the message body to the message attribute and the response headers to the headers attribute of the event.
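
Once responses reach ResponseStream they can be processed like any other stream. As a minimal sketch, assuming the ResponseStream definition above, the query below forwards each response to a log sink. The ResponseLogStream name, the query name, and the log prefix are hypothetical and not part of the original example.

```siddhi
-- Hypothetical stream that logs every response event.
@sink(type='log', prefix='http-call response')
define stream ResponseLogStream (message string, headers string);

-- Forward each response consumed by the http-call-response source.
@info(name = 'logResponse')
from ResponseStream
select message, headers
insert into ResponseLogStream;
```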

EXAMPLE 2

@sink(type='http-call', publisher.url='http://localhost:8005/files/{{name}}',
      downloading.enabled='true', download.path='{{downloadPath}}{{name}}',
      method='GET', sink.id='download', @map(type='json'))
define stream DownloadRequestStream(name String, id int, downloadPath string);

@source(type='http-call-response', sink.id='download',
        http.status.code='2\\d+',
        @map(type='text', regex.A='((.|\n)*)',
             @attributes(name='trp:name', id='trp:id', file='A[1]')))
define stream ResponseStream2xx(name string, id string, file string);

@source(type='http-call-response', sink.id='download',
        http.status.code='4\\d+',
        @map(type='text', regex.A='((.|\n)*)', @attributes(errorMsg='A[1]')))
define stream ResponseStream4xx(errorMsg string);

When events arrive in DownloadRequestStream with name:foo.txt, id:75 and downloadPath:/user/download/, the http-call sink sends a GET request to the URL http://localhost:8005/files/foo.txt to download the file to the path /user/download/foo.txt, and the response is captured by the corresponding http-call-response source that matches the response status code.
If the response status code is in the 2xx range, the message is received by the http-call-response source associated with the ResponseStream2xx stream, which expects http.status.code to match the regex 2\\d+. The file is downloaded to the local file system at /user/download/foo.txt, and the response message, which carries the absolute file path, is mapped to the event's file attribute.
If the response status code is in the 4xx range, the message is received by the http-call-response source associated with the ResponseStream4xx stream, which expects http.status.code to match the regex 4\\d+, and the error response is mapped to the errorMsg attribute of the event.
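
The same pattern should extend to other status code ranges. The sketch below adds a third source for server errors by analogy with the 4xx source in this example; the ResponseStream5xx stream is hypothetical and does not appear in the original example.

```siddhi
-- Hypothetical source for 5xx responses, correlated through the same sink.id.
@source(type='http-call-response', sink.id='download',
        http.status.code='5\\d+',
        @map(type='text', regex.A='((.|\n)*)', @attributes(errorMsg='A[1]')))
define stream ResponseStream5xx(errorMsg string);
```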

http-request (Sink)

Deprecated

(Use http-call sink instead).
The http-request sink publishes messages to endpoints over HTTP or HTTPS using methods such as POST, GET, PUT, and DELETE, with payloads in text, XML, or JSON format, and consumes responses through its corresponding http-response source. It also supports calling endpoints protected by basic authentication or OAuth 2.0.

Syntax

@sink(type="http-request", publisher.url="<STRING>", sink.id="<STRING>", basic.auth.username="<STRING>", basic.auth.password="<STRING>", https.truststore.file="<STRING>", https.truststore.password="<STRING>", oauth.username="<STRING>", oauth.password="<STRING>", consumer.key="<STRING>", consumer.secret="<STRING>", token.url="<STRING>", refresh.token="<STRING>", headers="<STRING>", method="<STRING>", downloading.enabled="<BOOL>", download.path="<STRING>", blocking.io="<BOOL>", socket.idle.timeout="<INT>", chunk.disabled="<BOOL>", ssl.protocol="<STRING>", ssl.verification.disabled="<BOOL>", ssl.configurations="<STRING>", proxy.host="<STRING>", proxy.port="<STRING>", proxy.username="<STRING>", proxy.password="<STRING>", client.bootstrap.configurations="<STRING>", max.pool.active.connections="<INT>", min.pool.idle.connections="<INT>", max.pool.idle.connections="<INT>", min.evictable.idle.time="<STRING>", time.between.eviction.runs="<STRING>", max.wait.time="<STRING>", test.on.borrow="<BOOL>", test.while.idle="<BOOL>", exhausted.action="<INT>", hostname.verification.enabled="<BOOL>", @map(...))

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
publisher.url

The URL which should be called.
Examples:
http://localhost:8080/endpoint,
https://localhost:8080/endpoint

STRING No No
sink.id

Identifier that correlates the http-request sink with its corresponding http-response sources, which retrieve the responses.

STRING No No
basic.auth.username

The username to be included in the authentication header when calling endpoints protected by basic authentication. The basic.auth.password property should also be set when using this property.

- STRING Yes No
basic.auth.password

The password to be included in the authentication header when calling endpoints protected by basic authentication. The basic.auth.username property should also be set when using this property.

- STRING Yes No
https.truststore.file

The file path of the client truststore used when sending messages over HTTPS.

${carbon.home}/resources/security/client-truststore.jks STRING Yes No
https.truststore.password

The password for the client-truststore.

wso2carbon STRING Yes No
oauth.username

The username to be included in the authentication header when calling endpoints protected by OAuth 2.0. The oauth.password property should also be set when using this property.

- STRING Yes No
oauth.password

The password to be included in the authentication header when calling endpoints protected by OAuth 2.0. The oauth.username property should also be set when using this property.

- STRING Yes No
consumer.key

Consumer key used for calling endpoints protected by OAuth 2.0

- STRING Yes No
consumer.secret

Consumer secret used for calling endpoints protected by OAuth 2.0

- STRING Yes No
token.url

Token URL used to generate new access tokens when calling endpoints protected by OAuth 2.0

- STRING Yes No
refresh.token

Refresh token used for generating new access tokens when calling endpoints protected by OAuth 2.0

- STRING Yes No
headers

HTTP request headers in the format "'<key>:<value>','<key>:<value>'".
When the Content-Type header is not provided, the system derives the Content-Type from the configured sink mapper as follows:
 - @map(type='xml'): application/xml
 - @map(type='json'): application/json
 - @map(type='text'): text/plain
 - @map(type='keyvalue'): application/x-www-form-urlencoded
 - For all other cases the system defaults to text/plain
The Content-Length header need not be provided either, as the system sets it automatically from the size of the payload.

Content-Type and Content-Length headers STRING Yes No
method

The HTTP method used for calling the endpoint.

POST STRING Yes No
downloading.enabled

Enables the response received by the http-response source to be written to a file. When this is enabled, the download.path property should also be set.

false BOOL Yes No
download.path

The absolute file path along with the file name where the downloads should be saved.

- STRING Yes Yes
blocking.io

Blocks the request thread until a response is received from the http-response source before sending any other request.

false BOOL Yes No
socket.idle.timeout

Socket timeout in millis.

6000 INT Yes No
chunk.disabled

Disable chunked transfer encoding.

false BOOL Yes No
ssl.protocol

SSL/TLS protocol.

TLS STRING Yes No
ssl.verification.disabled

Disable SSL verification.

false BOOL Yes No
ssl.configurations

SSL/TLS configurations in the format "'<key>:<value>','<key>:<value>'".
Some supported parameters:
 - SSL/TLS protocols: 'sslEnabledProtocols:TLSv1.1,TLSv1.2'
 - List of ciphers: 'ciphers:TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256'
 - Enable session creation: 'client.enable.session.creation:true'
 - Supported server names: 'server.suported.server.names:server'
 - Add HTTP SNIMatcher: 'server.supported.snimatchers:SNIMatcher'

- STRING Yes No
proxy.host

Proxy server host

- STRING Yes No
proxy.port

Proxy server port

- STRING Yes No
proxy.username

Proxy server username

- STRING Yes No
proxy.password

Proxy server password

- STRING Yes No
client.bootstrap.configurations

Client bootstrap configurations in the format "'<key>:<value>','<key>:<value>'".
Some supported configurations:
 - Client connect timeout in millis: 'client.bootstrap.connect.timeout:15000'
 - Client socket timeout in seconds: 'client.bootstrap.socket.timeout:15'
 - Client socket reuse: 'client.bootstrap.socket.reuse:true'
 - Enable TCP no delay: 'client.bootstrap.nodelay:true'
 - Enable client keep alive: 'client.bootstrap.keepalive:true'
 - Send buffer size: 'client.bootstrap.sendbuffersize:1048576'
 - Receive buffer size: 'client.bootstrap.recievebuffersize:1048576'

- STRING Yes No
max.pool.active.connections

Maximum number of active connections per client pool.

-1 INT Yes No
min.pool.idle.connections

Minimum number of idle connections that can exist per client pool.

0 INT Yes No
max.pool.idle.connections

Maximum number of idle connections that can exist per client pool.

100 INT Yes No
min.evictable.idle.time

Minimum time (in millis) a connection may sit idle in the client pool before it becomes eligible for eviction.

300000 STRING Yes No
time.between.eviction.runs

Time between two eviction operations (in millis) on the client pool.

30000 STRING Yes No
max.wait.time

The maximum time (in millis) the pool will wait (when there are no available connections) for a connection to be returned to the pool.

60000 STRING Yes No
test.on.borrow

Enable connections to be validated before being borrowed from the client pool.

true BOOL Yes No
test.while.idle

Enable connections to be validated during the eviction operation (if any).

true BOOL Yes No
exhausted.action

Action to take when the maximum number of active connections is in use. Specify the action as an int; the possible values are as follows.
0 - Fail the request.
1 - Block the request, until a connection returns to the pool.
2 - Grow the connection pool size.

1 (Block when exhausted) INT Yes No
hostname.verification.enabled

Enable hostname verification.

true BOOL Yes No

System Parameters

Name Description Default Value Possible Parameters
clientBootstrapClientGroupSize

Number of client threads to perform non-blocking read and write to one or more channels.

(Number of available processors) * 2 Any positive integer
clientBootstrapBossGroupSize

Number of boss threads to accept incoming connections.

Number of available processors Any positive integer
clientBootstrapWorkerGroupSize

Number of worker threads to accept the connections from boss threads and perform non-blocking read and write from one or more channels.

(Number of available processors) * 2 Any positive integer
trustStoreLocation

The default truststore file path.

${carbon.home}/resources/security/client-truststore.jks Path to client truststore .jks file
trustStorePassword

The default truststore password.

wso2carbon Truststore password as string

Examples

EXAMPLE 1

@sink(type='http-request', sink.id='foo',
      publisher.url='http://localhost:8009/foo',
      @map(type='xml', @payload('{{payloadBody}}')))
define stream FooStream (payloadBody string);

@source(type='http-response', sink.id='foo',
        @map(type='text', regex.A='((.|\n)*)',
             @attributes(headers='trp:headers', message='A[1]')))
define stream ResponseStream(message string, headers string);

When events arrive in FooStream, the http-request sink calls the endpoint at http://localhost:8009/foo using the POST method with Content-Type application/xml.
If the event's payloadBody attribute contains the following XML:

<item>
    <name>apple</name>
    <price>55</price>
    <quantity>5</quantity>
</item>

the http-request sink maps it and sends it to the endpoint.
When the endpoint sends a response, it is consumed by the corresponding http-response source, correlated via the same sink.id foo. The source maps the response and sends it via the ResponseStream stream, assigning the message body to the message attribute and the response headers to the headers attribute of the event.

EXAMPLE 2

@sink(type='http-request', publisher.url='http://localhost:8005/files/{{name}}',
      downloading.enabled='true', download.path='{{downloadPath}}{{name}}',
      method='GET', sink.id='download', @map(type='json'))
define stream DownloadRequestStream(name String, id int, downloadPath string);

@source(type='http-response', sink.id='download',
        http.status.code='2\\d+',
        @map(type='text', regex.A='((.|\n)*)',
             @attributes(name='trp:name', id='trp:id', file='A[1]')))
define stream ResponseStream2xx(name string, id string, file string);

@source(type='http-response', sink.id='download',
        http.status.code='4\\d+',
        @map(type='text', regex.A='((.|\n)*)', @attributes(errorMsg='A[1]')))
define stream ResponseStream4xx(errorMsg string);

When events arrive in DownloadRequestStream with name:foo.txt, id:75 and downloadPath:/user/download/, the http-request sink sends a GET request to the URL http://localhost:8005/files/foo.txt to download the file to the path /user/download/foo.txt, and the response is captured by the corresponding http-response source that matches the response status code.
If the response status code is in the 2xx range, the message is received by the http-response source associated with the ResponseStream2xx stream, which expects http.status.code to match the regex 2\\d+. The file is downloaded to the local file system at /user/download/foo.txt, and the response message, which carries the absolute file path, is mapped to the event's file attribute.
If the response status code is in the 4xx range, the message is received by the http-response source associated with the ResponseStream4xx stream, which expects http.status.code to match the regex 4\\d+, and the error response is mapped to the errorMsg attribute of the event.

http-response (Sink)

Deprecated

(Use http-service-response sink instead).
The http-response sink sends responses to the requests consumed by its corresponding http-request source, mapping the response messages to formats such as text, XML, and JSON.

Syntax

@sink(type="http-response", source.id="<STRING>", message.id="<STRING>", headers="<STRING>", @map(...)))

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
source.id

Identifier to correlate the http-response sink to its corresponding http-request source which consumed the request.

STRING No No
message.id

Identifier to correlate the response with the request received by http-request source.

STRING No Yes
headers

HTTP response headers in format "'<key>:<value>','<key>:<value>'".
When the Content-Type header is not provided, the system decides the Content-Type based on the provided sink mapper as follows:
 - @map(type='xml'): application/xml
 - @map(type='json'): application/json
 - @map(type='text'): plain/text
 - @map(type='keyvalue'): application/x-www-form-urlencoded
 - In all other cases, the system defaults to plain/text
The Content-Length header also need not be provided, as the system calculates it automatically from the size of the payload.

Content-Type and Content-Length headers STRING Yes No
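The Content-Type fallback described for the headers parameter can be pictured as a simple lookup. The Python sketch below is only an illustration of the documented rules, not the extension's code; the resolve_content_type function and the case-insensitive header comparison are assumptions.

```python
# Defaults listed in the headers parameter description, keyed by sink mapper type.
DEFAULT_CONTENT_TYPES = {
    "xml": "application/xml",
    "json": "application/json",
    "text": "plain/text",
    "keyvalue": "application/x-www-form-urlencoded",
}


def resolve_content_type(mapper_type: str, headers: dict) -> str:
    """Return the user-supplied Content-Type if present, else the mapper default."""
    for key, value in headers.items():
        # Assumption: header names are compared case-insensitively.
        if key.lower() == "content-type":
            return value
    # Any mapper type not listed above falls back to plain/text.
    return DEFAULT_CONTENT_TYPES.get(mapper_type, "plain/text")


print(resolve_content_type("json", {}))
print(resolve_content_type("json", {"Content-Type": "text/csv"}))
print(resolve_content_type("avro", {}))
```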

Examples

EXAMPLE 1

@source(type='http-request', receiver.url='http://localhost:5005/add',
        source.id='adder',
        @map(type='json', @attributes(messageId='trp:messageId',
                                     value1='$.event.value1',
                                     value2='$.event.value2')))
define stream AddStream (messageId string, value1 long, value2 long);

@sink(type='http-response', source.id='adder',
      message.id='{{messageId}}', @map(type = 'json'))
define stream ResultStream (messageId string, results long);

@info(name = 'query1')
from AddStream 
select messageId, value1 + value2 as results 
insert into ResultStream;

The http-request source on stream AddStream listens on the URL http://localhost:5005/add for JSON messages in the format:

{
  "event": {
    "value1": 3,
    "value2": 4
  }
}


When events arrive, it maps them to AddStream events and passes them to query query1 for processing. The query results produced on ResultStream are sent as a response via the http-response sink in the format:

{
  "event": {
    "results": 7
  }
}

Here the request and response are correlated by passing the messageId produced by the http-request source to the respective http-response sink.

http-service-response (Sink)

The http-service-response sink sends responses to the requests consumed by its corresponding http-service source, by mapping the response messages to formats such as text, XML and JSON.

Syntax

@sink(type="http-service-response", source.id="<STRING>", message.id="<STRING>", headers="<STRING>", @map(...)))

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
source.id

Identifier to correlate the http-service-response sink to its corresponding http-service source which consumed the request.

STRING No No
message.id

Identifier to correlate the response with the request received by http-service source.

STRING No Yes
headers

HTTP response headers in format "'<key>:<value>','<key>:<value>'".
When the Content-Type header is not provided, the system decides the Content-Type based on the provided sink mapper as follows:
 - @map(type='xml'): application/xml
 - @map(type='json'): application/json
 - @map(type='text'): plain/text
 - @map(type='keyvalue'): application/x-www-form-urlencoded
 - In all other cases, the system defaults to plain/text
The Content-Length header also need not be provided, as the system calculates it automatically from the size of the payload.

Content-Type and Content-Length headers STRING Yes No

Examples

EXAMPLE 1

@source(type='http-service', receiver.url='http://localhost:5005/add',
        source.id='adder',
        @map(type='json', @attributes(messageId='trp:messageId',
                                     value1='$.event.value1',
                                     value2='$.event.value2')))
define stream AddStream (messageId string, value1 long, value2 long);

@sink(type='http-service-response', source.id='adder',
      message.id='{{messageId}}', @map(type = 'json'))
define stream ResultStream (messageId string, results long);

@info(name = 'query1')
from AddStream 
select messageId, value1 + value2 as results 
insert into ResultStream;

The http-service source on stream AddStream listens on the URL http://localhost:5005/add for JSON messages in the format:

{
  "event": {
    "value1": 3,
    "value2": 4
  }
}


When events arrive, it maps them to AddStream events and passes them to query query1 for processing. The query results produced on ResultStream are sent as a response via the http-service-response sink in the format:

{
  "event": {
    "results": 7
  }
}

Here the request and response are correlated by passing the messageId produced by the http-service source to the respective http-service-response sink.
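To try the example from the client side, a request like the one below could be used. This is a minimal Python sketch that assumes the Siddhi app above is running locally on the receiver.url from the example; build_add_request is a hypothetical helper name, and the actual send is left commented out so that the snippet runs without a server.

```python
import json
import urllib.request


def build_add_request(value1: int, value2: int) -> urllib.request.Request:
    """Build a POST request carrying the JSON event expected by AddStream."""
    payload = json.dumps({"event": {"value1": value1, "value2": value2}}).encode("utf-8")
    return urllib.request.Request(
        "http://localhost:5005/add",  # receiver.url from the example above
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_add_request(3, 4)

# To send it against a running Siddhi app:
# with urllib.request.urlopen(request) as response:
#     print(response.read().decode("utf-8"))
```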

Source

http (Source)

The HTTP source receives POST requests via HTTP and HTTPS protocols in formats such as text, XML and JSON. It also supports basic authentication to ensure that events are received only from authorized users or systems.
The request headers and properties can be accessed via transport properties in the format trp:<header>.

Syntax

@source(type="http", receiver.url="<STRING>", basic.auth.enabled="<STRING>", worker.count="<INT>", socket.idle.timeout="<INT>", ssl.verify.client="<STRING>", ssl.protocol="<STRING>", tls.store.type="<STRING>", ssl.configurations="<STRING>", request.size.validation.configurations="<STRING>", header.validation.configurations="<STRING>", server.bootstrap.configurations="<STRING>", trace.log.enabled="<BOOL>", @map(...)))

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
receiver.url

The URL on which events should be received. To enable SSL, use the https protocol in the URL.

http://0.0.0.0:9763/<appName>/<streamName> STRING Yes No
basic.auth.enabled

This only works in VM, Docker and Kubernetes.
When enabled, it authenticates each request using the Authorization:'Basic encodeBase64(username:Password)' header.

false STRING Yes No
worker.count

The number of active worker threads to serve the incoming events. By default the value is set to 1 to ensure events are processed in the order in which they arrive. Increasing this value can improve performance at the expense of losing event ordering.

1 INT Yes No
socket.idle.timeout

Idle timeout for HTTP connection in millis.

120000 INT Yes No
ssl.verify.client

The type of client certificate verification. Supported values are require, optional.

- STRING Yes No
ssl.protocol

SSL/TLS protocol.

TLS STRING Yes No
tls.store.type

TLS store type.

JKS STRING Yes No
ssl.configurations

SSL/TLS configurations in format "'<key>:<value>','<key>:<value>'".
Some supported parameters:
 - SSL/TLS protocols: 'sslEnabledProtocols:TLSv1.1,TLSv1.2'
 - List of ciphers: 'ciphers:TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256'
 - Enable session creation: 'client.enable.session.creation:true'
 - Supported server names: 'server.suported.server.names:server'
 - Add HTTP SNIMatcher: 'server.supported.snimatchers:SNIMatcher'

- STRING Yes No
request.size.validation.configurations

Configurations to validate the HTTP request size.
Expected format "'<key>:<value>','<key>:<value>'".
Some supported configurations:
 - Enable request size validation: 'request.size.validation:true'
 If request size validation is enabled:
 - Maximum request size: 'request.size.validation.maximum.value:2048'
 - Response status code when request size validation fails: 'request.size.validation.reject.status.code:401'
 - Response message when request size validation fails: 'request.size.validation.reject.message:Message is bigger than the valid size'
 - Response Content-Type when request size validation fails: 'request.size.validation.reject.message.content.type:plain/text'

- STRING Yes No
header.validation.configurations

Configurations to validate HTTP headers.
Expected format "'<key>:<value>','<key>:<value>'".
Some supported configurations:
 - Enable header size validation: 'header.size.validation:true'
 If header size validation is enabled:
 - Maximum length of initial line: 'header.validation.maximum.request.line:4096'
 - Maximum length of all headers: 'header.validation.maximum.size:8192'
 - Maximum length of the content or each chunk: 'header.validation.maximum.chunk.size:8192'
 - Response status code when header validation fails: 'header.validation.reject.status.code:401'
 - Response message when header validation fails: 'header.validation.reject.message:Message header is bigger than the valid size'
 - Response Content-Type when header validation fails: 'header.validation.reject.message.content.type:plain/text'

- STRING Yes No
server.bootstrap.configurations

Server bootstrap configurations in format "'<key>:<value>','<key>:<value>'".
Some supported configurations:
 - Server connect timeout in millis: 'server.bootstrap.connect.timeout:15000'
 - Server socket timeout in seconds: 'server.bootstrap.socket.timeout:15'
 - Enable TCP no delay: 'server.bootstrap.nodelay:true'
 - Enable server keep alive: 'server.bootstrap.keepalive:true'
 - Send buffer size: 'server.bootstrap.sendbuffersize:1048576'
 - Receive buffer size: 'server.bootstrap.recievebuffersize:1048576'
 - Number of connections queued: 'server.bootstrap.socket.backlog:100'

- STRING Yes No
trace.log.enabled

Enable trace log for traffic monitoring.

false BOOL Yes No
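For basic.auth.enabled, the client has to send an Authorization header of the form 'Basic encodeBase64(username:Password)'. The Python sketch below shows one way to build that header; the admin/admin credentials are placeholders chosen for illustration, not values taken from this documentation.

```python
import base64


def basic_auth_header(username: str, password: str) -> dict:
    """Build the Authorization header value: 'Basic ' + base64(username:password)."""
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return {"Authorization": f"Basic {token}"}


# Placeholder credentials, used only for illustration.
print(basic_auth_header("admin", "admin"))
# {'Authorization': 'Basic YWRtaW46YWRtaW4='}
```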

System Parameters

Name Description Default Value Possible Parameters
serverBootstrapBossGroupSize

Number of boss threads to accept incoming connections.

Number of available processors Any positive integer
serverBootstrapWorkerGroupSize

Number of worker threads to accept the connections from boss threads and perform non-blocking read and write from one or more channels.

(Number of available processors) * 2 Any positive integer
serverBootstrapClientGroupSize

Number of client threads to perform non-blocking read and write to one or more channels.

(Number of available processors) * 2 Any positive integer
defaultHost

The default host of the transport.

0.0.0.0 Any valid host
defaultScheme

The default protocol.

http http, https
defaultHttpPort

The default HTTP port when default scheme is http.

8280 Any valid port
defaultHttpsPort

The default HTTPS port when default scheme is https.

8243 Any valid port
keyStoreLocation

The default keystore file path.

${carbon.home}/resources/security/wso2carbon.jks Path to .jks file
keyStorePassword

The default keystore password.

wso2carbon Keystore password as string

Examples

EXAMPLE 1

@app.name('StockProcessor')

@source(type='http', @map(type = 'json'))
define stream StockStream (symbol string, price float, volume long);

The above HTTP source listens on the URL http://0.0.0.0:9763/StockProcessor/StockStream for JSON messages in the format:

{
  "event": {
    "symbol": "FB",
    "price": 24.5,
    "volume": 5000
  }
}

It maps the incoming messages and sends them to StockStream for processing.

EXAMPLE 2

@source(type='http', receiver.url='http://localhost:5005/stocks',
        @map(type = 'xml'))
define stream StockStream (symbol string, price float, volume long);

The above HTTP source listens on the URL http://localhost:5005/stocks for XML messages in the format:

<events>
    <event>
        <symbol>Fb</symbol>
        <price>55.6</price>
        <volume>100</volume>
    </event>
</events>


It maps the incoming messages and sends them to StockStream for processing.
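A client could produce the XML event shown in this example as follows. This Python sketch only builds the request; the build_stock_xml helper is hypothetical, and the application/xml Content-Type is an assumption about what the endpoint accepts. Sending is left commented out so that the snippet runs without a server.

```python
import urllib.request
import xml.etree.ElementTree as ET


def build_stock_xml(symbol: str, price: float, volume: int) -> bytes:
    """Serialize one StockStream event in the <events><event>...</event></events> layout."""
    events = ET.Element("events")
    event = ET.SubElement(events, "event")
    ET.SubElement(event, "symbol").text = symbol
    ET.SubElement(event, "price").text = str(price)
    ET.SubElement(event, "volume").text = str(volume)
    return ET.tostring(events, encoding="utf-8")


body = build_stock_xml("Fb", 55.6, 100)
request = urllib.request.Request(
    "http://localhost:5005/stocks",  # receiver.url from EXAMPLE 2
    data=body,
    headers={"Content-Type": "application/xml"},  # assumed Content-Type
    method="POST",
)

# To send it against a running Siddhi app:
# urllib.request.urlopen(request)
```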

http-call-response (Source)

The http-call-response source receives the responses for the calls made by its corresponding http-call sink, and maps them from formats such as text, XML and JSON.
To handle responses whose formats differ by HTTP status code, multiple http-call-response sources can be associated with a single http-call sink.
It allows accessing the attributes of the event that initiated the call, and the response headers and properties via transport properties in the format trp:<attribute name> and trp:<header/property> respectively.

Syntax

@source(type="http-call-response", sink.id="<STRING>", http.status.code="<STRING>", allow.streaming.responses="<BOOL>", @map(...)))

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
sink.id

Identifier to correlate the http-call-response source with its corresponding http-call sink that published the messages.

STRING No No
http.status.code

A regex matched against the HTTP response status code, used to filter the messages that will be processed by the source. E.g., http.status.code = '200',
http.status.code = '4\d+'

200 STRING Yes No
allow.streaming.responses

Enable consuming responses in a streaming manner.

false BOOL Yes No

Examples

EXAMPLE 1

@sink(type='http-call', method='POST',
      publisher.url='http://localhost:8005/registry/employee',
      sink.id='employee-info', @map(type='json')) 
define stream EmployeeRequestStream (name string, id int);

@source(type='http-call-response', sink.id='employee-info',
        http.status.code='2\\d+',
        @map(type='json',
             @attributes(name='trp:name', id='trp:id',
                         location='$.town', age='$.age')))
define stream EmployeeResponseStream(name string, id int,
                                     location string, age int);

@source(type='http-call-response', sink.id='employee-info',
        http.status.code='4\\d+',
        @map(type='text', regex.A='((.|\n)*)',
             @attributes(error='A[1]')))
define stream EmployeeErrorStream(error string);

When events arrive in EmployeeRequestStream, the http-call sink calls the endpoint at the URL http://localhost:8005/registry/employee using the POST method with Content-Type application/json.
If the arriving event has attributes name:John and id:1423, it sends a message with the default JSON mapping as follows:

{
  "event": {
    "name": "John",
    "id": 1423
  }
}

When the endpoint responds with a status code in the 2xx range, the message is received by the http-call-response source associated with the EmployeeResponseStream stream, because that source is correlated with the sink by the same sink.id employee-info and expects messages whose http.status.code matches the regex 2\\d+. If the response message is in the format

{
  "town": "NY",
  "age": 24
}

the source maps the location and age attributes by executing JSON path expressions on the message, and maps the name and id attributes by extracting them from the request event as transport properties.
If the response status code is in the 4xx range, the message is received by the http-call-response source associated with the EmployeeErrorStream stream, because that source is correlated with the sink by the same sink.id employee-info and expects messages whose http.status.code matches the regex 4\\d+. It maps the error response to the error attribute of the event.
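The status-code routing in this example can be sketched in Python as below. This is a conceptual model rather than the extension's implementation: the ROUTES table and route function are hypothetical, and matching the regex against the whole status code (rather than a substring) is an assumption.

```python
import re

# (http.status.code regex, target stream) pairs taken from the example above.
ROUTES = [
    (r"2\d+", "EmployeeResponseStream"),
    (r"4\d+", "EmployeeErrorStream"),
]


def route(status_code: int) -> list:
    """Return the streams whose status-code regex matches the whole status code."""
    code = str(status_code)
    return [stream for pattern, stream in ROUTES if re.fullmatch(pattern, code)]


print(route(200))  # ['EmployeeResponseStream']
print(route(404))  # ['EmployeeErrorStream']
print(route(500))  # []
```

In this model a response with a status code that no source matches, such as 500, is not delivered to either stream.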

http-request (Source)

Deprecated

(Use http-service source instead).
The http-request source receives POST requests via HTTP and HTTPS protocols in formats such as text, XML and JSON, and sends responses via its corresponding http-response sink, correlated through a unique source.id.
For request and response correlation, it generates a messageId for each incoming request and exposes it via transport properties in the format trp:messageId, so that the request can be correlated with its response at the http-response sink.
The request headers and properties can be accessed via transport properties in the format trp:<header>.
It also supports basic authentication to ensure events are received from authorized users/systems.

Syntax

@source(type="http-request", receiver.url="<STRING>", source.id="<STRING>", connection.timeout="<INT>", basic.auth.enabled="<STRING>", worker.count="<INT>", socket.idle.timeout="<INT>", ssl.verify.client="<STRING>", ssl.protocol="<STRING>", tls.store.type="<STRING>", ssl.configurations="<STRING>", request.size.validation.configurations="<STRING>", header.validation.configurations="<STRING>", server.bootstrap.configurations="<STRING>", trace.log.enabled="<BOOL>", @map(...)))

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
receiver.url

The URL on which events should be received. To enable SSL, use the https protocol in the URL.

http://0.0.0.0:9763/<appName>/<streamName> STRING Yes No
source.id

Identifier to correlate the http-request source to its corresponding http-response sinks to send responses.

STRING No No
connection.timeout

Connection timeout in millis. The system will send a timeout, if a corresponding response is not sent by an associated http-response sink within the given time.

120000 INT Yes No
basic.auth.enabled

This only works in VM, Docker and Kubernetes.
When enabled, it authenticates each request using the Authorization:'Basic encodeBase64(username:Password)' header.

false STRING Yes No
worker.count

The number of active worker threads to serve the incoming events. By default the value is set to 1 to ensure events are processed in the order in which they arrive. Increasing this value can improve performance at the expense of losing event ordering.

1 INT Yes No
socket.idle.timeout

Idle timeout for HTTP connection in millis.

120000 INT Yes No
ssl.verify.client

The type of client certificate verification. Supported values are require, optional.

- STRING Yes No
ssl.protocol

SSL/TLS protocol.

TLS STRING Yes No
tls.store.type

TLS store type.

JKS STRING Yes No
ssl.configurations

SSL/TLS configurations in format "'<key>:<value>','<key>:<value>'".
Some supported parameters:
 - SSL/TLS protocols: 'sslEnabledProtocols:TLSv1.1,TLSv1.2'
 - List of ciphers: 'ciphers:TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256'
 - Enable session creation: 'client.enable.session.creation:true'
 - Supported server names: 'server.suported.server.names:server'
 - Add HTTP SNIMatcher: 'server.supported.snimatchers:SNIMatcher'

- STRING Yes No
request.size.validation.configurations

Configurations to validate the HTTP request size.
Expected format "'<key>:<value>','<key>:<value>'".
Some supported configurations:
 - Enable request size validation: 'request.size.validation:true'
 If request size validation is enabled:
 - Maximum request size: 'request.size.validation.maximum.value:2048'
 - Response status code when request size validation fails: 'request.size.validation.reject.status.code:401'
 - Response message when request size validation fails: 'request.size.validation.reject.message:Message is bigger than the valid size'
 - Response Content-Type when request size validation fails: 'request.size.validation.reject.message.content.type:plain/text'

- STRING Yes No
header.validation.configurations

Configurations to validate HTTP headers.
Expected format "'<key>:<value>','<key>:<value>'".
Some supported configurations:
 - Enable header size validation: 'header.size.validation:true'
 If header size validation is enabled:
 - Maximum length of initial line: 'header.validation.maximum.request.line:4096'
 - Maximum length of all headers: 'header.validation.maximum.size:8192'
 - Maximum length of the content or each chunk: 'header.validation.maximum.chunk.size:8192'
 - Response status code when header validation fails: 'header.validation.reject.status.code:401'
 - Response message when header validation fails: 'header.validation.reject.message:Message header is bigger than the valid size'
 - Response Content-Type when header validation fails: 'header.validation.reject.message.content.type:plain/text'

- STRING Yes No
server.bootstrap.configurations

Server bootstrap configurations in format "'<key>:<value>','<key>:<value>'".
Some supported configurations:
 - Server connect timeout in millis: 'server.bootstrap.connect.timeout:15000'
 - Server socket timeout in seconds: 'server.bootstrap.socket.timeout:15'
 - Enable TCP no delay: 'server.bootstrap.nodelay:true'
 - Enable server keep alive: 'server.bootstrap.keepalive:true'
 - Send buffer size: 'server.bootstrap.sendbuffersize:1048576'
 - Receive buffer size: 'server.bootstrap.recievebuffersize:1048576'
 - Number of connections queued: 'server.bootstrap.socket.backlog:100'

- STRING Yes No
trace.log.enabled

Enable trace log for traffic monitoring.

false BOOL Yes No
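Several parameters above (ssl.configurations, request.size.validation.configurations, header.validation.configurations and server.bootstrap.configurations) share the "'<key>:<value>','<key>:<value>'" format. The Python sketch below shows how such a string breaks down into key/value pairs. It illustrates the format only; parse_config is a hypothetical helper, and the extension's own parsing may differ.

```python
import re


def parse_config(config: str) -> dict:
    """Split "'<key>:<value>','<key>:<value>'" into a dict.

    Each quoted entry is split on its first colon, so values may themselves
    contain commas or colons.
    """
    return {key: value for key, value in re.findall(r"'([^':]+):([^']*)'", config)}


ssl = parse_config(
    "'sslEnabledProtocols:TLSv1.1,TLSv1.2',"
    "'ciphers:TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256'"
)
print(ssl["sslEnabledProtocols"])  # TLSv1.1,TLSv1.2
print(ssl["ciphers"])              # TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256
```

Note that the comma inside 'sslEnabledProtocols:TLSv1.1,TLSv1.2' belongs to the value, which is why each entry is wrapped in its own single quotes.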

System Parameters

Name Description Default Value Possible Parameters
serverBootstrapBossGroupSize

Number of boss threads to accept incoming connections.

Number of available processors Any positive integer
serverBootstrapWorkerGroupSize

Number of worker threads to accept the connections from boss threads and perform non-blocking read and write from one or more channels.

(Number of available processors) * 2 Any positive integer
serverBootstrapClientGroupSize

Number of client threads to perform non-blocking read and write to one or more channels.

(Number of available processors) * 2 Any positive integer
defaultHost

The default host of the transport.

0.0.0.0 Any valid host
defaultScheme

The default protocol.

http http, https
defaultHttpPort

The default HTTP port when default scheme is http.

8280 Any valid port
defaultHttpsPort

The default HTTPS port when default scheme is https.

8243 Any valid port
keyStoreLocation

The default keystore file path.

${carbon.home}/resources/security/wso2carbon.jks Path to .jks file
keyStorePassword

The default keystore password.

wso2carbon Keystore password as string

Examples

EXAMPLE 1

@source(type='http-request', receiver.url='http://localhost:5005/add',
        source.id='adder',
        @map(type='json', @attributes(messageId='trp:messageId',
                                     value1='$.event.value1',
                                     value2='$.event.value2')))
define stream AddStream (messageId string, value1 long, value2 long);

@sink(type='http-response', source.id='adder',
      message.id='{{messageId}}', @map(type = 'json'))
define stream ResultStream (messageId string, results long);

@info(name = 'query1')
from AddStream 
select messageId, value1 + value2 as results 
insert into ResultStream;

The above sample listens on the URL http://localhost:5005/add for JSON messages in the format:

{
  "event": {
    "value1": 3,
    "value2": 4
  }
}


It maps the events into AddStream, processes them through query query1, and sends the results produced on ResultStream via the http-response sink in the message format:

{
  "event": {
    "results": 7
  }
}

http-response (Source)

Deprecated

(Use http-call-response source instead).
The http-response source receives the responses for the calls made by its corresponding http-request sink, and maps them from formats such as text, XML and JSON.
To handle responses whose formats differ by HTTP status code, multiple http-response sources can be associated with a single http-request sink. It allows accessing the attributes of the event that initiated the call, and the response headers and properties via transport properties in the format trp:<attribute name> and trp:<header/property> respectively.

Syntax

@source(type="http-response", sink.id="<STRING>", http.status.code="<STRING>", allow.streaming.responses="<BOOL>", @map(...)))

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
sink.id

Identifier to correlate the http-response source with its corresponding http-request sink that published the messages.

STRING No No
http.status.code

A regex matched against the HTTP response status code, used to filter the messages that will be processed by the source. E.g., http.status.code = '200',
http.status.code = '4\d+'

200 STRING Yes No
allow.streaming.responses

Enable consuming responses in a streaming manner.

false BOOL Yes No

Examples

EXAMPLE 1

@sink(type='http-request', method='POST',
      publisher.url='http://localhost:8005/registry/employee',
      sink.id='employee-info', @map(type='json')) 
define stream EmployeeRequestStream (name string, id int);

@source(type='http-response', sink.id='employee-info',
        http.status.code='2\\d+',
        @map(type='json',
             @attributes(name='trp:name', id='trp:id',
                         location='$.town', age='$.age')))
define stream EmployeeResponseStream(name string, id int,
                                     location string, age int);

@source(type='http-response', sink.id='employee-info',
        http.status.code='4\\d+',
        @map(type='text', regex.A='((.|\n)*)',
             @attributes(error='A[1]')))
define stream EmployeeErrorStream(error string);

When events arrive in EmployeeRequestStream, the http-request sink calls the endpoint at the URL http://localhost:8005/registry/employee using the POST method with Content-Type application/json.
If the arriving event has attributes name:John and id:1423, it sends a message with the default JSON mapping as follows:

{
  "event": {
    "name": "John",
    "id": 1423
  }
}

When the endpoint responds with a status code in the 2xx range, the message is received by the http-response source associated with the EmployeeResponseStream stream, because that source is correlated with the sink by the same sink.id employee-info and expects messages whose http.status.code matches the regex 2\\d+. If the response message is in the format

{
  "town": "NY",
  "age": 24
}

the source maps the location and age attributes by executing JSON path expressions on the message, and maps the name and id attributes by extracting them from the request event as transport properties.
If the response status code is in the 4xx range, the message is received by the http-response source associated with the EmployeeErrorStream stream, because that source is correlated with the sink by the same sink.id employee-info and expects messages whose http.status.code matches the regex 4\\d+. It maps the error response to the error attribute of the event.

http-service (Source)

The http-service source receives POST requests via HTTP and HTTPS protocols in formats such as text, XML and JSON, and sends responses via its corresponding http-service-response sink, correlated through a unique source.id.
For request and response correlation, it generates a messageId for each incoming request and exposes it via transport properties in the format trp:messageId, so that the request can be correlated with its response at the http-service-response sink.
The request headers and properties can be accessed via transport properties in the format trp:<header>.
It also supports basic authentication to ensure events are received from authorized users/systems.
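The messageId correlation described above can be modeled as a table of pending requests keyed by message ID. The Python sketch below is a conceptual illustration only and not the extension's implementation; the ResponseCorrelator class, its method names and the use of a UUID as the ID format are all assumptions.

```python
import uuid


class ResponseCorrelator:
    """Conceptual model: the source registers requests, the sink completes them."""

    def __init__(self):
        self._pending = {}  # messageId -> callback that delivers the response

    def on_request(self, reply_to) -> str:
        """Source side: remember how to reply and hand out a fresh messageId."""
        message_id = str(uuid.uuid4())  # assumed ID format
        self._pending[message_id] = reply_to
        return message_id

    def on_response(self, message_id: str, payload) -> None:
        """Sink side: look up the waiting request by messageId and reply once."""
        reply_to = self._pending.pop(message_id)
        reply_to(payload)


received = []
correlator = ResponseCorrelator()
message_id = correlator.on_request(received.append)
correlator.on_response(message_id, {"event": {"results": 7}})
print(received)  # [{'event': {'results': 7}}]
```

This is why the messageId has to travel with the event through the query: without it, the sink has no way to tell which open request a result belongs to.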

Syntax

@source(type="http-service", receiver.url="<STRING>", source.id="<STRING>", connection.timeout="<INT>", basic.auth.enabled="<STRING>", worker.count="<INT>", socket.idle.timeout="<INT>", ssl.verify.client="<STRING>", ssl.protocol="<STRING>", tls.store.type="<STRING>", ssl.configurations="<STRING>", request.size.validation.configurations="<STRING>", header.validation.configurations="<STRING>", server.bootstrap.configurations="<STRING>", trace.log.enabled="<BOOL>", @map(...)))

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
receiver.url

The URL on which events should be received. To enable SSL, use the https protocol in the URL.

http://0.0.0.0:9763/<appName>/<streamName> STRING Yes No
source.id

Identifier to correlate the http-service source to its corresponding http-service-response sinks to send responses.

STRING No No
connection.timeout

Connection timeout in millis. The system will send a timeout, if a corresponding response is not sent by an associated http-service-response sink within the given time.

120000 INT Yes No
basic.auth.enabled

This only works in VM, Docker and Kubernetes.
When enabled, it authenticates each request using the Authorization:'Basic encodeBase64(username:Password)' header.

false STRING Yes No
worker.count

The number of active worker threads to serve the incoming events. By default the value is set to 1 to ensure events are processed in the order in which they arrive. Increasing this value can improve performance at the expense of losing event ordering.

1 INT Yes No
socket.idle.timeout

Idle timeout for HTTP connection in millis.

120000 INT Yes No
ssl.verify.client

The type of client certificate verification. Supported values are require, optional.

- STRING Yes No
ssl.protocol

SSL/TLS protocol.

TLS STRING Yes No
tls.store.type

TLS store type.

JKS STRING Yes No
ssl.configurations

SSL/TLS configurations in format "'<key>:<value>','<key>:<value>'".
Some supported parameters:
 - SSL/TLS protocols: 'sslEnabledProtocols:TLSv1.1,TLSv1.2'
 - List of ciphers: 'ciphers:TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256'
 - Enable session creation: 'client.enable.session.creation:true'
 - Supported server names: 'server.suported.server.names:server'
 - Add HTTP SNIMatcher: 'server.supported.snimatchers:SNIMatcher'

- STRING Yes No
request.size.validation.configurations

Configurations to validate the HTTP request size.
Expected format "'<key>:<value>','<key>:<value>'".
Some supported configurations:
 - Enable request size validation: 'request.size.validation:true'
 If request size validation is enabled:
 - Maximum request size: 'request.size.validation.maximum.value:2048'
 - Response status code when request size validation fails: 'request.size.validation.reject.status.code:401'
 - Response message when request size validation fails: 'request.size.validation.reject.message:Message is bigger than the valid size'
 - Response Content-Type when request size validation fails: 'request.size.validation.reject.message.content.type:plain/text'

- STRING Yes No
header.validation.configurations

Configurations to validate HTTP headers.
Expected format "'<key>:<value>','<key>:<value>'".
Some supported configurations:
 - Enable header size validation: 'header.size.validation:true'
 If header size validation is enabled:
 - Maximum length of initial line: 'header.validation.maximum.request.line:4096'
 - Maximum length of all headers: 'header.validation.maximum.size:8192'
 - Maximum length of the content or each chunk: 'header.validation.maximum.chunk.size:8192'
 - Response status code when header validation fails: 'header.validation.reject.status.code:401'
 - Response message when header validation fails: 'header.validation.reject.message:Message header is bigger than the valid size'
 - Response Content-Type when header validation fails: 'header.validation.reject.message.content.type:plain/text'

- STRING Yes No
server.bootstrap.configurations

Server bootstrap configurations in format "'<key>:<value>','<key>:<value>'".
Some supported configurations:
 - Server connect timeout in millis: 'server.bootstrap.connect.timeout:15000'
 - Server socket timeout in seconds: 'server.bootstrap.socket.timeout:15'
 - Enable TCP no delay: 'server.bootstrap.nodelay:true'
 - Enable server keep alive: 'server.bootstrap.keepalive:true'
 - Send buffer size: 'server.bootstrap.sendbuffersize:1048576'
 - Receive buffer size: 'server.bootstrap.recievebuffersize:1048576'
 - Number of connections queued: 'server.bootstrap.socket.backlog:100'

- STRING Yes No
trace.log.enabled

Enable trace log for traffic monitoring.

false BOOL Yes No

System Parameters

Name Description Default Value Possible Parameters
serverBootstrapBossGroupSize

Number of boss threads to accept incoming connections.

Number of available processors Any positive integer
serverBootstrapWorkerGroupSize

Number of worker threads to accept the connections from boss threads and perform non-blocking read and write from one or more channels.

(Number of available processors) * 2 Any positive integer
serverBootstrapClientGroupSize

Number of client threads to perform non-blocking read and write to one or more channels.

(Number of available processors) * 2 Any positive integer
defaultHost

The default host of the transport.

0.0.0.0 Any valid host
defaultScheme

The default protocol.

http http, https
defaultHttpPort

The default HTTP port when default scheme is http.

8280 Any valid port
defaultHttpsPort

The default HTTPS port when default scheme is https.

8243 Any valid port
keyStoreLocation

The default keystore file path.

${carbon.home}/resources/security/wso2carbon.jks Path to .jks file
keyStorePassword

The default keystore password.

wso2carbon Keystore password as string

Examples

EXAMPLE 1

@source(type='http-service', receiver.url='http://localhost:5005/add',
        source.id='adder',
        @map(type='json', @attributes(messageId='trp:messageId',
                                     value1='$.event.value1',
                                     value2='$.event.value2')))
define stream AddStream (messageId string, value1 long, value2 long);

@sink(type='http-service-response', source.id='adder',
      message.id='{{messageId}}', @map(type = 'json'))
define stream ResultStream (messageId string, results long);

@info(name = 'query1')
from AddStream 
select messageId, value1 + value2 as results 
insert into ResultStream;

The above sample listens on the URL http://localhost:5005/add for JSON messages in the format:

{
  "event": {
    "value1": 3,
    "value2": 4
  }
}


It maps the events into AddStream, processes them through query query1, and sends the results produced on ResultStream via the http-service-response sink in the message format:

{
  "event": {
    "results": 7
  }
}