Siddhi 5.x Architecture¶
Siddhi is an open source, cloud-native, stream processing and complex event processing engine. It can be utilized in any of the following ways:
- Run as a server on its own
- Run with WSO2 Stream Processor as a service
- Embedded into any Java or Python-based application
- Run on an Android application
Siddhi provides streaming data integration and data analytical operators. It connects multiple disparate live data sources, orchestrates data flows, calculates analytics, and also detects complex event patterns. This allows developers to build applications that collect data, perform data transformation and analytics, and publish the results to data sinks in real time.
Please find the Siddhi 4.x Architecture here
This section illustrates the architecture of the Siddhi Engine and guides you through its key functionality. We hope this article helps developers to understand Siddhi and its codebase better, and also help them to contribute and improve Siddhi.
Main Design Decisions¶
- Event-by-event processing of real-time streaming data to achieve low latency.
- Ease of use with Streaming SQL providing an intuitive way to express stream processing logic and complex event processing constructs such as Patterns.
- Achieve high performance by processing events in-memory and using data stores for long term data storage.
- Optimize performance by enforcing a strict event stream schema and by pre-compiling the queries.
- Optimize memory consumption by having only the absolutely necessary information in-memory and dropping the rest as soon as possible.
- Supporting multiple extension points to accommodate a diverse set of functionality such as supporting multiple sources, sinks, functions, aggregation operations, windows, etc.
At a high level, Siddhi consumes events from various events sources, processes them according to the defined Siddhi application,
and produces results to the subscribed event sinks.
Siddhi can store and consume events from in-memory tables or from external data stores such as
Hazelcast in-memory grid, etc. (i.e., when configured to do so). Siddhi also allows applications and users to query Siddhi via its Store Query API to interactively
retrieve data from in-memory and other stores.
Main Modules in Siddhi Engine¶
Siddhi Engine comprises four main modules, they are:
Siddhi Query API: This allows users to define the execution logic of the Siddhi application as queries and definitions using POJOs (Plain Old Java Objects). Internally, Siddhi uses these objects to identify the logic that it is expected to perform.
Siddhi Query Compiler: This allows users to define the Siddhi application using the Siddhi Streaming SQL, and it compiles the Streaming SQL script to Siddhi Query API POJOs so that Siddhi can execute them.
Siddhi Core: This builds the execution runtime based on the defined Siddhi Application POJOs and processes the incoming events as and when they arrive.
Siddhi Annotation: This is a helper module that allows all extensions to be annotated so that they can be picked by Siddhi Core for processing. This also helps Siddhi to generate the extension documentation.
Siddhi Component Architecture¶
The following diagram illustrates the main components of Siddhi and how they work together.
Here the Siddhi Core module maintains the execution logic. It also interacts with the external environment and systems for consuming, processing and publishing events. It uses the following components to achieve its tasks:
SiddhiManager: This is a key component of Siddhi Core that manages Siddhi Application Runtimes and facilitates their functionality via Siddhi Context with periodic state persistence, statistics reporting and extension loading. It is recommended to use one Siddhi Manager for a single JVM.
SiddhiAppRuntime: Siddhi Application Runtime can be generated for each Siddhi Application through the Siddhi Manager. Siddhi Application Runtimes provide an isolated execution environment for each defined Siddhi Application. These Siddhi Application Runtimes can have their own lifecycle and they execute based on the logic defined in their Siddhi Application.
SiddhiContext: This is a shared object across all the Siddhi Application Runtimes within the same Siddhi manager. It contains references to the persistence store for periodic persistence, statistics manager to report performance statistics of Siddhi Application Runtimes, and extension holders for loading Siddhi extensions.
Siddhi Application Creation¶
Execution logic of the Siddhi Engine is composed as a Siddhi Application, and this is usually passed as a string to SiddhiManager to create the SiddhiAppRuntime for execution.
When a Siddhi Application is passed to the
SiddhiManager.createSiddhiAppRuntime(), it is processed internally with the
Here, the SiddhiApp String is compiled to SiddhiApp
object model by the SiddhiQLBaseVisitorImpl class.
This validates the syntax of the given Siddhi Application. The model is then passed to the SiddhiAppParser
to create the SiddhiAppRuntime. During this phase, the semantics of the Siddhi Application is validated and the execution logic of the Siddhi Application is optimized.
Siddhi App Execution Flow¶
Following diagram depicts the execution flow within a Siddhi App Runtime.
The path taken by events within Siddhi Engine is indicated in blue.
The components that are involved in handling the events are the following:
This routes events of a particular stream to various components within the Siddhi App Runtime. A stream junction is generated for each defined or inferred Stream in the Siddhi Application. A stream junction by default uses the incoming event's thread and passes all the events to its subscribed components as soon as they arrive, but this behaviour can be altered by configuring
@Asyncannotation to buffer the events at the and stream junction and to use another one or more threads to collect the events from the buffer and process the subsequent executions.
Input handler is used to push
Eventobjects into stream junctions from defined event sources, and from Java/Python programmes.
Events from stream junction and passes them to event sinks to publish to external endpoints, and/or passes them to subscribed Java/Python programmes for further processing.
These components process events by filtering, transforming, aggregating, joining, pattern matching, etc. They consume events from one or more stream junctions, process them and publish the processed events into a set of stream junctions based on the defined queries or partitions.
Sources consume events from external sources in various data formats, convert them into Siddhi events using SourceMappers and pass them to corresponding stream junction via their associated input handlers. A source is generated for each
@Sourceannotation defined above a stream definition.
A source mapper is a sub-component of source, and it needs to be configured for each source in order to convert the incoming event into Siddhi event. The source mapper type can be configured using the
@Mapannotation within the
@Sourceannotation. When the
@Mapannotation is not defined, Siddhi uses the PassThroughSourceMapper, where it assumes that the incoming message is already in the Siddhi Event format (i.e
Event), and therefore makes no changes to the incoming event format.
Sinks consumes events from its associated stream junction, convert them to various data formats via SinkMapper and publish them to external endpoints as defined in the
@Sinkannotation. A sink is generated for each
@Sinkannotation defined above a stream definition.
A sink mapper is a sub-component of sink. and its need to be configured for each sink in order to map the Siddhi events to the specified data format so that they can be published via the sink. The sink mapper type can be configured using the
@Mapannotation within the
@Sinkannotation. When the
@Mapannotation is not defined, Siddhi uses PassThroughSinkMapper, where it passes the Siddhi Event (i.e
Event) without any formatting to the Sink.
Tables are used to store events. When tables are defined by default, Siddhi uses the InMemoryTable implementation to store events in-memory. When
@Storeannotation is used on top of the table definition, it loads the associated external data store connector based on the defined
storetype. Most table implementations are extended from either AbstractRecordTable or AbstractQueryableRecordTable abstract classes the former provides the functionality to query external data store based on a given filtering condition, and the latter queries external data store by providing projection, limits, and ordering parameters in addition to data filter condition.
Windows store events as and when they arrive and automatically expire/clean them based on the given window constraint. Multiple types of windows are can be implemented by extending the WindowProcessor abstract class.
Long running time series aggregates defined via the aggregation definition is calculated in an incremental manner using the Incremental Aggregation Processor for the defined time periods. Incremental aggregation functions can be implemented by extending IncrementalAttributeAggregator. By default, incremental aggregations aggregate all the values in-memory, but when it is associated with a store by adding
@storeannotation it uses in-memory to aggregate partial results and uses data stores to persist those increments. When requested for aggregate results it retrieves data from data stores and (if needed from) in-memory, computes combined aggregate results and provides as the output.
A trigger triggers events at a given interval as given in the trigger definition. The triggered events are pushed to a stream junction having the same name as the trigger.
A query callback taps into the events that are emitted by a particular query. It notifies the event occurrence
timestampand classifies the output events into
Siddhi Query Execution¶
Siddhi QueryRuntimes can be categorized into three main types:
SingleInputStream: Queries that consist of query types such as filters and windows.
JoinInputStream: Queries that consist of joins.
StateInputStream: Queries that consist of patterns and sequences.
The following section explains the internals of each query type.
SingleInputStream Query Runtime (Filter & Windows)¶
A single input stream query runtime is generated for filter and window queries. They consume events from a stream junction or a window and convert the incoming events according to the expected output stream format at the ProcessStreamReceiver by dropping all the unrelated incoming stream attributes.
Then the converted events are passed through a few Processors such as FilterProcessor,
WindowProcessor can be extended with various stream processing capabilities.
The last processor of the chain of processors must always be a
QuerySelector and it can't appear anywhere else. When the query runtime consumes events from a stream,
its processor chain can maximum contain one
WindowProcessor, and when query runtime consumes events from a window, its chain of processors cannot
FilterProcessor is implemented using expressions that return a boolean value. ExpressionExecutor
is used to process conditions, mathematical operations, unary operations, constant values, variables, and functions. Expressions have a tree structure, and
they are processed based using the Depth First search algorithm. To achieve high performance, Siddhi currently depends on the user
to formulate the least successful case in the leftmost side of the condition, thereby increasing the chance of early
The condition expression
price >= 100 and ( Symbol == 'IBM' or Symbol == 'MSFT' ) is represented as shown below.
These expressions also support the execution of user-defined functions (UDFs), and they can be implemented by extending the FunctionExecutor class.
After getting processed by all the processors, events reach the
QuerySelector for transformation. At the
events are transformed based on the
select clause of the query. The
select clause produces one
AttributeProcessor for each output stream attribute, and these
AttributeProcessors contain expressions defining data transformation including constant values, variables, user-defined functions, etc.
They can also contain AttributeAggregatorExecutors
to process aggregation operations such as
count, etc. If there is a
Group By clause defined, then the GroupByKeyGenerator
is used to identify the composite group-by key, and then for each key, an
AttributeAggregatorExecutor state is generated to maintain per group-by key aggregations.
When each time AttributeProcessor
is executed the
AttributeAggregatorExecutor calculates per group-by aggregation results and output the values. When
AttributeAggregatorExecutor group-by states become
obsolete, they are destroyed and automatically cleaned.
After an event is transformed to the output format through the above process, it is evaluated against the having condition executor if a
having clause is
provided. The succeeding events are then ordered, and limited based on
before they pushed to the OutputRateLimiter.
OutputRateLimiter, the event output is controlled before sending the events to the stream junction or to the query callback.
output clause is not defined, the PassThroughOutputRateLimiter is used by passing all the events without any rate limiting.
Temporal Processing with Windows¶
To achieve temporal processing, Siddhi uses the following four type of events:
Current Events: Events that are newly arriving to the query from streams.
Expired Events: Events that have expired from a window.
Timer Events: Events that inform the query about an update of execution time. These events are usually generated by schedulers.
Reset Events: Events that resets the Siddhi query states.
In Siddhi, when an event comes into a WindowProcessor,
it creates an appropriate
expired event corresponding to the incoming
current event with the expiring timestamp, and stores that
event in the window. At the same time,
WindowProcessor also forwards the
current event to the next processor for further processing.
It uses a scheduler or some other counting approach to determine when to emit the events that are stored in in-memory. When the
expired events meet the condition for expiry based on the window contains, it emits the
expired events to the next processor.
At times like in
window.timeBatch() there can be cases that need emitting all the events in-memory at once and the output does not need individual
expired events values, in this cases the window emits a single
reset event instead of sending one
expired event for each event it has stored,
so that it can reset the states in one go. For the
QuerySelector aggregations to work correctly the window must emit a corresponding
expired event for each
current event it has emitted or it must send a
QuerySelector, the arrived
current events increase the aggregation values,
expired events decrease the values, and
reset events reset the aggregation calculation to produce correct query output.
For example, the sliding TimeWindow
window.time()) creates a corresponding
expired event for each
current event that arrives, adds the
expired events to
the window, adds an entry to the scheduler to notify when that event need to be expired, and finally sends the
current event to the next processor for subsequent processing.
The scheduler notifies the window by sending a
timer event, and when the window receives an indication that the expected expiry time has come for the oldest event in the window via a
timer event or by other means, it removes the
expired event from the window and passes that to the next processor.
JoinInputStream Query Runtime (Join)¶
Join input stream query runtime is generated for join queries. This can consume events from two stream junctions and perform a join operation as depicted above.
It can also perform a join by consuming events from one stream junction and join against itself, or it can also join against a
table, window or an aggregation. When a join is performed with a table, window or aggregation,
WindowProcessor in the above image is replaced with the corresponding table, window or aggregation and no basic processors are used on their side.
The joining operation is triggered by the events that arrive from the stream junction.
Here, when an event from one stream reaches the pre JoinProcessor,
it matches against all the available events of the other stream's
When a match is found, those matched events are sent to the
current events, and at the same time,
the original event is added to the
WindowProcessor where it remains until it expires. Similarly, when an
event expires from the
WindowProcessor, it matches against all the available events of the other stream's
when a match is found, those matched events are sent to the
Despite the optimizations, a join query is quite expensive when it comes to performance. This is because the
is locked during the matching process to avoid race conditions and to achieve accuracy while joining. Therefore, when possible avoid
matching large (time or length) windows in high volume streams.
StateInputStream Query Runtime (Pattern & Sequence)¶
The state input stream query runtime is generated for pattern and sequence queries. This consumes events from one or more stream junctions
and checks whether the events match each pattern or sequence condition by processing the set of basic processors associated with each
PreStateProcessors usually contains lists of state events that are already matched by previous conditions, and if its the first condition
then it will have an empty state event in its list. When
ProcessStreamReceiver consumes an event, it passes the event
PreStateProcessor which updates the list of state events it has with the incoming event and executes the condition by passing the events to the basic processors.
The state events that match the conditions reach the
PostStateProcessor which will then stores the events to the state event list of the following
If it is the final condition's
PostStateProcessor, then it will pass the state event to the
QuerySelector to generate and emit the output.
Siddhi Partition Execution¶
A partition is a wrapper around one or more Siddhi queries and inner streams that connect them.
A partition is implemented in Siddhi as a PartitionRuntime
which contains multiple
QueryRuntimes and inner stream junctions.
Each partitioned stream entering the partition goes through a designated PartitionStreamReceiver.
PartitionStreamReceiver evaluates the incoming events to identify their associated partition-key using either RangePartitionExecutor
The identified partition-key is then set as thread local variable and the event is passed to the
QueryRuntimes of processing.
QueryRuntimes process events by maintaining separate states for each partition-key such that producing separate output per partition.
When a partition query consumes a non-partitioned global stream, the
QueryRuntimes are executed for each available partition-key in the system such that
allowing all partitions to receive the same event. When the partitions are obsolete
PartitionRuntime deletes all the partition states from its
Siddhi supports long duration time series aggregations via its aggregation definition. AggregationRuntime
implements this by the use of streaming lambda architecture, where it processes part of the data in-memory and gets part of the data from data stores.
AggregationRuntime creates an in-memory table or external store for each time granularity (i.e seconds, minutes, days, etc) it has to process the events,
and when events enter it calculates the aggregations in-memory for its least granularity (usually seconds) using the IncrementalExecutor
and maintains the running aggregation values in its BaseIncrementalValueStore.
At each clock end time of the granularity (end of each second)
IncrementalExecutor stores the summarized values to the associated granularity
table and also passes the summarized values to the
IncrementalExecutor of the next granularity level, which also follows
the same methodology in processing the events. Through this approach each time granularities, the current time duration will be in-memory and all the historical time durations
will be in stored in the tables.
The aggregations results are calculated by IncrementalAttributeAggregators
and stored in such a way that allows proper data composition upon retrial, for example,
avg() is stored as
This allows data composition across various granularity time durations when retrieving, for example, results for
avg() composed by returning sum of
sums divided by the sum of
Aggregation can also work in a distributed manner and across system restarts. This is done by storing node specific IDs and granularity time duration information in the tables.
To make sure tables do not go out of memory IncrementalDataPurging
is used to purge old data.
When aggregation is queried through join or store query for a given time granularity it reads the data
from the in-memory
BaseIncrementalValueStore and from the tables computes the composite results as described, and presents the results.
Siddhi Event Formats¶
Siddhi has three event formats.
This is the format exposed to external systems when they send events via Input Handler and consume events via Stream Callback or Query Callback. This consists of a
Objectthat contains all the values in accordance to the corresponding stream.
This is used within queries. This contains a
timestampand the following three
beforeWindowData: This contains values that are only used in processors that are executed before the
onAfterWindowData: This contains values that are only used by the
WindowProcessorand the other processors that follow it, but not sent as output.
outputData: This contains the values that are sent via the output stream of the query.
In order to optimize the amount of data that is stored in the in-memory at windows, the content in
beforeWindowDatais cleared before the event enters the
WindowProcessor. StreamEvents can also be chained by linking each other via the
nextproperty in them.
This is used in joins, patterns and sequences queries when we need to associate events of multiple streams, tables, windows or aggregations together. This contains a
timestamp, a collection of
StreamEvents representing different streams, tables, etc, that are used in the query, and an
outputDatavalues that are needed for query output. The
StreamEvents within the
StateEventthemselves can be chained by linking each other with the
nextproperty in them.
Event Chunks provide an easier way of manipulating the chain of
StateEvents so that they are be easily iterated, inserted and removed.
This article focuses on describing the architecture of Siddhi and rationalizing some of the architectural decisions made when implementing the system. It also explains the key features of Siddhi. We hope this will be a good starting point for new developers to understand Siddhi and to start contributing to it.