Skip to content

API Docs - v3.2.1

Tested Siddhi Core version: 5.1.21

It could also support other Siddhi Core minor versions.

Sink

elasticsearch (Sink)

Elasticsearch sink implementation uses Elasticsearch indexing document for underlying data storage. The events that are published from the sink will be converted into elasticsearch index documents. The elasticsearch sink is connected to the Elastisearch server via the Elasticsearch Java High Level REST Client library. By using this sink, we can customize the json document before it's stored in the elasticsearch.

Syntax

@sink(type="elasticsearch", hostname="<STRING>", port="<INT>", scheme="<STRING>", elasticsearch.member.list="<STRING>", username="<STRING>", password="<STRING>", index.name="<STRING>", payload.index.of.index.name="<INT>", index.alias="<STRING>", index.number.of.shards="<INT>", index.number.of.replicas="<INT>", bulk.actions="<INT>", bulk.size="<LONG>", concurrent.requests="<INT>", flush.interval="<LONG>", backoff.policy.retry.no="<INT>", backoff.policy.wait.time="<LONG>", ssl.enabled="<BOOL>", trust.store.type="<STRING>", trust.store.path="<STRING>", trust.store.pass="<STRING>", backoff.policy="<STRING>", backoff.policy.retry.no="<INT>", backoff.policy.wait.time="<INT>", @map(...)))

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
hostname

The hostname of the Elasticsearch server.

localhost STRING Yes No
port

The port of the Elasticsearch server.

9200 INT Yes No
scheme

The scheme type of the Elasticsearch server connection.

http STRING Yes No
elasticsearch.member.list

The list of elasticsearch host names. in comma separated mannerhttps://hostname1:9200,https://hostname2:9200

null STRING Yes No
username

The username for the Elasticsearch server connection.

elastic STRING Yes No
password

The password for the Elasticsearch server connection.

changeme STRING Yes No
index.name

The name of the Elasticsearch index.This must be in lower case

The table name defined in the Siddhi App query. STRING Yes No
payload.index.of.index.name

The payload which is used to create the index. This can be used if the user needs to create index names dynamically. This must be in lower case. If this parameter is configured then respective elasticsearch table can be only used for insert operations because indices are created in the runtime dynamically.

-1 INT Yes No
index.alias

The alias of the Elasticsearch index.

null STRING Yes No
index.number.of.shards

The number of shards allocated for the index in the Elasticsearch server.

3 INT Yes No
index.number.of.replicas

The number of replicas for the index in the Elasticsearch server.

2 INT Yes No
bulk.actions

The number of actions to be added to flush a new bulk request. Use -1 to disable it

1 INT Yes No
bulk.size

The size of size of actions currently added to the bulk request to flush a new bulk request in MB. Use -1 to disable it

1 LONG Yes No
concurrent.requests

The number of concurrent requests allowed to be executed. Use 0 to only allow the execution of a single request

0 INT Yes No
flush.interval

The flush interval flushing any BulkRequest pending if the interval passes.

10 LONG Yes No
backoff.policy.retry.no

The number of retries until backoff (The backoff policy defines how the bulk processor should handle retries of bulk requests internally in case they have failed due to resource constraints (i.e. a thread pool was full)).

3 INT Yes No
backoff.policy.wait.time

The constant back off policy that initially waits until the next retry in seconds.

1 LONG Yes No
ssl.enabled

SSL is enabled or not.

null BOOL Yes No
trust.store.type

Trust store type.

jks STRING Yes No
trust.store.path

Trust store path.

null STRING Yes No
trust.store.pass

Trust store password.

wso2carbon STRING Yes No
backoff.policy

Provides a backoff policy(eg: constantBackoff, exponentialBackoff, disable) for bulk requests, whenever a bulk request is rejected due to resource constraints. Bulk processor will wait before the operation is retried internally.

constantBackoff STRING Yes No
backoff.policy.retry.no

The maximum number of retries. Must be a non-negative number.

3 INT Yes No
backoff.policy.wait.time

The delay defines how long to wait between retry attempts. Must not be null.

1 INT Yes No

Examples EXAMPLE 1

@sink(type='elasticsearch', hostname='172.0.0.1', port='9200',index.name='stock_index', @map(type='json', @payload("""{
   "Stock Data":{
      "Symbol":"{{symbol}}",
      "Price":{{price}},
      "Volume":{{volume}}
   }
}""")))define stream stock_stream(symbol string, price float, volume long);

This will create an index called 'stock_index' if it does not already exist in the elasticsearch server and saves the custom json document.

Store

elasticsearch (Store)

Elasticsearch store implementation uses Elasticsearch indexing document for underlying data storage. The events are converted to Elasticsearch index documents when the events are inserted into the elasticsearch store. Elasticsearch indexing documents are converted to events when the documents are read from Elasticsearch indexes. The internal store is connected to the Elasticsearch server via the Elasticsearch Java High Level REST Client library.

Syntax

@Store(type="elasticsearch", hostname="<STRING>", port="<INT>", scheme="<STRING>", elasticsearch.member.list="<STRING>", username="<STRING>", password="<STRING>", index.name="<STRING>", payload.index.of.index.name="<INT>", index.alias="<STRING>", index.number.of.shards="<INT>", index.number.of.replicas="<INT>", bulk.actions="<INT>", bulk.size="<LONG>", concurrent.requests="<INT>", flush.interval="<LONG>", backoff.policy.retry.no="<INT>", backoff.policy.wait.time="<LONG>", ssl.enabled="<BOOL>", trust.store.type="<STRING>", trust.store.path="<STRING>", trust.store.pass="<STRING>", backoff.policy="<STRING>", backoff.policy.retry.no="<INT>", backoff.policy.wait.time="<INT>")
@PrimaryKey("PRIMARY_KEY")
@Index("INDEX")

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
hostname

The hostname of the Elasticsearch server.

localhost STRING Yes No
port

The port of the Elasticsearch server.

9200 INT Yes No
scheme

The scheme type of the Elasticsearch server connection.

http STRING Yes No
elasticsearch.member.list

The list of elasticsearch host names. in comma separated mannerhttps://hostname1:9200,https://hostname2:9200

null STRING Yes No
username

The username for the Elasticsearch server connection.

elastic STRING Yes No
password

The password for the Elasticsearch server connection.

changeme STRING Yes No
index.name

The name of the Elasticsearch index.This must be in lower case

The table name defined in the Siddhi App query. STRING Yes No
payload.index.of.index.name

The payload which is used to create the index. This can be used if the user needs to create index names dynamically. This must be in lower case. If this parameter is configured then respective elasticsearch table can be only used for insert operations because indices are created in the runtime dynamically.

-1 INT Yes No
index.alias

The alias of the Elasticsearch index.

null STRING Yes No
index.number.of.shards

The number of shards allocated for the index in the Elasticsearch server.

3 INT Yes No
index.number.of.replicas

The number of replicas for the index in the Elasticsearch server.

2 INT Yes No
bulk.actions

The number of actions to be added to flush a new bulk request. Use -1 to disable it

1 INT Yes No
bulk.size

The size of size of actions currently added to the bulk request to flush a new bulk request in MB. Use -1 to disable it

1 LONG Yes No
concurrent.requests

The number of concurrent requests allowed to be executed. Use 0 to only allow the execution of a single request

0 INT Yes No
flush.interval

The flush interval flushing any BulkRequest pending if the interval passes.

10 LONG Yes No
backoff.policy.retry.no

The number of retries until backoff (The backoff policy defines how the bulk processor should handle retries of bulk requests internally in case they have failed due to resource constraints (i.e. a thread pool was full)).

3 INT Yes No
backoff.policy.wait.time

The constant back off policy that initially waits until the next retry in seconds.

1 LONG Yes No
ssl.enabled

SSL is enabled or not.

null BOOL Yes No
trust.store.type

Trust store type.

jks STRING Yes No
trust.store.path

Trust store path.

null STRING Yes No
trust.store.pass

Trust store password.

wso2carbon STRING Yes No
backoff.policy

Provides a backoff policy(eg: constantBackoff, exponentialBackoff, disable) for bulk requests, whenever a bulk request is rejected due to resource constraints. Bulk processor will wait before the operation is retried internally.

constantBackoff STRING Yes No
backoff.policy.retry.no

The maximum number of retries. Must be a non-negative number.

3 INT Yes No
backoff.policy.wait.time

The delay defines how long to wait between retry attempts. Must not be null.

1 INT Yes No

Examples EXAMPLE 1

@Store(type="elasticsearch", hostname="localhost", username="elastic", password="changeme", index.name="mystocktable", field.length="symbol:100", bulk.actions="5000", bulk.size="1", concurrent.requests="2", flush.interval="1", backoff.policy.retry.no="3", backoff.policy.wait.time="1")
@PrimaryKey("symbol")define table StockTable (symbol string, price float, volume long);

This example creates an index named 'mystocktable' in the Elasticsearch server if it does not already exist (with three attributes named 'symbol', 'price', and 'volume' of the types 'string', 'float' and 'long' respectively). The connection is made as specified by the parameters configured for the '@Store' annotation. The 'symbol' attribute is considered a unique field and an Elasticsearch index document ID is generated for it.

EXAMPLE 2

@Store(type="elasticsearch", hostname="localhost", username="elastic", password="changeme", index.name="mystocktable", field.length="symbol:100", bulk.actions="5000", bulk.size="1", concurrent.requests="2", flush.interval="1", backoff.policy.retry.no="3", backoff.policy.wait.time="1", ssl.enabled="true", trust.store.type="jks", trust.store.path="/User/wso2/wso2sp/resources/security/client-truststore.jks", trust.store.pass="wso2carbon")
@PrimaryKey("symbol")define table StockTable (symbol string, price float, volume long);

This example uses SSL to connect to Elasticsearch.

EXAMPLE 3

@Store(type="elasticsearch", elasticsearch.member.list="https://hostname1:9200,https://hostname2:9200", username="elastic", password="changeme", index.name="mystocktable", field.length="symbol:100", bulk.actions="5000", bulk.size="1", concurrent.requests="2", flush.interval="1", backoff.policy.retry.no="3", backoff.policy.wait.time="1")
@PrimaryKey("symbol")define table StockTable (symbol string, price float, volume long);

This example defined several elasticsearch members to publish data using elasticsearch.member.list parameter.