Skip to content

PySiddhi

PySiddhi is a Python wrapper for Siddhi. Which can listens to events from data streams, detects complex conditions described via a Streaming SQL language, and triggers actions. It performs both Stream Processing and Complex Event Processing on streaming data. Its Siddhi core is written in Java library.

Content

Features

  • [x] Basic functionality of Siddhi 5.x.x
  • [x] Siddhi Debugger
  • [x] Support to Siddhi Extensions Loading

Installation

PySiddhi can be installed using pip.

pip install pysiddhi

For detail insulation and prerequisite refer section on Installation Guide.

Quick Demo

Following is a quick demo of how to use PySiddhi. For comprehensive demo please refer Quick-Demo-PySiddhi

Step 1 - Define filter using Siddhi Query.

siddhiManager = SiddhiManager()
# Siddhi Query to filter events with volume less than 150 as output
siddhiApp = "define stream cseEventStream (symbol string, price float, volume long);" + \
            "@info(name = 'query1') " + \
            "from cseEventStream[volume < 150] " + \
            "select symbol, price " + \
            "insert into outputStream;"

# Generate runtime
siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp)

For more details on Siddhi Query Language, refer Siddhi Query Language Guide.

Step 2 - Define a listener for filtered events.

# Add listener to capture output events
class QueryCallbackImpl(QueryCallback):
    def receive(self, timestamp, inEvents, outEvents):
        PrintEvent(timestamp, inEvents, outEvents)
siddhiAppRuntime.addCallback("query1",QueryCallbackImpl())

Step 3 - Test filter query using sample input events.

# Retrieving input handler to push events into Siddhi
inputHandler = siddhiAppRuntime.getInputHandler("cseEventStream")

# Starting event processing
siddhiAppRuntime.start()

# Sending events to Siddhi
inputHandler.send(["IBM",700.0,LongType(100)])
inputHandler.send(["WSO2", 60.5, LongType(200)])
inputHandler.send(["GOOG", 50, LongType(30)])
inputHandler.send(["IBM", 76.6, LongType(400)])
inputHandler.send(["WSO2", 45.6, LongType(50)])

# Wait for response
sleep(0.1)

Output

The 3 events with volume less than 150 are printed in log.

INFO  EventPrinter - Events{ @timestamp = 1497708406678, inEvents = [Event{timestamp=1497708406678, id=-1, data=[IBM, 700.0], isExpired=false}], RemoveEvents = null }
INFO  EventPrinter - Events{ @timestamp = 1497708406685, inEvents = [Event{timestamp=1497708406685, id=-1, data=[GOOG, 50], isExpired=false}], RemoveEvents = null }
INFO  EventPrinter - Events{ @timestamp = 1497708406687, inEvents = [Event{timestamp=1497708406687, id=-1, data=[WSO2, 45.6], isExpired=false}], RemoveEvents = null }

Clean Up - Remember to shutdown the Siddhi Manager when your done.

siddhiManager.shutdown()

Contribution

PySiddhi is initiated by a project for Google Summer of Code 2017 Program.

Contributed by: Madhawa Vidanapathirana
Email: madhawavidanapathirana@gmail.com
Organization: University of Moratuwa, Sri Lanka.

Support and Contribution

  • We encourage users to ask questions and get support via StackOverflow, make sure to add the siddhi tag to the issue for better response.

  • If you find any issues related to the extension please report them on the issue tracker.

  • For production support and other contribution related information refer Siddhi Community documentation.