Skip to content

API Docs - v5.0.3

Tested Siddhi Core version: 5.0.2

It could also support other Siddhi Core minor versions.

Unique

ever (Window)

Window that retains the latest events based on a given unique key. When a new event arrives with the same key it replaces the one exist in the window.

Syntax

unique:ever(<INT|LONG|FLOAT|BOOL|DOUBLE|STRING> unique.key)
unique:ever(<INT|LONG|FLOAT|BOOL|DOUBLE|STRING> unique.key, <INT|LONG|FLOAT|BOOL|DOUBLE|STRING> ...)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
unique.key The attribute used to checked for uniqueness. If multiple attributes need to be checked, they can be specified as a comma-separated list. INT
LONG
FLOAT
BOOL
DOUBLE
STRING
No Yes

Examples EXAMPLE 1

define stream LoginEvents (timeStamp long, ip string);

from LoginEvents#window.unique:ever(ip)
select count(ip) as ipCount, ip
insert all events into UniqueIps;

The above query determines the latest events that have arrived from the 'LoginEvents' stream, based on the 'ip' attribute. At a given time, all the events held in the window should have a unique value for the ip attribute. All the processed events are directed to the 'UniqueIps' output stream with 'ip' and 'ipCount' attributes.

EXAMPLE 2

define stream LoginEvents (timeStamp long, ip string , id string);

from LoginEvents#window.unique:ever(ip, id)
select count(ip) as ipCount, ip , id 
insert expired events into UniqueIps;

This query determines the latest events to be included in the window based on the ip and id attributes. When the 'LoginEvents' event stream receives a new event of which the combination of values for the ip and id attributes matches that of an existing event in the window, the existing event expires and it is replaced with the new event. The expired events which have been expired as a result of being replaced by a newer event are directed to the 'uniqueIps' output stream.

externalTimeBatch (Window)

This is a batch (tumbling) time window that is determined based on an external time, i.e., time stamps that are specified via an attribute in the events. It holds the latest unique events that arrived during the last window time period. The unique events are determined based on the value for a specified unique key parameter. When a new event arrives within the time window with a value for the unique key parameter that is the same as that of an existing event in the window, the existing event expires and it is replaced by the new event.

Syntax

unique:externalTimeBatch(<INT|LONG|FLOAT|BOOL|DOUBLE|STRING> unique.key, <LONG> time.stamp, <INT|LONG> window.time)
unique:externalTimeBatch(<INT|LONG|FLOAT|BOOL|DOUBLE|STRING> unique.key, <LONG> time.stamp, <INT|LONG> window.time, <INT> start.time)
unique:externalTimeBatch(<INT|LONG|FLOAT|BOOL|DOUBLE|STRING> unique.key, <LONG> time.stamp, <INT|LONG> window.time, <INT> start.time, <INT|LONG> time.out)
unique:externalTimeBatch(<INT|LONG|FLOAT|BOOL|DOUBLE|STRING> unique.key, <LONG> time.stamp, <INT|LONG> window.time, <INT> start.time, <INT|LONG> time.out, <INT|LONG> replace.time.stamp.with.batch.end.time)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
unique.key The attribute that should be checked for uniqueness. INT
LONG
FLOAT
BOOL
DOUBLE
STRING
No Yes
time.stamp The time which the window determines as the current time and acts upon. The value of this parameter should be monotonically increasing. LONG No No
window.time The sliding time period for which the window should hold events. INT
LONG
No No
start.time This specifies an offset in milliseconds in order to start the window at a time different to the standard time. Timestamp of first event INT Yes No
time.out Time to wait for arrival of a new event, before flushing and returning the output for events belonging to a specific batch. The system waits till an event from the next batch arrives to flush the current batch INT
LONG
Yes No
replace.time.stamp.with.batch.end.time Replaces the 'timestamp' value with the corresponding batch end time stamp. false INT
LONG
Yes No

Examples EXAMPLE 1

define stream LoginEvents (timestamp long, ip string);

from LoginEvents#window.unique:externalTimeBatch(ip, timestamp, 1 sec, 0, 2 sec) 
select timestamp, ip, count() as total
insert into UniqueIps ;

In this query, the window holds the latest unique events that arrive from the 'LoginEvent' stream during each second. The latest events are determined based on the external time stamp. At a given time, all the events held in the window have unique values for the 'ip' and monotonically increasing values for 'timestamp' attributes. The events in the window are inserted into the 'UniqueIps' output stream. The system waits for 2 seconds for the arrival of a new event before flushing the current batch.

first (Window)

This is a window that holds only the first set of unique events according to the unique key parameter. When a new event arrives with a key that is already in the window, that event is not processed by the window.

Syntax

unique:first(<INT|LONG|FLOAT|BOOL|DOUBLE|STRING> unique.key)
unique:first(<INT|LONG|FLOAT|BOOL|DOUBLE|STRING> unique.key, <INT|LONG|FLOAT|BOOL|DOUBLE|STRING> ...)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
unique.key The attribute that should be checked for uniqueness. If there is more than one parameter to check for uniqueness, it can be specified as an array separated by commas. INT
LONG
FLOAT
BOOL
DOUBLE
STRING
No Yes

Examples EXAMPLE 1

define stream LoginEvents (timeStamp long, ip string);

from LoginEvents#window.unique:first(ip)
insert into UniqueIps ;

This returns the first set of unique items that arrive from the 'LoginEvents' stream, and returns them to the 'UniqueIps' stream. The unique events are only those with a unique value for the 'ip' attribute.

firstLengthBatch (Window)

This is a batch (tumbling) window that holds a specific number of unique events (depending on which events arrive first). The unique events are selected based on a specific parameter that is considered as the unique key. When a new event arrives with a value for the unique key parameter that matches the same of an existing event in the window, that event is not processed by the window.

Syntax

unique:firstLengthBatch(<INT|LONG|FLOAT|BOOL|DOUBLE|STRING> unique.key, <INT> window.length)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
unique.key The attribute that should be checked for uniqueness. INT
LONG
FLOAT
BOOL
DOUBLE
STRING
No Yes
window.length The number of events the window should tumble. INT No Yes

Examples EXAMPLE 1

define window CseEventWindow (symbol string, price float, volume int)

from CseEventStream#window.unique:firstLengthBatch(symbol, 10)
select symbol, price, volume
insert all events into OutputStream ;

The window in this configuration holds the first unique events from the 'CseEventStream' stream every second, and outputs them all into the the 'OutputStream' stream. All the events in a window during a given second should have a unique value for the 'symbol' attribute.

firstTimeBatch (Window)

A batch-time or tumbling window that holds the unique events according to the unique key parameters that have arrived within the time period of that window and gets updated for each such time window. When a new event arrives with a key which is already in the window, that event is not processed by the window.

Syntax

unique:firstTimeBatch(<INT|LONG|FLOAT|BOOL|DOUBLE|STRING> unique.key, <INT|LONG> window.time)
unique:firstTimeBatch(<INT|LONG|FLOAT|BOOL|DOUBLE|STRING> unique.key, <INT|LONG> window.time, <INT|LONG> start.time)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
unique.key The attribute that should be checked for uniqueness. INT
LONG
FLOAT
BOOL
DOUBLE
STRING
No Yes
window.time The sliding time period for which the window should hold events. INT
LONG
No Yes
start.time This specifies an offset in milliseconds in order to start the window at a time different to the standard time. Timestamp of the first event. INT
LONG
Yes Yes

Examples EXAMPLE 1

define stream CseEventStream (symbol string, price float, volume int)

from CseEventStream#window.unique:firstTimeBatch(symbol,1 sec)
 select symbol, price, volume
insert all events into OutputStream ;

This holds the first unique events that arrive from the 'cseEventStream' input stream during each second, based on the symbol,as a batch, and returns all the events to the 'OutputStream'.

length (Window)

This is a sliding length window that holds the events of the latest window length with the unique key and gets updated for the expiry and arrival of each event. When a new event arrives with the key that is already there in the window, then the previous event expires and new event is kept within the window.

Syntax

unique:length(<INT|LONG|FLOAT|BOOL|DOUBLE|STRING> unique.key, <INT> window.length)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
unique.key The attribute that should be checked for uniqueness. INT
LONG
FLOAT
BOOL
DOUBLE
STRING
No Yes
window.length The number of events that should be included in a sliding length window. INT No Yes

Examples EXAMPLE 1

define stream CseEventStream (symbol string, price float, volume int)

from CseEventStream#window.unique:length(symbol,10)
select symbol, price, volume
insert all events into OutputStream;

In this configuration, the window holds the latest 10 unique events. The latest events are selected based on the symbol attribute. If the 'CseEventStream' receives an event for which the value for the symbol attribute is the same as that of an existing event in the window, the existing event is replaced by the new event. All the events are returned to the 'OutputStream' event stream once an event expires or is added to the window.

lengthBatch (Window)

This is a batch (tumbling) window that holds a specified number of latest unique events. The unique events are determined based on the value for a specified unique key parameter. The window is updated for every window length, i.e., for the last set of events of the specified number in a tumbling manner. When a new event arrives within the window length having the same value for the unique key parameter as an existing event in the window, the previous event is replaced by the new event.

Syntax

unique:lengthBatch(<INT|LONG|FLOAT|BOOL|DOUBLE|STRING> unique.key, <INT> window.length)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
unique.key The attribute that should be checked for uniqueness. INT
LONG
FLOAT
BOOL
DOUBLE
STRING
No Yes
window.length The number of events the window should tumble. INT No Yes

Examples EXAMPLE 1

define window CseEventWindow (symbol string, price float, volume int)

 from CseEventStream#window.unique:lengthBatch(symbol, 10)
select symbol, price, volume
insert expired events into OutputStream ;

In this query, the window at any give time holds the last 10 unique events from the 'CseEventStream' stream. Each of the 10 events within the window at a given time has a unique value for the symbol attribute. If a new event has the same value for the symbol attribute as an existing event within the window length, the existing event expires and it is replaced by the new event. The query returns expired individual events as well as expired batches of events to the 'OutputStream' stream.

time (Window)

This is a sliding time window that holds the latest unique events that arrived during the previous time window. The unique events are determined based on the value for a specified unique key parameter. The window is updated with the arrival and expiry of each event. When a new event that arrives within a window time period has the same value for the unique key parameter as an existing event in the window, the previous event is replaced by the new event.

Syntax

unique:time(<INT|LONG|FLOAT|BOOL|DOUBLE|STRING> unique.key, <INT|LONG> window.time)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
unique.key The attribute that should be checked for uniqueness. INT
LONG
FLOAT
BOOL
DOUBLE
STRING
No No
window.time The sliding time period for which the window should hold events. INT
LONG
No No

Examples EXAMPLE 1

define stream CseEventStream (symbol string, price float, volume int)

from CseEventStream#window.unique:time(symbol, 1 sec)
select symbol, price, volume
insert expired events into OutputStream ;

In this query, the window holds the latest unique events that arrived within the last second from the 'CseEventStream', and returns the expired events to the 'OutputStream' stream. During any given second, each event in the window should have a unique value for the 'symbol' attribute. If a new event that arrives within the same second has the same value for the symbol attribute as an existing event in the window, the existing event expires.

timeBatch (Window)

This is a batch (tumbling) time window that is updated with the latest events based on a unique key parameter. If a new event that arrives within the time period of a windowhas a value for the key parameter which matches that of an existing event, the existing event expires and it is replaced by the latest event.

Syntax

unique:timeBatch(<INT|LONG|FLOAT|BOOL|DOUBLE|STRING> unique.key, <INT|LONG> window.time)
unique:timeBatch(<INT|LONG|FLOAT|BOOL|DOUBLE|STRING> unique.key, <INT|LONG> window.time, <INT|LONG> start.time)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
unique.key The attribute that should be checked for uniqueness. INT
LONG
FLOAT
BOOL
DOUBLE
STRING
No Yes
window.time The tumbling time period for which the window should hold events. INT
LONG
No Yes
start.time This specifies an offset in milliseconds in order to start the window at a time different to the standard time. Timestamp of first event INT
LONG
Yes Yes

Examples EXAMPLE 1

define stream CseEventStream (symbol string, price float, volume int)

from CseEventStream#window.unique:timeBatch(symbol, 1 sec)
select symbol, price, volume
insert all events into OutputStream ;

This window holds the latest unique events that arrive from the 'CseEventStream' at a given time, and returns all the events to the 'OutputStream' stream. It is updated every second based on the latest values for the 'symbol' attribute.

timeLengthBatch (Window)

This is a batch or tumbling time length window that is updated with the latest events based on a unique key parameter. The window tumbles upon the elapse of the time window, or when a number of unique events have arrived. If a new event that arrives within the period of the window has a value for the key parameter which matches the value of an existing event, the existing event expires and it is replaced by the new event.

Syntax

unique:timeLengthBatch(<INT|LONG|FLOAT|BOOL|DOUBLE|STRING> unique.key, <INT|LONG> window.time, <INT> window.length)
unique:timeLengthBatch(<INT|LONG|FLOAT|BOOL|DOUBLE|STRING> unique.key, <INT|LONG> window.time, <INT|LONG> start.time, <INT> window.length)

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
unique.key The attribute that should be checked for uniqueness. INT
LONG
FLOAT
BOOL
DOUBLE
STRING
No Yes
window.time The sliding time period for which the window should hold the events. INT
LONG
No Yes
start.time This specifies an offset in milliseconds in order to start the window at a time different to the standard time. Timestamp of first event INT
LONG
Yes Yes
window.length The number of events the window should tumble. INT No Yes

Examples EXAMPLE 1

define stream CseEventStream (symbol string, price float, volume int)

from CseEventStream#window.unique:timeLengthBatch(symbol, 1 sec, 20)
select symbol, price, volume
insert all events into OutputStream;

This window holds the latest unique events that arrive from the 'CseEventStream' at a given time, and returns all the events to the 'OutputStream' stream. It is updated every second based on the latest values for the 'symbol' attribute.