API Docs - v4.0.6
Kslack
reorder (Stream Processor)
This stream processor extension reorders an out-of-order event stream.
It implements the K-Slack-based out-of-order handling algorithm (originally described in
https://www2.informatik.uni-erlangen.de/publication/download/IPDPS2013.pdf).
Syntax
reorder:kslack(<LONG> timestamp, <LONG> timer.timeout, <LONG> max.k, <BOOL> discard.flag)
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
timestamp | The attribute used for ordering the events. | N/A | LONG | No | No |
timer.timeout | A fixed time-out value in milliseconds, set when processing starts. When the time-out expires, the extension drains all events buffered within it to the output. The time-out is implemented internally with a timer, and the buffered events are released each time the timer ticks. | -1 (timeout is infinite) | LONG | Yes | No |
max.k | The maximum threshold value for the K parameter in the K-Slack algorithm. | 9,223,372,036,854,775,807 (the maximum Long value) | LONG | Yes | No |
discard.flag | Indicates whether out-of-order events that arrive after the K-Slack window has expired are discarded. | false | BOOL | Yes | No |
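To make the interplay of max.k, discard.flag and timer.timeout more concrete, here is a minimal Java sketch of the generic K-Slack idea. It is not the extension's source and rests on stated assumptions: events are reduced to bare timestamps, K is learned as the largest observed lag behind the newest timestamp (capped at max.k), and `drainAll()` stands in for a timer tick. The class `KSlackSketch` and its methods are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Hypothetical, simplified K-Slack reorder buffer (NOT the extension's code).
 * Events are reduced to their timestamps to keep the sketch short.
 */
final class KSlackSketch {
    private final long maxK;            // plays the role of max.k
    private final boolean discardLate;  // plays the role of discard.flag
    private final PriorityQueue<Long> buffer = new PriorityQueue<>(); // min-heap on timestamp
    private long k = 0;                          // current slack, learned from observed lags
    private long greatest = Long.MIN_VALUE;      // largest timestamp seen so far
    private long lastReleased = Long.MIN_VALUE;  // largest timestamp emitted so far

    KSlackSketch(long maxK, boolean discardLate) {
        this.maxK = maxK;
        this.discardLate = discardLate;
    }

    /** Accepts one event and returns the events that can now be emitted, in timestamp order. */
    List<Long> onEvent(long ts) {
        greatest = Math.max(greatest, ts);
        // Assumption: K grows to cover this event's lag behind the newest timestamp, capped at maxK.
        k = Math.min(maxK, Math.max(k, greatest - ts));
        List<Long> released = new ArrayList<>();
        if (discardLate && ts < lastReleased) {
            return released; // arrived after the window moved past it: drop it
        }
        buffer.add(ts);
        // Emit every buffered event that is at least K behind the newest timestamp.
        while (!buffer.isEmpty() && buffer.peek() <= greatest - k) {
            released.add(pollAndTrack());
        }
        return released;
    }

    /** Emits every buffered event in order, as a timer tick (timer.timeout) would. */
    List<Long> drainAll() {
        List<Long> released = new ArrayList<>();
        while (!buffer.isEmpty()) {
            released.add(pollAndTrack());
        }
        return released;
    }

    private long pollAndTrack() {
        long ts = buffer.poll();
        lastReleased = Math.max(lastReleased, ts);
        return ts;
    }

    public static void main(String[] args) {
        KSlackSketch reorder = new KSlackSketch(Long.MAX_VALUE, true);
        List<Long> output = new ArrayList<>();
        for (long ts : new long[] {10, 8, 12, 9, 15, 14, 20}) {
            output.addAll(reorder.onEvent(ts));
        }
        output.addAll(reorder.drainAll());
        System.out.println(output); // [10, 12, 14, 15, 20]
    }
}
```

With discard.flag set to true, the late events 8 and 9 are dropped and the remaining events leave in timestamp order. With it set to false, they are still forwarded, but out of order, because K was still small when they arrived.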
Examples
EXAMPLE 1
define stream inputStream (eventtt long, price long, volume long);
@info(name = 'query1')
from inputStream#reorder:kslack(eventtt, 1000L)
select eventtt, price, volume
insert into outputStream;
This query reorders events based on the values of the 'eventtt' attribute. The timeout value is set to 1000 milliseconds.
Akslack
reorder (Stream Processor)
This stream processor extension reorders an out-of-order event stream.
It implements the Alpha K-Slack-based out-of-order handling algorithm (originally described in
http://dl.acm.org/citation.cfm?doid=2675743.2771828).
Syntax
reorder:akslack(<LONG> timestamp, <DOUBLE> correlation.field, <LONG> batch.size, <LONG> timer.timeout, <LONG> max.k, <BOOL> discard.flag, <DOUBLE> error.threshold, <DOUBLE> confidence.level)
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
timestamp | The attribute used for ordering the events. | N/A | LONG | No | No |
correlation.field | The data field whose accuracy is directly affected by the adaptive operation of the K-Slack extension. Alpha K-Slack uses this field to calculate the runtime window coverage threshold, which is an upper limit set for late arrivals that are not handled successfully. | N/A | DOUBLE | No | No |
batch.size | The number of events considered when calculating an alpha value. The value must be greater than or equal to 15. | 10,000 | LONG | Yes | No |
timer.timeout | A fixed time-out value in milliseconds, set when processing starts. When the time-out expires, the extension drains all events buffered within it to the output. The time-out is implemented internally with a timer, and the buffered events are released each time the timer ticks. | -1 (timeout is infinite) | LONG | Yes | No |
max.k | The maximum threshold value for the K parameter in the Alpha K-Slack algorithm. | 9,223,372,036,854,775,807 (the maximum Long value) | LONG | Yes | No |
discard.flag | Indicates whether out-of-order events that arrive after the Alpha K-Slack window has expired are discarded. | false | BOOL | Yes | No |
error.threshold | The error threshold applied in the Alpha K-Slack algorithm. This parameter must be specified together with confidence.level. | 0.03 (3%) | DOUBLE | Yes | No |
confidence.level | The confidence level applied in the Alpha K-Slack algorithm. This parameter must be specified together with error.threshold. | 0.95 (95%) | DOUBLE | Yes | No |
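The defaults and constraints in the table above can be summarized in code. The following Java record is a hypothetical sketch (`AKSlackParams`, `withBatchSize` and `of` are illustrative names, not the extension's API). It encodes only what the table states: the default values, batch.size being at least 15, and error.threshold and confidence.level being supplied together or not at all. It does not attempt to compute alpha.

```java
import java.util.OptionalDouble;

/**
 * Hypothetical holder for the optional akslack parameters (NOT the extension's API).
 * Applies the documented defaults and enforces the documented constraints only.
 */
record AKSlackParams(long batchSize, long timerTimeout, long maxK, boolean discardFlag,
                     double errorThreshold, double confidenceLevel) {

    static final long DEFAULT_TIMER_TIMEOUT = -1L;        // -1 means the timeout is infinite
    static final long DEFAULT_MAX_K = Long.MAX_VALUE;     // the maximum Long value
    static final double DEFAULT_ERROR_THRESHOLD = 0.03;   // 3%
    static final double DEFAULT_CONFIDENCE_LEVEL = 0.95;  // 95%

    AKSlackParams {
        // Documented constraint: batch.size must be greater than or equal to 15.
        if (batchSize < 15) {
            throw new IllegalArgumentException("batch.size must be >= 15, got " + batchSize);
        }
    }

    /** Only batch.size given, as in the documented example; everything else defaults. */
    static AKSlackParams withBatchSize(long batchSize) {
        return of(batchSize, DEFAULT_TIMER_TIMEOUT, DEFAULT_MAX_K, false,
                OptionalDouble.empty(), OptionalDouble.empty());
    }

    static AKSlackParams of(long batchSize, long timerTimeout, long maxK, boolean discardFlag,
                            OptionalDouble errorThreshold, OptionalDouble confidenceLevel) {
        // Documented constraint: error.threshold and confidence.level go together.
        if (errorThreshold.isPresent() != confidenceLevel.isPresent()) {
            throw new IllegalArgumentException(
                    "error.threshold and confidence.level must be specified together");
        }
        return new AKSlackParams(batchSize, timerTimeout, maxK, discardFlag,
                errorThreshold.orElse(DEFAULT_ERROR_THRESHOLD),
                confidenceLevel.orElse(DEFAULT_CONFIDENCE_LEVEL));
    }

    public static void main(String[] args) {
        // Mirrors the documented example query, which passes a batch size of 20.
        System.out.println(withBatchSize(20));
    }
}
```

A batch size of 20, as used in the example query, satisfies the lower bound of 15; a value such as 10 would be rejected.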
Examples
EXAMPLE 1
define stream inputStream (eventtt long, data double);
@info(name = 'query1')
from inputStream#reorder:akslack(eventtt, data, 20L)
select eventtt, data
insert into outputStream;
This query performs reordering based on the 'eventtt' attribute values. The batch size applied here is 20.