API Docs - v3.2.1
Tested Siddhi Core version: 5.1.21
It may also work with other Siddhi Core minor versions.
Sink
elasticsearch (Sink)
The Elasticsearch sink uses Elasticsearch index documents for underlying data storage. Events published from the sink are converted into Elasticsearch index documents. The sink connects to the Elasticsearch server via the Elasticsearch Java High Level REST Client library. With this sink, the JSON document can be customized before it is stored in Elasticsearch.
Syntax
@sink(type="elasticsearch", hostname="<STRING>", port="<INT>", scheme="<STRING>", elasticsearch.member.list="<STRING>", username="<STRING>", password="<STRING>", index.name="<STRING>", payload.index.of.index.name="<INT>", index.alias="<STRING>", index.number.of.shards="<INT>", index.number.of.replicas="<INT>", bulk.actions="<INT>", bulk.size="<LONG>", concurrent.requests="<INT>", flush.interval="<LONG>", backoff.policy.retry.no="<INT>", backoff.policy.wait.time="<LONG>", ssl.enabled="<BOOL>", trust.store.type="<STRING>", trust.store.path="<STRING>", trust.store.pass="<STRING>", backoff.policy="<STRING>", backoff.policy.retry.no="<INT>", backoff.policy.wait.time="<INT>", @map(...))
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
hostname | The hostname of the Elasticsearch server. | localhost | STRING | Yes | No |
port | The port of the Elasticsearch server. | 9200 | INT | Yes | No |
scheme | The scheme type of the Elasticsearch server connection. | http | STRING | Yes | No |
elasticsearch.member.list | A comma-separated list of Elasticsearch host names. | null | STRING | Yes | No |
username | The username for the Elasticsearch server connection. | elastic | STRING | Yes | No |
password | The password for the Elasticsearch server connection. | changeme | STRING | Yes | No |
index.name | The name of the Elasticsearch index. This must be in lower case. | The table name defined in the Siddhi App query. | STRING | Yes | No |
payload.index.of.index.name | The payload that is used to create the index. This can be used to create index names dynamically. This must be in lower case. If this parameter is configured, the respective Elasticsearch table can be used only for insert operations, because indices are created dynamically at runtime. | -1 | INT | Yes | No |
index.alias | The alias of the Elasticsearch index. | null | STRING | Yes | No |
index.number.of.shards | The number of shards allocated for the index in the Elasticsearch server. | 3 | INT | Yes | No |
index.number.of.replicas | The number of replicas for the index in the Elasticsearch server. | 2 | INT | Yes | No |
bulk.actions | The number of actions that must be added before a new bulk request is flushed. Use -1 to disable it. | 1 | INT | Yes | No |
bulk.size | The total size, in MB, of the actions added to the bulk request before a new bulk request is flushed. Use -1 to disable it. | 1 | LONG | Yes | No |
concurrent.requests | The number of concurrent requests allowed to be executed. Use 0 to allow only a single request to be executed. | 0 | INT | Yes | No |
flush.interval | The interval after which any pending BulkRequest is flushed. | 10 | LONG | Yes | No |
backoff.policy.retry.no | The number of retries until backoff. The backoff policy defines how the bulk processor handles retries of bulk requests internally when they have failed due to resource constraints (i.e. a thread pool was full). | 3 | INT | Yes | No |
backoff.policy.wait.time | The time, in seconds, that the constant backoff policy initially waits before the next retry. | 1 | LONG | Yes | No |
ssl.enabled | Whether SSL is enabled. | null | BOOL | Yes | No |
trust.store.type | The trust store type. | jks | STRING | Yes | No |
trust.store.path | The trust store path. | null | STRING | Yes | No |
trust.store.pass | The trust store password. | wso2carbon | STRING | Yes | No |
backoff.policy | The backoff policy (e.g. constantBackoff, exponentialBackoff, disable) applied to bulk requests. Whenever a bulk request is rejected due to resource constraints, the bulk processor waits before the operation is retried internally. | constantBackoff | STRING | Yes | No |
backoff.policy.retry.no | The maximum number of retries. Must be a non-negative number. | 3 | INT | Yes | No |
backoff.policy.wait.time | The delay that defines how long to wait between retry attempts. Must not be null. | 1 | INT | Yes | No |
Examples
EXAMPLE 1
@sink(type='elasticsearch', hostname='172.0.0.1', port='9200', index.name='stock_index', @map(type='json', @payload("""{
   "Stock Data":{
      "Symbol":"{{symbol}}",
      "Price":{{price}},
      "Volume":{{volume}}
   }
}""")))
define stream stock_stream(symbol string, price float, volume long);
This creates an index named 'stock_index' in the Elasticsearch server if it does not already exist, and saves each event as a custom JSON document in that index.
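The bulk and backoff parameters listed in the table above can be set on the same sink. The following is a minimal sketch rather than a tested configuration: the stream name `input_stream` and the parameter values are assumptions chosen only for illustration.

```siddhi
-- Hypothetical source stream that feeds the sink (not part of the original example).
define stream input_stream(symbol string, price float, volume long);

-- Illustrative values: flush a bulk request after 500 actions or 5 MB,
-- and use the exponential backoff policy with up to 3 retries.
@sink(type='elasticsearch', hostname='localhost', port='9200',
      index.name='stock_index',
      bulk.actions='500', bulk.size='5', flush.interval='10',
      backoff.policy='exponentialBackoff',
      backoff.policy.retry.no='3', backoff.policy.wait.time='1',
      @map(type='json', @payload("""{
         "Stock Data":{
            "Symbol":"{{symbol}}",
            "Price":{{price}},
            "Volume":{{volume}}
         }
      }""")))
define stream stock_stream(symbol string, price float, volume long);

-- Forward every incoming event to the Elasticsearch sink.
from input_stream
select symbol, price, volume
insert into stock_stream;
```

With this mapping, an event such as ('WSO2', 55.6, 100) would be expected to yield a document along the lines of {"Stock Data":{"Symbol":"WSO2","Price":55.6,"Volume":100}}.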
Store
elasticsearch (Store)
The Elasticsearch store uses Elasticsearch index documents for underlying data storage. Events are converted to Elasticsearch index documents when they are inserted into the store, and index documents are converted back to events when they are read from Elasticsearch indexes. The store connects to the Elasticsearch server via the Elasticsearch Java High Level REST Client library.
Syntax
@Store(type="elasticsearch", hostname="<STRING>", port="<INT>", scheme="<STRING>", elasticsearch.member.list="<STRING>", username="<STRING>", password="<STRING>", index.name="<STRING>", payload.index.of.index.name="<INT>", index.alias="<STRING>", index.number.of.shards="<INT>", index.number.of.replicas="<INT>", bulk.actions="<INT>", bulk.size="<LONG>", concurrent.requests="<INT>", flush.interval="<LONG>", backoff.policy.retry.no="<INT>", backoff.policy.wait.time="<LONG>", ssl.enabled="<BOOL>", trust.store.type="<STRING>", trust.store.path="<STRING>", trust.store.pass="<STRING>", backoff.policy="<STRING>", backoff.policy.retry.no="<INT>", backoff.policy.wait.time="<INT>")
@PrimaryKey("PRIMARY_KEY")
@Index("INDEX")
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
hostname | The hostname of the Elasticsearch server. | localhost | STRING | Yes | No |
port | The port of the Elasticsearch server. | 9200 | INT | Yes | No |
scheme | The scheme type of the Elasticsearch server connection. | http | STRING | Yes | No |
elasticsearch.member.list | A comma-separated list of Elasticsearch host names. | null | STRING | Yes | No |
username | The username for the Elasticsearch server connection. | elastic | STRING | Yes | No |
password | The password for the Elasticsearch server connection. | changeme | STRING | Yes | No |
index.name | The name of the Elasticsearch index. This must be in lower case. | The table name defined in the Siddhi App query. | STRING | Yes | No |
payload.index.of.index.name | The payload that is used to create the index. This can be used to create index names dynamically. This must be in lower case. If this parameter is configured, the respective Elasticsearch table can be used only for insert operations, because indices are created dynamically at runtime. | -1 | INT | Yes | No |
index.alias | The alias of the Elasticsearch index. | null | STRING | Yes | No |
index.number.of.shards | The number of shards allocated for the index in the Elasticsearch server. | 3 | INT | Yes | No |
index.number.of.replicas | The number of replicas for the index in the Elasticsearch server. | 2 | INT | Yes | No |
bulk.actions | The number of actions that must be added before a new bulk request is flushed. Use -1 to disable it. | 1 | INT | Yes | No |
bulk.size | The total size, in MB, of the actions added to the bulk request before a new bulk request is flushed. Use -1 to disable it. | 1 | LONG | Yes | No |
concurrent.requests | The number of concurrent requests allowed to be executed. Use 0 to allow only a single request to be executed. | 0 | INT | Yes | No |
flush.interval | The interval after which any pending BulkRequest is flushed. | 10 | LONG | Yes | No |
backoff.policy.retry.no | The number of retries until backoff. The backoff policy defines how the bulk processor handles retries of bulk requests internally when they have failed due to resource constraints (i.e. a thread pool was full). | 3 | INT | Yes | No |
backoff.policy.wait.time | The time, in seconds, that the constant backoff policy initially waits before the next retry. | 1 | LONG | Yes | No |
ssl.enabled | Whether SSL is enabled. | null | BOOL | Yes | No |
trust.store.type | The trust store type. | jks | STRING | Yes | No |
trust.store.path | The trust store path. | null | STRING | Yes | No |
trust.store.pass | The trust store password. | wso2carbon | STRING | Yes | No |
backoff.policy | The backoff policy (e.g. constantBackoff, exponentialBackoff, disable) applied to bulk requests. Whenever a bulk request is rejected due to resource constraints, the bulk processor waits before the operation is retried internally. | constantBackoff | STRING | Yes | No |
backoff.policy.retry.no | The maximum number of retries. Must be a non-negative number. | 3 | INT | Yes | No |
backoff.policy.wait.time | The delay that defines how long to wait between retry attempts. Must not be null. | 1 | INT | Yes | No |
Examples
EXAMPLE 1
@Store(type="elasticsearch", hostname="localhost", username="elastic", password="changeme", index.name="mystocktable", field.length="symbol:100", bulk.actions="5000", bulk.size="1", concurrent.requests="2", flush.interval="1", backoff.policy.retry.no="3", backoff.policy.wait.time="1")
@PrimaryKey("symbol")
define table StockTable (symbol string, price float, volume long);
This example creates an index named 'mystocktable' in the Elasticsearch server if it does not already exist (with three attributes named 'symbol', 'price', and 'volume' of the types 'string', 'float' and 'long' respectively). The connection is made as specified by the parameters configured for the '@Store' annotation. The 'symbol' attribute is considered a unique field and an Elasticsearch index document ID is generated for it.
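Once defined, the table can be used in queries like any other Siddhi table. The following is a minimal sketch, assuming hypothetical streams `StockStream`, `CheckStream`, and `OutputStream` that are not part of the original example. The first query inserts events into the Elasticsearch-backed table, and the second reads documents back through a join.

```siddhi
-- Hypothetical input streams, introduced only for this illustration.
define stream StockStream (symbol string, price float, volume long);
define stream CheckStream (symbol string);

@Store(type="elasticsearch", hostname="localhost", username="elastic", password="changeme", index.name="mystocktable")
@PrimaryKey("symbol")
define table StockTable (symbol string, price float, volume long);

-- Each event is converted to an index document when it is inserted.
from StockStream
insert into StockTable;

-- Matching index documents are converted back to events when they are read.
from CheckStream as c join StockTable as t
    on c.symbol == t.symbol
select t.symbol as symbol, t.price as price, t.volume as volume
insert into OutputStream;
```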
EXAMPLE 2
@Store(type="elasticsearch", hostname="localhost", username="elastic", password="changeme", index.name="mystocktable", field.length="symbol:100", bulk.actions="5000", bulk.size="1", concurrent.requests="2", flush.interval="1", backoff.policy.retry.no="3", backoff.policy.wait.time="1", ssl.enabled="true", trust.store.type="jks", trust.store.path="/User/wso2/wso2sp/resources/security/client-truststore.jks", trust.store.pass="wso2carbon")
@PrimaryKey("symbol")
define table StockTable (symbol string, price float, volume long);
This example uses SSL to connect to Elasticsearch.
EXAMPLE 3
@Store(type="elasticsearch", elasticsearch.member.list="https://hostname1:9200,https://hostname2:9200", username="elastic", password="changeme", index.name="mystocktable", field.length="symbol:100", bulk.actions="5000", bulk.size="1", concurrent.requests="2", flush.interval="1", backoff.policy.retry.no="3", backoff.policy.wait.time="1")
@PrimaryKey("symbol")
define table StockTable (symbol string, price float, volume long);
This example defines several Elasticsearch members to publish data to, using the elasticsearch.member.list parameter.
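The index-level parameters from the table above can be set in the same way. The sketch below is an illustration under stated assumptions rather than a verified setup: the alias `stocks`, the shard and replica counts, and the retry values are placeholders, and the host names are reused from the previous example.

```siddhi
-- Illustrative values only: one shard, one replica, an alias named 'stocks',
-- and exponential backoff with up to 5 retries.
@Store(type="elasticsearch",
       elasticsearch.member.list="https://hostname1:9200,https://hostname2:9200",
       username="elastic", password="changeme",
       index.name="mystocktable", index.alias="stocks",
       index.number.of.shards="1", index.number.of.replicas="1",
       backoff.policy="exponentialBackoff",
       backoff.policy.retry.no="5", backoff.policy.wait.time="2")
@PrimaryKey("symbol")
define table StockTable (symbol string, price float, volume long);
```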