API Docs - v5.1.30

Core

and (Aggregate Function)

Returns the result of the AND operation for all the events.

Syntax

<BOOL> and(<BOOL> arg)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
arg

The value on which the AND operation is performed.

BOOL No Yes

Examples EXAMPLE 1

from cscStream#window.lengthBatch(10)
select and(isFraud) as isFraudTransaction
insert into alertStream;

This returns the result of the AND operation over the isFraud values as a boolean value each time the length batch window expires a chunk of events.
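The batch behaviour described above can be modelled outside Siddhi. The following Python sketch is only an illustration of the semantics, not Siddhi code; the helper name and_per_length_batch is hypothetical, and it assumes one result is produced per full batch.

```python
from typing import Iterable, List


def and_per_length_batch(values: Iterable[bool], length: int) -> List[bool]:
    """Model of and() over a length batch window: one result per full batch."""
    results: List[bool] = []
    batch: List[bool] = []
    for value in values:
        batch.append(value)
        if len(batch) == length:
            # A full batch has been collected: emit the AND of its values
            # and start a fresh batch (the old events expire together).
            results.append(all(batch))
            batch = []
    # Events in an incomplete trailing batch produce no output yet.
    return results
```

A single false value in a batch of 10 is enough to make the result for that batch false.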

avg (Aggregate Function)

Calculates the average for all the events.

Syntax

<DOUBLE> avg(<INT|LONG|DOUBLE|FLOAT> arg)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
arg

The value that needs to be averaged.

INT
LONG
DOUBLE
FLOAT
No Yes

Examples EXAMPLE 1

from fooStream#window.timeBatch(10 sec)
select avg(temp) as avgTemp
insert into barStream;

avg(temp) returns the average temp value for all the events based on their arrival and expiry.
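"Based on their arrival and expiry" means the aggregate is updated both when an event enters the window and when it leaves. A minimal Python sketch of that idea follows; it is a model of the semantics only, and the class and method names are hypothetical rather than part of Siddhi.

```python
from typing import Optional


class RunningAverage:
    """Keeps a sum and a count so the average can be updated incrementally."""

    def __init__(self) -> None:
        self._total = 0.0
        self._count = 0

    def on_arrival(self, value: float) -> Optional[float]:
        # A new event enters the window: include it in the aggregate.
        self._total += value
        self._count += 1
        return self.current()

    def on_expiry(self, value: float) -> Optional[float]:
        # An event leaves the window: remove its contribution.
        self._total -= value
        self._count -= 1
        return self.current()

    def current(self) -> Optional[float]:
        # With no events in scope there is nothing to average.
        if self._count == 0:
            return None
        return self._total / self._count
```

Keeping the sum and count separately avoids rescanning the window contents on every event.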

count (Aggregate Function)

Returns the count of all the events.

Syntax

<LONG> count()
<LONG> count(<INT|LONG|DOUBLE|FLOAT|STRING|BOOL|OBJECT> arg)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
arg

This function accepts one parameter. It can belong to any one of the available types.

INT
LONG
DOUBLE
FLOAT
STRING
BOOL
OBJECT
Yes Yes

Examples EXAMPLE 1

from fooStream#window.timeBatch(10 sec)
select count() as count
insert into barStream;

This returns the count of all the events in each 10-second time batch.

distinctCount (Aggregate Function)

This returns the count of distinct occurrences for a given arg.

Syntax

<LONG> distinctCount(<INT|LONG|DOUBLE|FLOAT|STRING> arg)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
arg

The object for which the number of distinct occurrences needs to be counted.

INT
LONG
DOUBLE
FLOAT
STRING
No Yes

Examples EXAMPLE 1

from fooStream
select distinctCount(pageID) as count
insert into barStream;

distinctCount(pageID) returns '3' when the available values are as follows.
"WEB_PAGE_1"
"WEB_PAGE_1"
"WEB_PAGE_2"
"WEB_PAGE_3"
"WEB_PAGE_1"
"WEB_PAGE_2"
The three distinct occurrences identified are 'WEB_PAGE_1', 'WEB_PAGE_2', and 'WEB_PAGE_3'.
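The counting in the example above can be reproduced with a small Python model. This is a sketch of the semantics, not Siddhi's implementation; the class name DistinctCounter and its methods are hypothetical.

```python
from collections import Counter


class DistinctCounter:
    """Tracks how many distinct values are currently in scope."""

    def __init__(self) -> None:
        self._occurrences = Counter()

    def on_arrival(self, value) -> int:
        self._occurrences[value] += 1
        return len(self._occurrences)

    def on_expiry(self, value) -> int:
        if value in self._occurrences:
            self._occurrences[value] -= 1
            if self._occurrences[value] == 0:
                # The last occurrence is gone, so the value no longer counts.
                del self._occurrences[value]
        return len(self._occurrences)
```

Feeding the six page IDs listed above through on_arrival ends with a distinct count of 3. Expiring the only "WEB_PAGE_3" event would bring it down to 2.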

max (Aggregate Function)

Returns the maximum value for all the events.

Syntax

<INT|LONG|DOUBLE|FLOAT> max(<INT|LONG|DOUBLE|FLOAT> arg)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
arg

The value that needs to be compared to find the maximum value.

INT
LONG
DOUBLE
FLOAT
No Yes

Examples EXAMPLE 1

from fooStream#window.timeBatch(10 sec)
select max(temp) as maxTemp
insert into barStream;

max(temp) returns the maximum temp value recorded for all the events based on their arrival and expiry.

maxForever (Aggregate Function)

This attribute aggregator stores the maximum value of a given attribute throughout the lifetime of the query, regardless of any windows in front of it.

Syntax

<INT|LONG|DOUBLE|FLOAT> maxForever(<INT|LONG|DOUBLE|FLOAT> arg)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
arg

The value that needs to be compared to find the maximum value.

INT
LONG
DOUBLE
FLOAT
No Yes

Examples EXAMPLE 1

from inputStream
select maxForever(temp) as max
insert into outputStream;

maxForever(temp) returns the maximum temp value recorded for all the events throughout the lifetime of the query.
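The difference between a windowed maximum and a lifetime maximum can be seen in a short Python model. It is an illustrative sketch only; both function names are hypothetical, and a sliding length window stands in for "any window".

```python
from collections import deque
from typing import List, Sequence


def max_forever(values: Sequence[float]) -> List[float]:
    """Lifetime maximum: never decreases, expired events are not forgotten."""
    results: List[float] = []
    best = None
    for value in values:
        if best is None or value > best:
            best = value
        results.append(best)
    return results


def max_in_length_window(values: Sequence[float], length: int) -> List[float]:
    """Windowed maximum: only the most recent `length` events are considered."""
    window = deque(maxlen=length)  # the oldest event drops out automatically
    results: List[float] = []
    for value in values:
        window.append(value)
        results.append(max(window))
    return results
```

For the readings 20, 35, 30, 25, 22 the lifetime maximum stays at 35 once it has been seen, while the maximum over a window of two events falls back to 30 and then 25.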

min (Aggregate Function)

Returns the minimum value for all the events.

Syntax

<INT|LONG|DOUBLE|FLOAT> min(<INT|LONG|DOUBLE|FLOAT> arg)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
arg

The value that needs to be compared to find the minimum value.

INT
LONG
DOUBLE
FLOAT
No Yes

Examples EXAMPLE 1

from inputStream
select min(temp) as minTemp
insert into outputStream;

min(temp) returns the minimum temp value recorded for all the events based on their arrival and expiry.

minForever (Aggregate Function)

This attribute aggregator stores the minimum value of a given attribute throughout the lifetime of the query, regardless of any windows in front of it.

Syntax

<INT|LONG|DOUBLE|FLOAT> minForever(<INT|LONG|DOUBLE|FLOAT> arg)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
arg

The value that needs to be compared to find the minimum value.

INT
LONG
DOUBLE
FLOAT
No Yes

Examples EXAMPLE 1

from inputStream
select minForever(temp) as min
insert into outputStream;

minForever(temp) returns the minimum temp value recorded for all the events throughout the lifetime of the query.

or (Aggregate Function)

Returns the result of the OR operation for all the events.

Syntax

<BOOL> or(<BOOL> arg)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
arg

The value on which the OR operation is performed.

BOOL No Yes

Examples EXAMPLE 1

from cscStream#window.lengthBatch(10)
select or(isFraud) as isFraudTransaction
insert into alertStream;

This returns the result of the OR operation over the isFraud values as a boolean value each time the length batch window expires a chunk of events.

stdDev (Aggregate Function)

Returns the calculated standard deviation for all the events.

Syntax

<DOUBLE> stdDev(<INT|LONG|DOUBLE|FLOAT> arg)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
arg

The value that should be used to calculate the standard deviation.

INT
LONG
DOUBLE
FLOAT
No Yes

Examples EXAMPLE 1

from inputStream
select stdDev(temp) as stdTemp
insert into outputStream;

stdDev(temp) returns the calculated standard deviation of temp for all the events based on their arrival and expiry.

sum (Aggregate Function)

Returns the sum for all the events.

Syntax

<LONG|DOUBLE> sum(<INT|LONG|DOUBLE|FLOAT> arg)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
arg

The value that needs to be summed.

INT
LONG
DOUBLE
FLOAT
No Yes

Examples EXAMPLE 1

from inputStream
select sum(volume) as sumOfVolume
insert into outputStream;

This returns the sum of the volume values as a long value for each event arrival and expiry.

unionSet (Aggregate Function)

Union multiple sets.
 This attribute aggregator maintains a union of sets. The given input set is put into the union set and the union set is returned.

Syntax

<OBJECT> unionSet(<OBJECT> set)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
set

The java.util.Set object that needs to be added into the union set.

OBJECT No Yes

Examples EXAMPLE 1

from stockStream
select createSet(symbol) as initialSet
insert into initStream;

from initStream#window.timeBatch(10 sec)
select unionSet(initialSet) as distinctSymbols
insert into distinctStockStream;

distinctStockStream returns the set object that contains the distinct stock symbols received during each 10-second time batch window.

UUID (Function)

Generates a UUID (Universally Unique Identifier).

Syntax

<STRING> UUID()

Examples EXAMPLE 1

from TempStream
select convert(roomNo, 'string') as roomNo, temp, UUID() as messageID
insert into RoomTempStream;

This converts the room number to a string and introduces a message ID to each event, as UUID() returns a value such as a34eec40-32c2-44fe-8075-7f4fde2e2dd8.

cast (Function)

Converts the first parameter according to the cast.to parameter. Incompatible arguments cause class cast exceptions if further processed. This function is used with the map extension, which returns attributes of the object type. You can use this function to cast the object to an accurate and concrete type.

Syntax

<INT|LONG|DOUBLE|FLOAT|STRING|BOOL|OBJECT> cast(<INT|LONG|DOUBLE|FLOAT|STRING|BOOL|OBJECT> to.be.caster, <STRING> cast.to)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
to.be.caster

This specifies the attribute to be cast.

INT
LONG
DOUBLE
FLOAT
STRING
BOOL
OBJECT
No Yes
cast.to

A string constant specifying the type to cast to, using one of the following string values: int, long, float, double, string, bool.

STRING No Yes

Examples EXAMPLE 1

from fooStream
select symbol as name, cast(temp, 'double') as temp
insert into barStream;

This casts the temp field value of fooStream to the 'double' type.

coalesce (Function)

Returns the value of the first input parameter that is not null. All input parameters have to be of the same type.

Syntax

<INT|LONG|DOUBLE|FLOAT|STRING|BOOL|OBJECT> coalesce(<INT|LONG|DOUBLE|FLOAT|STRING|BOOL|OBJECT> arg, <INT|LONG|DOUBLE|FLOAT|STRING|BOOL|OBJECT> ...)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
arg

This function accepts one or more parameters. They can belong to any one of the available types. All the specified parameters should be of the same type.

INT
LONG
DOUBLE
FLOAT
STRING
BOOL
OBJECT
No Yes

Examples EXAMPLE 1

from fooStream
select coalesce('123', null, '789') as value
insert into barStream;

This returns the first non-null value, '123'.

EXAMPLE 2

from fooStream
select coalesce(null, 76, 567) as value
insert into barStream;

This returns the first non-null value, 76.

EXAMPLE 3

from fooStream
select coalesce(null, null, null) as value
insert into barStream;

This returns null as there are no non-null values.
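The three examples above follow from a simple "first non-null argument" rule. The Python sketch below models that rule, with None standing in for null. It is not Siddhi code and does not enforce the same-type requirement.

```python
def coalesce(*args):
    """Return the first argument that is not None, or None if all are None."""
    for arg in args:
        if arg is not None:
            return arg
    return None
```

Calling coalesce('123', None, '789') yields '123', coalesce(None, 76, 567) yields 76, and coalesce(None, None, None) yields None, matching the three examples.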

convert (Function)

Converts the first input parameter according to the converted.to parameter.

Syntax

<INT|LONG|DOUBLE|FLOAT|STRING|BOOL> convert(<INT|LONG|DOUBLE|FLOAT|STRING|BOOL|OBJECT> to.be.converted, <STRING> converted.to)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
to.be.converted

This specifies the value to be converted.

INT
LONG
DOUBLE
FLOAT
STRING
BOOL
OBJECT
No Yes
converted.to

A string constant specifying the type to which the attribute needs to be converted, using one of the following string values: 'int', 'long', 'float', 'double', 'string', 'bool'.

STRING No Yes

Examples EXAMPLE 1

from fooStream
select convert(temp, 'double') as temp
insert into barStream;

This converts the temp value of fooStream to 'double'.

EXAMPLE 2

from fooStream
select convert(temp, 'int') as temp
insert into barStream;

This converts the temp value of fooStream to 'int'; for example, convert(45.9, 'int') returns 46.

createSet (Function)

Includes the given input parameter in a java.util.HashSet and returns the set.

Syntax

<OBJECT> createSet(<INT|LONG|DOUBLE|FLOAT|STRING|BOOL> input)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
input

The input that needs to be added into the set.

INT
LONG
DOUBLE
FLOAT
STRING
BOOL
No Yes

Examples EXAMPLE 1

from stockStream 
select createSet(symbol) as initialSet 
insert into initStream;

For every incoming stockStream event, the initStream stream will produce a set object having only one element: the symbol in the incoming stockStream.

currentTimeMillis (Function)

Returns the current timestamp of the Siddhi application in milliseconds.

Syntax

<LONG> currentTimeMillis()

Examples EXAMPLE 1

from fooStream
select symbol as name, currentTimeMillis() as eventTimestamp 
insert into barStream;

This extracts the current Siddhi application timestamp.

default (Function)

Checks whether the 'attribute' parameter is null and, if so, returns the value of the 'default' parameter.

Syntax

<INT|LONG|DOUBLE|FLOAT|STRING|BOOL|OBJECT> default(<INT|LONG|DOUBLE|FLOAT|STRING|BOOL|OBJECT> attribute, <INT|LONG|DOUBLE|FLOAT|STRING|BOOL|OBJECT> default)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
attribute

The attribute that could be null.

INT
LONG
DOUBLE
FLOAT
STRING
BOOL
OBJECT
No Yes
default

The default value that is used when the 'attribute' parameter is null.

INT
LONG
DOUBLE
FLOAT
STRING
BOOL
OBJECT
No Yes

Examples EXAMPLE 1

from TempStream
select default(temp, 0.0) as temp, roomNum
insert into StandardTempStream;

This replaces the temp attribute of TempStream with the default value 0.0 if temp is null.

eventTimestamp (Function)

Returns the timestamp of the processed/passed event.

Syntax

<LONG> eventTimestamp()
<LONG> eventTimestamp(<OBJECT> event)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
event

Event reference.

Current Event OBJECT Yes Yes

Examples EXAMPLE 1

from FooStream
select symbol as name, eventTimestamp() as eventTimestamp 
insert into BarStream;

Extracts current event's timestamp.

EXAMPLE 2

from FooStream as f join FooBarTable as fb
select fb.symbol as name, eventTimestamp(f) as eventTimestamp 
insert into BarStream;

Extracts FooStream event's timestamp.

ifThenElse (Function)

Evaluates the 'condition' parameter and returns value of the 'if.expression' parameter if the condition is true, or returns value of the 'else.expression' parameter if the condition is false. Here both 'if.expression' and 'else.expression' should be of the same type.

Syntax

<INT|LONG|DOUBLE|FLOAT|STRING|BOOL|OBJECT> ifThenElse(<BOOL> condition, <INT|LONG|DOUBLE|FLOAT|STRING|BOOL|OBJECT> if.expression, <INT|LONG|DOUBLE|FLOAT|STRING|BOOL|OBJECT> else.expression)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
condition

This specifies the if then else condition value.

BOOL No Yes
if.expression

This specifies the value to be returned if the value of the condition parameter is true.

INT
LONG
DOUBLE
FLOAT
STRING
BOOL
OBJECT
No Yes
else.expression

This specifies the value to be returned if the value of the condition parameter is false.

INT
LONG
DOUBLE
FLOAT
STRING
BOOL
OBJECT
No Yes

Examples EXAMPLE 1

@info(name = 'query1')
from sensorEventStream
select sensorValue, ifThenElse(sensorValue>35,'High','Low') as status
insert into outputStream;

This returns 'High' if sensorValue = 50.

EXAMPLE 2

@info(name = 'query1')
from sensorEventStream
select sensorValue, ifThenElse(voltage < 5, 0, 1) as status
insert into outputStream;

This returns 1 if voltage = 12.

EXAMPLE 3

@info(name = 'query1')
from userEventStream
select userName, ifThenElse(password == 'admin', true, false) as passwordState
insert into outputStream;

This returns passwordState as true if password = 'admin'.

instanceOfBoolean (Function)

Checks whether the parameter is an instance of Boolean or not.

Syntax

<BOOL> instanceOfBoolean(<INT|LONG|DOUBLE|FLOAT|STRING|BOOL|OBJECT> arg)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
arg

The parameter to be checked.

INT
LONG
DOUBLE
FLOAT
STRING
BOOL
OBJECT
No Yes

Examples EXAMPLE 1

from fooStream
select instanceOfBoolean(switchState) as state
insert into barStream;

This returns true if the value of switchState is a boolean, e.g. true.

EXAMPLE 2

from fooStream
select instanceOfBoolean(value) as state
insert into barStream;

If value = 32, this returns false as the value is not an instance of Boolean.

instanceOfDouble (Function)

Checks whether the parameter is an instance of Double or not.

Syntax

<BOOL> instanceOfDouble(<INT|LONG|DOUBLE|FLOAT|STRING|BOOL|OBJECT> arg)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
arg

The parameter to be checked.

INT
LONG
DOUBLE
FLOAT
STRING
BOOL
OBJECT
No Yes

Examples EXAMPLE 1

from fooStream
select instanceOfDouble(value) as state
insert into barStream;

This returns true if the value field is of type double, e.g. 56.45.

EXAMPLE 2

from fooStream
select instanceOfDouble(switchState) as state
insert into barStream;

If switchState = true, this returns false as the value is not an instance of Double.

instanceOfFloat (Function)

Checks whether the parameter is an instance of Float or not.

Syntax

<BOOL> instanceOfFloat(<INT|LONG|DOUBLE|FLOAT|STRING|BOOL|OBJECT> arg)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
arg

The parameter to be checked.

INT
LONG
DOUBLE
FLOAT
STRING
BOOL
OBJECT
No Yes

Examples EXAMPLE 1

from fooStream
select instanceOfFloat(value) as state
insert into barStream;

This returns true if the value field is of type float, e.g. 56.45f.

EXAMPLE 2

from fooStream
select instanceOfFloat(switchState) as state
insert into barStream;

If switchState = true, this returns false as the value is an instance of Boolean, not Float.

instanceOfInteger (Function)

Checks whether the parameter is an instance of Integer or not.

Syntax

<BOOL> instanceOfInteger(<INT|LONG|DOUBLE|FLOAT|STRING|BOOL|OBJECT> arg)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
arg

The parameter to be checked.

INT
LONG
DOUBLE
FLOAT
STRING
BOOL
OBJECT
No Yes

Examples EXAMPLE 1

from fooStream
select instanceOfInteger(value) as state
insert into barStream;

This returns true if the value field is of type integer.

EXAMPLE 2

from fooStream
select instanceOfInteger(switchState) as state
insert into barStream;

If switchState = true, this returns false as the value is an instance of Boolean, not Integer.

instanceOfLong (Function)

Checks whether the parameter is an instance of Long or not.

Syntax

<BOOL> instanceOfLong(<INT|LONG|DOUBLE|FLOAT|STRING|BOOL|OBJECT> arg)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
arg

The parameter to be checked.

INT
LONG
DOUBLE
FLOAT
STRING
BOOL
OBJECT
No Yes

Examples EXAMPLE 1

from fooStream
select instanceOfLong(value) as state
insert into barStream;

This returns true if the value field is of type long, e.g. 56456l.

EXAMPLE 2

from fooStream
select instanceOfLong(switchState) as state
insert into barStream;

If switchState = true, this returns false as the value is an instance of Boolean, not Long.

instanceOfString (Function)

Checks whether the parameter is an instance of String or not.

Syntax

<BOOL> instanceOfString(<INT|LONG|DOUBLE|FLOAT|STRING|BOOL|OBJECT> arg)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
arg

The parameter to be checked.

INT
LONG
DOUBLE
FLOAT
STRING
BOOL
OBJECT
No Yes

Examples EXAMPLE 1

from fooStream
select instanceOfString(value) as state
insert into barStream;

This returns true if the value field is of type string, e.g. 'test'.

EXAMPLE 2

from fooStream
select instanceOfString(switchState) as state
insert into barStream;

If switchState = true, this returns false as the value is an instance of Boolean, not String.

maximum (Function)

Returns the maximum value of the input parameters.

Syntax

<INT|LONG|DOUBLE|FLOAT> maximum(<INT|LONG|DOUBLE|FLOAT> arg, <INT|LONG|DOUBLE|FLOAT> ...)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
arg

This function accepts one or more parameters. They can belong to any one of the available types. All the specified parameters should be of the same type.

INT
LONG
DOUBLE
FLOAT
No Yes

Examples EXAMPLE 1

@info(name = 'query1') from inputStream
select maximum(price1, price2, price3) as max
insert into outputStream;

This returns the maximum value of the input parameters price1, price2, price3.

minimum (Function)

Returns the minimum value of the input parameters.

Syntax

<INT|LONG|DOUBLE|FLOAT> minimum(<INT|LONG|DOUBLE|FLOAT> arg, <INT|LONG|DOUBLE|FLOAT> ...)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
arg

This function accepts one or more parameters. They can belong to any one of the available types. All the specified parameters should be of the same type.

INT
LONG
DOUBLE
FLOAT
No Yes

Examples EXAMPLE 1

@info(name = 'query1') from inputStream
select minimum(price1, price2, price3) as min
insert into outputStream;

This returns the minimum value of the input parameters price1, price2, price3.

sizeOfSet (Function)

Returns the size of an object of type java.util.Set.

Syntax

<INT> sizeOfSet(<OBJECT> set)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
set

The set object. This parameter should be of type java.util.Set. A set object may be created by the 'set' attribute aggregator in Siddhi.

OBJECT No Yes

Examples EXAMPLE 1

from stockStream
select createSet(symbol) as initialSet
insert into initStream;

from initStream#window.timeBatch(10 sec)
select unionSet(initialSet) as distinctSymbols
insert into distinctStockStream;

from distinctStockStream
select sizeOfSet(distinctSymbols) as sizeOfSymbolSet
insert into sizeStream;

The sizeStream stream outputs the number of distinct stock symbols received during each 10-second time batch window.
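The three steps of this example (wrapping each symbol in a single-element set, taking the union of the sets in a batch, and reading the size of the result) can be modelled in Python as follows. This is a sketch of the data flow only; the function names are hypothetical stand-ins and the batching itself is left out.

```python
from typing import Iterable, Set


def create_set(symbol: str) -> Set[str]:
    """Step 1: wrap one value in a set holding only that value."""
    return {symbol}


def union_set(sets: Iterable[Set[str]]) -> Set[str]:
    """Step 2: merge all sets seen in one batch into a single set."""
    result: Set[str] = set()
    for item in sets:
        result |= item
    return result


def size_of_set(values: Set[str]) -> int:
    """Step 3: number of distinct elements in the merged set."""
    return len(values)


def distinct_symbols_in_batch(symbols: Iterable[str]) -> int:
    # Chain the three steps for the symbols that fall into one batch.
    return size_of_set(union_set(create_set(s) for s in symbols))
```

For a batch containing the symbols IBM, WSO2, IBM and ORCL, the merged set has three elements.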

pol2Cart (Stream Function)

The pol2Cart function calculates the Cartesian coordinates x and y for the given theta and rho coordinates and adds them as new attributes to the existing events.

Syntax

pol2Cart(<DOUBLE> theta, <DOUBLE> rho)
pol2Cart(<DOUBLE> theta, <DOUBLE> rho, <DOUBLE> z)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
theta

The theta value of the coordinates.

DOUBLE No Yes
rho

The rho value of the coordinates.

DOUBLE No Yes
z

z value of the cartesian coordinates.

If z value is not given, drop the third parameter of the output. DOUBLE Yes Yes

Examples EXAMPLE 1

from PolarStream#pol2Cart(theta, rho)
select x, y 
insert into outputStream ;

This will return cartesian coordinates (4.99953024681082, 0.06853693328228748) for theta: 0.7854 and rho: 5.

EXAMPLE 2

from PolarStream#pol2Cart(theta, rho, 3.4)
select x, y, z 
insert into outputStream ;

This will return cartesian coordinates (4.99953024681082, 0.06853693328228748, 3.4) for theta: 0.7854, rho: 5 and z: 3.4.
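The output values quoted in the examples are consistent with x = rho * cos(theta) and y = rho * sin(theta) when theta is read in degrees. The Python sketch below reproduces them under that assumption. The function name pol2cart and the degree interpretation are inferred from the example numbers, not taken from the Siddhi source.

```python
import math
from typing import Optional, Tuple


def pol2cart(theta_degrees: float, rho: float, z: Optional[float] = None) -> Tuple[float, ...]:
    """Convert polar coordinates to Cartesian ones (theta assumed in degrees)."""
    theta = math.radians(theta_degrees)
    x = rho * math.cos(theta)
    y = rho * math.sin(theta)
    if z is None:
        # Without z, only the two planar coordinates are returned.
        return (x, y)
    # z is passed through unchanged as the third coordinate.
    return (x, y, z)
```

With theta = 0.7854 and rho = 5 this gives approximately (4.9995, 0.0685), in line with the values above. Reading theta in radians would instead give roughly (3.5355, 3.5355).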

log (Stream Processor)

Logs the message on the given priority with or without the processed event.

Syntax

log()
log(<STRING> log.message)
log(<BOOL> is.event.logged)
log(<STRING> log.message, <BOOL> is.event.logged)
log(<STRING> priority, <STRING> log.message)
log(<STRING> priority, <STRING> log.message, <BOOL> is.event.logged)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
priority

The priority/type of this log message (INFO, DEBUG, WARN, FATAL, ERROR, OFF, TRACE).

INFO STRING Yes No
log.message

This message will be logged.

: STRING Yes Yes
is.event.logged

To log the processed event.

true BOOL Yes No

Examples EXAMPLE 1

from FooStream#log()
select *
insert into BarStream;

Logs events with the SiddhiApp name as the message prefix on default log level INFO.

EXAMPLE 2

from FooStream#log("Sample Event :")
select *
insert into BarStream;

Logs events with the message prefix "Sample Event :" on default log level INFO.

EXAMPLE 3

from FooStream#log("DEBUG", "Sample Event :", true)
select *
insert into BarStream;

Logs events with the message prefix "Sample Event :" on log level DEBUG.

EXAMPLE 4

from FooStream#log("Event Arrived", false)
select *
insert into BarStream;

For each event, logs the message "Event Arrived" on default log level INFO, without logging the event itself.

EXAMPLE 5

from FooStream#log("Sample Event :", true)
select *
insert into BarStream;

Logs events with the message prefix "Sample Event :" on default log level INFO.

EXAMPLE 6

from FooStream#log(true)
select *
insert into BarStream;

Logs events on default log level INFO.

batch (Window)

A window that holds an incoming batch of events. When a new set of events arrives, the previously arrived events are expired. The batch window can be used to aggregate events that come in batches. If the length parameter is specified, the batch window processes the batch as several chunks.

Syntax

batch()
batch(<INT> window.length)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
window.length

The length of a chunk

If a length value is not given, 0 is assigned as the length and the whole batch is processed at once INT Yes No

Examples EXAMPLE 1

define stream consumerItemStream (itemId string, price float);

from consumerItemStream#window.batch()
select price, str:groupConcat(itemId) as itemIds
group by price
insert into outputStream;

This outputs the comma-separated item IDs that have the same price for each incoming batch of events.

cron (Window)

This window outputs the arriving events as and when they arrive, and resets (expires) the window periodically based on the given cron expression.

Syntax

cron(<STRING> cron.expression)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
cron.expression

The cron expression that resets the window.

STRING No No

Examples EXAMPLE 1

define stream InputEventStream (symbol string, price float, volume int);

@info(name = 'query1')
from InputEventStream#window.cron('*/5 * * * * ?')
select symbol, sum(price) as totalPrice 
insert into OutputStream;

This lets totalPrice gradually increase and resets it to zero as a batch every 5 seconds.

EXAMPLE 2

define stream StockEventStream (symbol string, price float, volume int);
define window StockEventWindow (symbol string, price float, volume int) cron('*/5 * * * * ?');

@info(name = 'query0')
from StockEventStream
insert into StockEventWindow;

@info(name = 'query1')
from StockEventWindow 
select symbol, sum(price) as totalPrice
insert into OutputStream ;

The defined window lets totalPrice gradually increase and resets it to zero as a batch every 5 seconds.

delay (Window)

A delay window holds events for a specific time period that is regarded as a delay period before processing them.

Syntax

delay(<INT|LONG|TIME> window.delay)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
window.delay

The time period (specified in sec, min, ms) for which the window should delay the events.

INT
LONG
TIME
No No

Examples EXAMPLE 1

define window delayWindow(symbol string, volume int) delay(1 hour);
define stream PurchaseStream(symbol string, volume int);
define stream DeliveryStream(symbol string);
define stream OutputStream(symbol string);

@info(name='query1') 
from PurchaseStream
select symbol, volume
insert into delayWindow;

@info(name='query2') 
from delayWindow join DeliveryStream
on delayWindow.symbol == DeliveryStream.symbol
select delayWindow.symbol
insert into OutputStream;

In this example, purchase events that arrive in the 'PurchaseStream' stream are directed to a delay window. At any given time, this delay window holds purchase events that have arrived within the last hour. These purchase events in the window are matched by the 'symbol' attribute, with delivery events that arrive in the 'DeliveryStream' stream. This monitors whether the delivery of products is done with a minimum delay of one hour after the purchase.

expression (Window)

A sliding window that dynamically shrinks and grows based on the expression. It holds the events that satisfy the given expression; when they do not, the events are evaluated from the first (oldest) to the last (latest/current) and expired from the oldest until the expression is satisfied.
Note: All the events in the window are reevaluated only when the given expression is changed.

Syntax

expression(<STRING> expression)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
expression

The expression to retain events.

STRING No Yes

Examples EXAMPLE 1

@info(name = 'query1')
from StockEventWindow#window.expression('count()<=20')
select symbol, sum(price) as price
insert into OutputStream ;

This retains the last 20 events in a sliding manner.

EXAMPLE 2

@info(name = 'query1')
from StockEventWindow#window.expression(
       'sum(price) < 100 and eventTimestamp(last) - eventTimestamp(first) < 3000')
select symbol, sum(price) as price
insert into OutputStream ;

This retains the latest events for which sum(price) < 100 and the last and first events are less than 3 seconds apart.
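The "expire from the oldest until the expression is satisfied" rule can be modelled in a few lines of Python. This is a simplified sketch of the sliding behaviour, not Siddhi's implementation. The function name expression_window is hypothetical, and the expression is passed as a Python predicate over the window contents rather than as a SiddhiQL string.

```python
from collections import deque
from typing import Callable, Deque, Iterable, List, TypeVar

T = TypeVar("T")


def expression_window(
    events: Iterable[T],
    is_satisfied: Callable[[Deque[T]], bool],
) -> List[List[T]]:
    """Return the window contents after each arriving event."""
    window: Deque[T] = deque()
    snapshots: List[List[T]] = []
    for event in events:
        window.append(event)
        # Expire from the oldest event until the expression holds again
        # (or the window is empty).
        while window and not is_satisfied(window):
            window.popleft()
        snapshots.append(list(window))
    return snapshots
```

With a predicate such as lambda w: len(w) <= 3 this behaves like a sliding length window of three events. With lambda w: sum(w) < 10 the window can shrink by several events at once when a large value arrives.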

expressionBatch (Window)

A batch window that dynamically shrinks and grows based on the expression. It holds events until the expression is satisfied, and expires all of them when the expression is not satisfied. When a string is passed as the expression, it is evaluated from the first (oldest) to the last (latest/current).
Note: All the events in the window are reevaluated only when the given expression is changed.

Syntax

expressionBatch(<STRING|BOOL> expression)
expressionBatch(<STRING|BOOL> expression, <BOOL> include.triggering.event)
expressionBatch(<STRING|BOOL> expression, <BOOL> include.triggering.event, <BOOL> stream.current.event)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
expression

The expression to retain events.

STRING
BOOL
No Yes
include.triggering.event

Includes the event that triggered the expiry in the current event batch.

false BOOL Yes Yes
stream.current.event

Let the window stream the current events out as and when they arrive to the window while expiring them in batches.

false BOOL Yes No

Examples EXAMPLE 1

@info(name = 'query1')
from StockEventWindow#window.expressionBatch('count()<=20')
select symbol, sum(price) as price
insert into OutputStream ;

Retains and outputs 20 events at a time as a batch.

EXAMPLE 2

@info(name = 'query1')
from StockEventWindow#window.expressionBatch(
       'sum(price) < 100 and eventTimestamp(last) - eventTimestamp(first) < 3000')
select symbol, sum(price) as price
insert into OutputStream ;

Retains and outputs, as a batch, the events for which sum(price) < 100 and the last and first events are less than 3 seconds apart.

EXAMPLE 3

@info(name = 'query1')
from StockEventWindow#window.expressionBatch(
       'last.symbol==first.symbol')
select symbol, sum(price) as price
insert into OutputStream ;

Output events as a batch when a new symbol type arrives.

EXAMPLE 4

@info(name = 'query1')
from StockEventWindow#window.expressionBatch(
       'flush', true)
select symbol, sum(price) as price
insert into OutputStream ;

Outputs events as a batch when the flush attribute becomes true. The output batch also contains the triggering event.

EXAMPLE 5

@info(name = 'query1')
from StockEventWindow#window.expressionBatch(
       'flush', false, true)
select symbol, sum(price) as price
insert into OutputStream ;

Arriving events are emitted as soon as they arrive, and the retained events are expired when the flush attribute becomes true. The output batch does not contain the triggering event.

externalTime (Window)

A sliding time window based on external time. It holds events that arrived during the last windowTime period from the external timestamp, and gets updated on every monotonically increasing timestamp.

Syntax

externalTime(<LONG> timestamp, <INT|LONG|TIME> window.time)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
timestamp

The time which the window determines as current time and will act upon. The value of this parameter should be monotonically increasing.

LONG No Yes
window.time

The sliding time period for which the window should hold events.

INT
LONG
TIME
No No

Examples EXAMPLE 1

define window cseEventWindow (symbol string, price float, volume int) externalTime(eventTime, 20 sec) output expired events;

@info(name = 'query0')
from cseEventStream
insert into cseEventWindow;

@info(name = 'query1')
from cseEventWindow
select symbol, sum(price) as price
insert expired events into outputStream ;

This processes events that arrived within the last 20 seconds from the eventTime and outputs expired events.

externalTimeBatch (Window)

A batch (tumbling) time window based on external time, that holds events arrived during windowTime periods, and gets updated for every windowTime.

Syntax

externalTimeBatch(<LONG> timestamp, <INT|LONG|TIME> window.time)
externalTimeBatch(<LONG> timestamp, <INT|LONG|TIME> window.time, <INT|LONG|TIME> start.time)
externalTimeBatch(<LONG> timestamp, <INT|LONG|TIME> window.time, <INT|LONG|TIME> start.time, <INT|LONG|TIME> timeout)
externalTimeBatch(<LONG> timestamp, <INT|LONG|TIME> window.time, <INT|LONG|TIME> start.time, <INT|LONG|TIME> timeout, <BOOL> replace.with.batchtime)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
timestamp

The time that the window treats as the current time and acts upon. The value of this parameter should be monotonically increasing.

LONG No Yes
window.time

The batch time period for which the window should hold events.

INT
LONG
TIME
No No
start.time

User-defined start time. This can be either a constant (of type int, long, or time) or an attribute of the corresponding stream (of type long). If an attribute is provided, the initial value of that attribute is used as the start time.

Timestamp of first event INT
LONG
TIME
Yes Yes
timeout

Time to wait for the arrival of a new event before flushing and emitting the output for events belonging to a specific batch.

System waits till an event from next batch arrives to flush current batch INT
LONG
TIME
Yes No
replace.with.batchtime

Indicates whether the timestamp of each expired event is replaced with the batch end timestamp.

System waits till an event from next batch arrives to flush current batch BOOL Yes No

Examples EXAMPLE 1

define window cseEventWindow (symbol string, price float, volume int) externalTimeBatch(eventTime, 1 sec) output expired events;
@info(name = 'query0')
from cseEventStream
insert into cseEventWindow;
@info(name = 'query1')
from cseEventWindow
select symbol, sum(price) as price
insert expired events into outputStream ;

This processes the events that arrive in each 1-second period, measured from eventTime.

EXAMPLE 2

define window cseEventWindow (symbol string, price float, volume int) externalTimeBatch(eventTime, 20 sec, 0) output expired events;

This processes the events that arrive in each 20-second period, measured from eventTime. Batches start on the 0th millisecond of an hour.

EXAMPLE 3

define window cseEventWindow (symbol string, price float, volume int) externalTimeBatch(eventTime, 2 sec, eventTimestamp, 100) output expired events;

This processes the events that arrive in each 2-second period, measured from eventTime. It uses the first event's eventTimestamp value as the start time, and waits 100 milliseconds for the arrival of a new event before flushing the current batch.
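The tumbling behaviour of the examples above can be modelled roughly as follows. This is a sketch under stated assumptions rather than Siddhi's implementation: the class `ExternalTimeBatchWindow` and its method names are hypothetical, the timeout and replace.with.batchtime parameters are not modelled, and a batch is assumed to be flushed only when an event belonging to a later batch arrives.

```python
class ExternalTimeBatchWindow:
    """Toy model of a tumbling window whose clock is the event timestamp."""

    def __init__(self, window_ms, start_ms=None):
        self.window_ms = window_ms
        self.start_ms = start_ms  # None: use the first event's timestamp
        self.batch_end = None
        self.batch = []

    def _end_of_batch_containing(self, timestamp_ms):
        # Batches are aligned to start_ms, start_ms + window_ms, ...
        periods = (timestamp_ms - self.start_ms) // self.window_ms
        return self.start_ms + (periods + 1) * self.window_ms

    def add(self, timestamp_ms, payload):
        """Add an event; return the completed batch if this event closes one."""
        if self.start_ms is None:
            self.start_ms = timestamp_ms
        if self.batch_end is None:
            self.batch_end = self._end_of_batch_containing(timestamp_ms)
        flushed = None
        if timestamp_ms >= self.batch_end:
            # The event belongs to a later batch: emit the current one.
            flushed, self.batch = self.batch, []
            self.batch_end = self._end_of_batch_containing(timestamp_ms)
        self.batch.append((timestamp_ms, payload))
        return flushed


window = ExternalTimeBatchWindow(window_ms=1_000)  # corresponds to "1 sec"
results = [window.add(ts, name)
           for ts, name in [(100, "a"), (600, "b"), (1_100, "c"), (3_500, "d")]]
```

In this model the event stamped 3_500 is still held after the run, which is the situation the timeout parameter is meant to address.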

frequent (Window)

Deprecated

This window returns the latest events with the most frequently occurring values for the given attribute(s). The frequency calculation for this window processor is based on the Misra-Gries counting algorithm.

Syntax

frequent(<INT> event.count)
frequent(<INT> event.count, <STRING> attribute)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
event.count

The number of most frequent events to be emitted to the stream.

INT No No
attribute

The attributes to group the events. If no attributes are given, the concatenation of all the attributes of the event is considered.

The concatenation of all the attributes of the event is considered. STRING Yes Yes

Examples EXAMPLE 1

@info(name = 'query1')
from purchase[price >= 30]#window.frequent(2)
select cardNo, price
insert all events into PotentialFraud;

This returns the 2 most frequent events.

EXAMPLE 2

@info(name = 'query1')
from purchase[price >= 30]#window.frequent(2, cardNo)
select cardNo, price
insert all events into PotentialFraud;

This returns the 2 latest events with the most frequently appearing card numbers.
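For readers unfamiliar with Misra-Gries counting, the textbook form of the algorithm is sketched below in Python. It illustrates the counting idea only; the function name `misra_gries` is hypothetical, and the way Siddhi's window maps counters back to retained events may differ from this sketch.

```python
def misra_gries(items, k):
    """Return approximate counts for at most k candidate frequent items."""
    counters = {}
    for item in items:
        if item in counters:
            counters[item] += 1
        elif len(counters) < k:
            counters[item] = 1
        else:
            # No free counter: decrement every counter and drop those at zero.
            for key in list(counters):
                counters[key] -= 1
                if counters[key] == 0:
                    del counters[key]
    return counters


card_numbers = ["A", "B", "A", "C", "A", "B", "A"]
summary = misra_gries(card_numbers, k=2)
```

The counts are lower bounds, not exact frequencies: here "A" occurs 4 times but is reported with a count of 3, because the arrival of "C" decremented every counter.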

length (Window)

A sliding length window that holds the last 'window.length' events at a given time, and gets updated for each arrival and expiry.

Syntax

length(<INT> window.length)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
window.length

The number of events that should be included in a sliding length window.

INT No No

Examples EXAMPLE 1

define window StockEventWindow (symbol string, price float, volume int) length(10) output all events;

@info(name = 'query0')
from StockEventStream
insert into StockEventWindow;

@info(name = 'query1')
from StockEventWindow
select symbol, sum(price) as price
insert all events into outputStream ;

This processes the last 10 events in a sliding manner.
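A sliding length window combined with sum() can be pictured with the Python sketch below. It is a conceptual model, not Siddhi code; the function name `sliding_sums` is hypothetical.

```python
from collections import deque


def sliding_sums(prices, window_length):
    """Return the running sum over the last window_length prices."""
    window = deque()
    total = 0.0
    sums = []
    for price in prices:
        window.append(price)
        total += price
        if len(window) > window_length:
            total -= window.popleft()  # the oldest event expires
        sums.append(total)
    return sums


sums = sliding_sums([10.0, 20.0, 30.0, 40.0], window_length=3)
```

Once the window is full, each arrival pushes out exactly one older event, so the aggregate changes on both the arrival and the expiry.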

lengthBatch (Window)

A batch (tumbling) length window that holds and processes the number of events specified by window.length.

Syntax

lengthBatch(<INT> window.length)
lengthBatch(<INT> window.length, <BOOL> stream.current.event)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
window.length

The number of events after which the window tumbles.

INT No No
stream.current.event

Lets the window stream the current events out as and when they arrive at the window, while expiring them in batches.

false BOOL Yes No

Examples EXAMPLE 1

define stream InputEventStream (symbol string, price float, volume int);

@info(name = 'query1')
from InputEventStream#window.lengthBatch(10)
select symbol, sum(price) as price
insert into OutputStream;

This collects and processes 10 events as a batch and outputs them.

EXAMPLE 2

define stream InputEventStream (symbol string, price float, volume int);

@info(name = 'query1')
from InputEventStream#window.lengthBatch(10, true)
select symbol, sum(price) as sumPrice
insert into OutputStream;

This window sends the arriving events directly to the output, letting sumPrice increase gradually. After every 10 events, it clears the window as a batch and resets sumPrice to zero.

EXAMPLE 3

define stream InputEventStream (symbol string, price float, volume int);
define window StockEventWindow (symbol string, price float, volume int) lengthBatch(10) output all events;

@info(name = 'query0')
from InputEventStream
insert into StockEventWindow;

@info(name = 'query1')
from StockEventWindow
select symbol, sum(price) as price
insert all events into OutputStream ;

This uses a defined window to process 10 events as a batch and output all events.
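The difference that stream.current.event makes to an aggregate such as sum(price) can be sketched in Python. This is a simplified model, not Siddhi's implementation: the function `length_batch_sums` is hypothetical, and it assumes one aggregated output per completed batch when stream.current.event is false.

```python
def length_batch_sums(prices, window_length, stream_current_event=False):
    """Return the sum(price) values a query would observe, one per output."""
    outputs = []
    total = 0.0
    count = 0
    for price in prices:
        total += price
        count += 1
        if stream_current_event:
            outputs.append(total)  # running sum emitted on every arrival
        elif count == window_length:
            outputs.append(total)  # one output per completed batch
        if count == window_length:
            total, count = 0.0, 0  # the batch expires and the sum resets
    return outputs


prices = [1.0, 2.0, 3.0, 4.0, 5.0]
batch_only = length_batch_sums(prices, window_length=2)
streaming = length_batch_sums(prices, window_length=2, stream_current_event=True)
```

In the batch-only run, the fifth event produces no output because its batch is still incomplete.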

lossyFrequent (Window)

Deprecated

This window identifies and returns all the events whose current frequency exceeds the value specified for the support.threshold parameter.

Syntax

lossyFrequent(<DOUBLE> support.threshold)
lossyFrequent(<DOUBLE> support.threshold, <DOUBLE> error.bound)
lossyFrequent(<DOUBLE> support.threshold, <DOUBLE> error.bound, <STRING> attribute)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
support.threshold

The support threshold value.

DOUBLE No No
error.bound

The error bound value.

support.threshold/10 DOUBLE Yes No
attribute

The attributes to group the events. If no attributes are given, the concatenation of all the attributes of the event is considered.

The concatenation of all the attributes of the event is considered. STRING Yes Yes

Examples EXAMPLE 1

define stream purchase (cardNo string, price float);
define window purchaseWindow (cardNo string, price float) lossyFrequent(0.1, 0.01);
@info(name = 'query0')
from purchase[price >= 30]
insert into purchaseWindow;
@info(name = 'query1')
from purchaseWindow
select cardNo, price
insert all events into PotentialFraud;

lossyFrequent(0.1, 0.01) returns all the events whose current frequency exceeds 0.1, with an error bound of 0.01.

EXAMPLE 2

define stream purchase (cardNo string, price float);
define window purchaseWindow (cardNo string, price float) lossyFrequent(0.3, 0.05, cardNo);
@info(name = 'query0')
from purchase[price >= 30]
insert into purchaseWindow;
@info(name = 'query1')
from purchaseWindow
select cardNo, price
insert all events into PotentialFraud;

lossyFrequent(0.3, 0.05, cardNo) returns all the events whose cardNo attribute's frequency exceeds 0.3, with an error bound of 0.05.

session (Window)

Holds events that belong to a session. Events belonging to a specific session are identified by a session key, and the session gap determines the time period after which the session is considered to be expired. For meaningful aggregation on session windows, the events need to be aggregated based on the session key via a group by clause.

Syntax

session(<INT|LONG|TIME> session.gap)
session(<INT|LONG|TIME> session.gap, <STRING> session.key)
session(<INT|LONG|TIME> session.gap, <STRING> session.key, <INT|LONG|TIME> allowed.latency)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
session.gap

The time period after which the session is considered to be expired.

INT
LONG
TIME
No No
session.key

The session identification attribute. Used to group events belonging to a specific session.

default-key STRING Yes Yes
allowed.latency

The time period for which the session window is valid after the expiration of the session, to accept late event arrivals. This time period should be less than the session.gap parameter.

0 INT
LONG
TIME
Yes No

Examples EXAMPLE 1

define stream PurchaseEventStream (user string, item_number int, price float, quantity int);

@info(name='query1')
from PurchaseEventStream#window.session(5 sec, user)
select user, sum(quantity) as totalQuantity, sum(price) as totalPrice
group by user
insert into OutputStream;

From the events arriving at PurchaseEventStream, a session window with a 5-second session gap is processed, using the 'user' attribute as the session key. All events falling into the same session are aggregated by the user attribute and output to OutputStream.

EXAMPLE 2

define stream PurchaseEventStream (user string, item_number int, price float, quantity int);

@info(name='query2')
from PurchaseEventStream#window.session(5 sec, user, 2 sec)
select user, sum(quantity) as totalQuantity, sum(price) as totalPrice
group by user
insert into OutputStream;

From the events arriving at PurchaseEventStream, a session window with a 5-second session gap is processed, using the 'user' attribute as the session key. This session window is kept active for 2 seconds after the session expiry to capture late (out-of-order) event arrivals. If the timestamp of a late event falls into the last session, that session is reactivated. All events falling into the same session are then aggregated by the user attribute and output to OutputStream.
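How a session key and a session gap split a stream into sessions can be sketched in Python. This is a conceptual model only: the function `split_sessions` is hypothetical, allowed.latency is not modelled, events are assumed to arrive in timestamp order, and treating a gap equal to session.gap as the start of a new session is an assumption.

```python
from collections import defaultdict


def split_sessions(events, session_gap_ms):
    """events: (timestamp_ms, key, quantity) tuples in timestamp order.

    Returns {key: [total quantity of each session, in order]}.
    """
    last_seen = {}
    totals = defaultdict(list)
    for timestamp_ms, key, quantity in events:
        idle_ms = timestamp_ms - last_seen.get(key, timestamp_ms)
        if key not in last_seen or idle_ms >= session_gap_ms:
            totals[key].append(0)  # previous session expired: start a new one
        totals[key][-1] += quantity
        last_seen[key] = timestamp_ms
    return dict(totals)


purchases = [
    (0, "alice", 1),
    (2_000, "bob", 5),
    (3_000, "alice", 2),
    (9_000, "alice", 4),  # 6 s after alice's previous event: new session
]
sessions = split_sessions(purchases, session_gap_ms=5_000)
```

Each key has its own idle timer, so bob's event at 2 seconds does not extend or interrupt alice's session.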

sort (Window)

This window holds a batch of events whose size equals the number specified as window.length, and sorts them in the given order.

Syntax

sort(<INT> window.length, <STRING|DOUBLE|INT|LONG|FLOAT> attribute)
sort(<INT> window.length, <STRING|DOUBLE|INT|LONG|FLOAT> attribute, <STRING> order, <STRING> ...)
sort(<INT> window.length, <STRING|DOUBLE|INT|LONG|FLOAT> attribute, <STRING> order, <STRING|DOUBLE|INT|LONG|FLOAT> attribute, <STRING|DOUBLE|INT|LONG|FLOAT> ...)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
window.length

The size of the window length.

INT No No
attribute

The attribute that should be checked for the order.

The concatenation of all the attributes of the event is considered. STRING
DOUBLE
INT
LONG
FLOAT
No Yes
order

The sort order, specified as "asc" or "desc".

asc STRING Yes No

Examples EXAMPLE 1

define stream cseEventStream (symbol string, price float, volume long);
define window cseEventWindow (symbol string, price float, volume long) sort(2,volume, 'asc');
@info(name = 'query0')
from cseEventStream
insert into cseEventWindow;
@info(name = 'query1')
from cseEventWindow
select volume
insert all events into outputStream ;

sort(2, volume, 'asc') keeps the events sorted by volume in ascending order. Therefore, at any given time, the window contains the 2 events with the lowest volumes.
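A sort window of length N with 'asc' order can be pictured as keeping the N smallest values seen so far. The Python sketch below models that idea; it is not Siddhi's implementation, the function `sort_window` is hypothetical, and it assumes that the event placed last in the sort order is the one evicted when the window overflows.

```python
def sort_window(values, window_length, order="asc"):
    """Return (held, expired) after feeding values through a sort window."""
    held = []
    expired = []
    for value in values:
        held.append(value)
        held.sort(reverse=(order == "desc"))
        if len(held) > window_length:
            expired.append(held.pop())  # evict the last event in sort order
    return held, expired


held, expired = sort_window([300, 100, 200, 50], window_length=2)
```

With 'desc' order the same logic keeps the largest values instead, because the smallest value is then last in the sort order.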

time (Window)

A sliding time window that, at any given time, holds the events that arrived during the last window.time period, and is updated on each event arrival and expiry.

Syntax

time(<INT|LONG|TIME> window.time)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
window.time

The sliding time period for which the window should hold events.

INT
LONG
TIME
No No

Examples EXAMPLE 1

define window cseEventWindow (symbol string, price float, volume int) time(20) output all events;
@info(name = 'query0')
from cseEventStream
insert into cseEventWindow;
@info(name = 'query1')
from cseEventWindow
select symbol, sum(price) as price
insert all events into outputStream ;

This processes events that arrived within the last 20 milliseconds.

timeBatch (Window)

A batch (tumbling) time window that holds and processes the events that arrive during each window.time period as a batch.

Syntax

timeBatch(<INT|LONG|TIME> window.time)
timeBatch(<INT|LONG|TIME> window.time, <INT|LONG> start.time)
timeBatch(<INT|LONG|TIME> window.time, <BOOL> stream.current.event)
timeBatch(<INT|LONG|TIME> window.time, <INT|LONG> start.time, <BOOL> stream.current.event)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
window.time

The batch time period in which the window processes the events.

INT
LONG
TIME
No No
start.time

This specifies an offset in milliseconds in order to start the window at a time different to the standard time.

Timestamp of first event INT
LONG
Yes No
stream.current.event

Lets the window stream the current events out as and when they arrive at the window, while expiring them in batches.

false BOOL Yes No

Examples EXAMPLE 1

define stream InputEventStream (symbol string, price float, volume int);

@info(name = 'query1')
from InputEventStream#window.timeBatch(20 sec)
select symbol, sum(price) as price
insert into OutputStream;

This collects and processes incoming events as a batch every 20 seconds and outputs them.

EXAMPLE 2

define stream InputEventStream (symbol string, price float, volume int);

@info(name = 'query1')
from InputEventStream#window.timeBatch(20 sec, true)
select symbol, sum(price) as sumPrice
insert into OutputStream;

This window sends the arriving events directly to the output, letting sumPrice increase gradually. At every 20-second interval, it clears the window as a batch and resets sumPrice to zero.

EXAMPLE 3

define stream InputEventStream (symbol string, price float, volume int);
define window StockEventWindow (symbol string, price float, volume int) timeBatch(20 sec) output all events;

@info(name = 'query0')
from InputEventStream
insert into StockEventWindow;

@info(name = 'query1')
from StockEventWindow
select symbol, sum(price) as price
insert all events into OutputStream ;

This uses a defined window to process the events that arrive every 20 seconds as a batch and output all events.

timeLength (Window)

A sliding time window that, at any given time, holds the last window.length events that arrived during the last window.time period, and is updated on every event arrival and expiry.

Syntax

timeLength(<INT|LONG|TIME> window.time, <INT> window.length)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
window.time

The sliding time period for which the window should hold events.

INT
LONG
TIME
No No
window.length

The number of events that should be included in a sliding length window.

INT No No

Examples EXAMPLE 1

define stream cseEventStream (symbol string, price float, volume int);
define window cseEventWindow (symbol string, price float, volume int) timeLength(2 sec, 10);
@info(name = 'query0')
from cseEventStream
insert into cseEventWindow;
@info(name = 'query1')
from cseEventWindow select symbol, price, volume
insert all events into outputStream;

window.timeLength(2 sec, 10) holds the last 10 events that arrived during the last 2 seconds, and is updated on every event arrival and expiry.
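The combination of a time bound and a length bound can be modelled with the Python sketch below. It is a conceptual illustration, not Siddhi's implementation: the class `TimeLengthWindow` is hypothetical, the current time is passed in explicitly instead of being read from a system clock, and expiry is only evaluated when an event arrives.

```python
from collections import deque


class TimeLengthWindow:
    """Toy model: keep at most window_length events no older than window_ms."""

    def __init__(self, window_ms, window_length):
        self.window_ms = window_ms
        self.window_length = window_length
        self.events = deque()  # (arrival_ms, payload), oldest first

    def add(self, now_ms, payload):
        """Add an event at time now_ms and return the payloads currently held."""
        # Time bound: drop events that are window_ms old or older.
        while self.events and self.events[0][0] + self.window_ms <= now_ms:
            self.events.popleft()
        self.events.append((now_ms, payload))
        # Length bound: drop the oldest event if the window overflows.
        if len(self.events) > self.window_length:
            self.events.popleft()
        return [held for _, held in self.events]


window = TimeLengthWindow(window_ms=2_000, window_length=3)
for arrival_ms, name in [(0, "a"), (100, "b"), (200, "c")]:
    window.add(arrival_ms, name)
after_overflow = window.add(300, "d")   # length bound evicts "a"
after_timeout = window.add(2_150, "e")  # time bound evicts "b"
```

Whichever bound is reached first removes the event, so the window never holds more than window.length events and never holds an event older than window.time.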

Sink

inMemory (Sink)

The in-memory sink publishes events to the in-memory sources that are subscribed to the same topic to which the sink publishes. This provides a way to connect multiple Siddhi Apps deployed under the same Siddhi Manager (JVM). Both the publisher and the subscriber should have the same event schema (stream definition) for successful data transfer.

Syntax

@sink(type="inMemory", topic="<STRING>", @map(...))

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
topic

Events are delivered to all the subscribers subscribed to this topic.

STRING No No

Examples EXAMPLE 1

@sink(type='inMemory', topic='Stocks', @map(type='passThrough'))
define stream StocksStream (symbol string, price float, volume long);

Here, StocksStream uses the inMemory sink to emit Siddhi events to all the inMemory sources that are deployed in the same JVM and subscribed to the topic Stocks.

log (Sink)

This is a sink that can be used as a logger. It logs the output events in the output stream with a user-specified priority and prefix.

Syntax

@sink(type="log", priority="<STRING>", prefix="<STRING>", @map(...))

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
priority

This sets the logger priority, i.e., the log level. Accepted values are INFO, DEBUG, WARN, FATAL, ERROR, OFF, and TRACE.

INFO STRING Yes No
prefix

This is the prefix added to the output message. If the output stream has the event [2,4] and the prefix is given as "Hello", the log shows "Hello : [2,4]".

<Siddhi App Name> : <Stream Name> STRING Yes No

Examples EXAMPLE 1

@sink(type='log', prefix='My Log', priority='DEBUG')
define stream BarStream (symbol string, price float, volume long);

In this example, BarStream uses the log sink with the prefix My Log and the priority DEBUG.

EXAMPLE 2

@sink(type='log', priority='DEBUG')
define stream BarStream (symbol string, price float, volume long);

In this example, BarStream uses the log sink with the priority set to DEBUG. No prefix is specified, so the default prefix takes the form <Siddhi App Name> : <Stream Name>.

EXAMPLE 3

@sink(type='log', prefix='My Log')
define stream BarStream (symbol string, price float, volume long);

In this example, BarStream uses the log sink with the prefix My Log. No priority is specified, so it defaults to INFO.

EXAMPLE 4

@sink(type='log')
define stream BarStream (symbol string, price float, volume long);

In this example, BarStream uses the log sink. Neither a prefix nor a priority is specified, so both are set to their default values.

Sinkmapper

passThrough (Sink Mapper)

The pass-through mapper passes events (Event[]) through without any mapping or modification.

Syntax

@sink(..., @map(type="passThrough"))

Examples EXAMPLE 1

@sink(type='inMemory', @map(type='passThrough'))
define stream BarStream (symbol string, price float, volume long);

In this example, BarStream uses the passThrough output mapper, which emits Siddhi events directly into the sink without any transformation.

Source

inMemory (Source)

The in-memory source subscribes to a topic to consume events that are published on the same topic by in-memory sinks. This provides a way to connect multiple Siddhi Apps deployed under the same Siddhi Manager (JVM). Both the publisher and the subscriber should have the same event schema (stream definition) for successful data transfer.

Syntax

@source(type="inMemory", topic="<STRING>", @map(...))

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
topic

Subscribes to the events sent on the given topic.

STRING No No

Examples EXAMPLE 1

@source(type='inMemory', topic='Stocks', @map(type='passThrough'))
define stream StocksStream (symbol string, price float, volume long);

Here, StocksStream uses the inMemory source to consume events published on the topic Stocks by the inMemory sinks deployed in the same JVM.
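The topic-based hand-off between in-memory sinks and sources can be pictured as a simple in-process publish/subscribe registry. The Python sketch below is only a model of that idea, not Siddhi's transport code; the class `ToyBroker` and its methods are hypothetical names introduced for illustration.

```python
from collections import defaultdict


class ToyBroker:
    """Toy in-process broker that routes events to subscribers by topic."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, topic, callback):
        # Plays the role of an in-memory source registering on a topic.
        self._subscribers[topic].append(callback)

    def publish(self, topic, event):
        # Plays the role of an in-memory sink: every subscriber of the
        # topic receives the same event object, unmodified.
        for callback in self._subscribers[topic]:
            callback(event)


broker = ToyBroker()
stocks_a, stocks_b, orders = [], [], []
broker.subscribe("Stocks", stocks_a.append)
broker.subscribe("Stocks", stocks_b.append)
broker.subscribe("Orders", orders.append)
broker.publish("Stocks", ("WSO2", 55.6, 100))
```

Because the event is handed over as-is, both ends need to agree on the attribute order and types, which mirrors the requirement that the publisher and subscriber share the same stream definition.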

Sourcemapper

passThrough (Source Mapper)

The pass-through mapper passes events (Event[]) through without any mapping or modification.

Syntax

@source(..., @map(type="passThrough"))

Examples EXAMPLE 1

@source(type='tcp', @map(type='passThrough'))
define stream BarStream (symbol string, price float, volume long);

In this example, BarStream uses the passThrough input mapper, which passes the received Siddhi events directly into the source without any transformation.