Skip to content

Siddhi 4.x as a Python library

To use Siddhi in Python via PySiddhi and to get a working sample follow the below sample:

Step 1: Initialize libraries and imports

Add log4j.xml file to working directory in order to enable log4j logging. Log4j is used by PrintEvent to generate output.

Create a .py file add add the following Siddhi related imports:

from PySiddhi4.DataTypes.LongType import LongType
from PySiddhi4.core.SiddhiManager import SiddhiManager
from PySiddhi4.core.query.output.callback.QueryCallback import QueryCallback
from PySiddhi4.core.util.EventPrinter import PrintEvent
from time import sleep

Step 2: Creating Siddhi Application

A Siddhi application is a self contained execution entity that defines how data is captured, processed and sent out.

Create a Siddhi Application by defining a stream definition E.g.StockEventStream defining the format of the incoming events, and by defining a Siddhi query as follows.

siddhiManager = SiddhiManager()
# Siddhi Query to filter events with volume less than 150 as output
siddhiApp = "define stream StockEventStream (symbol string, price float, volume long); " + \
"@info(name = 'query1') from StockEventStream[volume < 150] select symbol,price insert into OutputStream;"
This Siddhi query query to detect stock records having volume less than 150, and then inserts the results into a stream named OutputStream.

For more details on Siddhi Query Language, refer Siddhi Query Language Guide.

Step 3: Creating Siddhi Application Runtime

This step involves creating a runtime representation of a Siddhi application.

# Generate runtime
siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp)

Step 4: Registering a Callback

You can register a callback to the Siddhi application runtime in order to receive the results once the events are processed. There are two types of callbacks:

  • Query callback: This subscribes to a query.
  • Stream callback: This subscribes to an event stream.

In this example, a Query callback is added to the query1 to capture the processed events.

# Add listener to capture output events
class QueryCallbackImpl(QueryCallback):
    def receive(self, timestamp, inEvents, outEvents):
        PrintEvent(timestamp, inEvents, outEvents)
siddhiAppRuntime.addCallback("query1",QueryCallbackImpl())

Here, once the results are generated they are sent to the receive method of this callback. An event printer is added inside this callback to print the incoming events for demonstration purposes.

Step 5: Sending Events

In order to programmatically send events from the stream you need to obtain it's an input handler as follows:

# Retrieving input handler to push events into Siddhi
inputHandler = siddhiAppRuntime.getInputHandler("StockEventStream")

Use the following code to start the Siddhi application runtime, send events and to shutdown Siddhi:

# Starting event processing
siddhiAppRuntime.start()

# Sending events to Siddhi
inputHandler.send(["IBM",700.0,LongType(100)])
inputHandler.send(["WSO2", 60.5, LongType(200)])
inputHandler.send(["GOOG", 50, LongType(30)])
inputHandler.send(["IBM", 76.6, LongType(400)])
inputHandler.send(["WSO2", 45.6, LongType(50)])

# Wait for response
sleep(10)

#Shutdown SiddhiApp runtime
siddhiAppRuntime.stop()

# Shutdown Siddhi
siddhiManager.shutdown()

Full sample code

This sample demonstrating how to write a streaming query to detect stock records having volume less than 150.

from PySiddhi4.DataTypes.LongType import LongType
from PySiddhi4.core.SiddhiManager import SiddhiManager
from PySiddhi4.core.query.output.callback.QueryCallback import QueryCallback
from PySiddhi4.core.util.EventPrinter import PrintEvent
from time import sleep

siddhiManager = SiddhiManager()
# Siddhi Query to filter events with volume less than 150 as output
siddhiApp = "define stream StockEventStream (symbol string, price float, volume long); " + \
"@info(name = 'query1') from StockEventStream[volume < 150] select symbol,price insert into OutputStream;"

# Generate runtime
siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp)

# Add listener to capture output events
class QueryCallbackImpl(QueryCallback):
    def receive(self, timestamp, inEvents, outEvents):
        PrintEvent(timestamp, inEvents, outEvents)
siddhiAppRuntime.addCallback("query1",QueryCallbackImpl())

# Retrieving input handler to push events into Siddhi
inputHandler = siddhiAppRuntime.getInputHandler("StockEventStream")

# Starting event processing
siddhiAppRuntime.start()

# Sending events to Siddhi
inputHandler.send(["IBM",700.0,LongType(100)])
inputHandler.send(["WSO2", 60.5, LongType(200)])
inputHandler.send(["GOOG", 50, LongType(30)])
inputHandler.send(["IBM", 76.6, LongType(400)])
inputHandler.send(["WSO2", 45.6, LongType(50)])

# Wait for response
sleep(10)

siddhiAppRuntime.stop()

siddhiManager.shutdown()

Expected Output

The 3 events with volume less than 150 are printed in log.

INFO  EventPrinter - Events{ @timestamp = 1497708406678, inEvents = [Event{timestamp=1497708406678, id=-1, data=[IBM, 700.0], isExpired=false}], RemoveEvents = null }
INFO  EventPrinter - Events{ @timestamp = 1497708406685, inEvents = [Event{timestamp=1497708406685, id=-1, data=[GOOG, 50], isExpired=false}], RemoveEvents = null }
INFO  EventPrinter - Events{ @timestamp = 1497708406687, inEvents = [Event{timestamp=1497708406687, id=-1, data=[WSO2, 45.6], isExpired=false}], RemoveEvents = null }
Top