Siddhi 5.1 as a Python library

PySiddhi is a Python wrapper for Siddhi that listens to events from data streams, detects complex conditions described via a Streaming SQL language, and triggers actions.

To use Siddhi in Python via PySiddhi and get a working sample running, follow the steps below.

The following prerequisites should be met before installing PySiddhi.

Prerequisites

  • The following dependencies should be installed before installing the library.

    Linux

    • Python 2.7 or 3.x
    • Cython
      sudo pip install cython
    • Python Developer Package
      sudo apt-get install python-dev python3-dev
    • Oracle Java 8 and set JAVA_HOME path
    • libboost for Python (Only to build from Source)
      sudo apt-get install libboost-python-dev
    • Maven (Only to build from Source)
    • g++ and other development tools (Only to build from Source)
      sudo apt-get install build-essential g++ autotools-dev libicu-dev libbz2-dev libboost-all-dev

    macOS

    • Install brew
    • Install python using brew
    • Cython
      sudo pip install cython
    • Oracle Java 8 and set JAVA_HOME path
    • boost for python (Only to build from Source)
      brew install boost
    • Maven (Only to build from Source)

    Windows

    • Install Python
    • Cython
      pip install cython
    • Oracle Java 8 and set JAVA_HOME path
    • Install Visual Studio Build tools (Only to build from Source)
    • Maven (Only to build from Source)
  • Download the siddhi-sdk release from here and set SIDDHISDK_HOME as an environment variable.
    export SIDDHISDK_HOME="<path-to-siddhi-sdk>"

  • Download siddhi-python-api-proxy-5.1.0.jar from here and copy it to the <SIDDHISDK_HOME>/lib directory.
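
The environment-related prerequisites above can be checked before installation with a short script. This is only a minimal sketch, assuming Python 3.6 or later; check_prerequisites is a hypothetical helper written for illustration and is not part of PySiddhi. It verifies only what is stated above: that JAVA_HOME and SIDDHISDK_HOME are set to existing directories and that the proxy jar is present under <SIDDHISDK_HOME>/lib.

```python
import os
from pathlib import Path


def check_prerequisites(env=None):
    """Return a list of human-readable problems; an empty list means all checks passed."""
    env = os.environ if env is None else env
    problems = []

    # Both variables are named in the prerequisites list.
    for name in ("JAVA_HOME", "SIDDHISDK_HOME"):
        value = env.get(name)
        if not value:
            problems.append(f"{name} is not set")
        elif not Path(value).is_dir():
            problems.append(f"{name} does not point to a directory: {value}")

    # The proxy jar is expected under <SIDDHISDK_HOME>/lib.
    sdk_home = env.get("SIDDHISDK_HOME")
    if sdk_home and Path(sdk_home).is_dir():
        jar = Path(sdk_home) / "lib" / "siddhi-python-api-proxy-5.1.0.jar"
        if not jar.is_file():
            problems.append(f"missing {jar}")

    return problems


if __name__ == "__main__":
    issues = check_prerequisites()
    print("\n".join(issues) if issues else "All prerequisite checks passed")
```

The function accepts an optional dictionary in place of the real environment, which makes it easy to try out without changing any shell settings.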

Installation

The current version is tested with Unix/Linux based operating systems. PySiddhi can be installed using one of the following methods.

Install PySiddhi via Python Package Management

PySiddhi can be installed using pip.

pip install pysiddhi

Install PySiddhi from Online Code

Using the following pip command, PySiddhi can be installed directly from the source code available on GitHub.

pip install git+https://github.com/siddhi-io/PySiddhi.git
Note: In case of permission errors, use sudo

Install from Downloaded Code

Switch to the master branch of PySiddhi. Navigate to the root directory of the source code and execute the following pip command.

pip install .
Note the period (.) at the end of the command. In case of permission errors, use sudo.
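
Whichever installation method is used, it can be useful to confirm that the package is visible to the Python interpreter before running the sample. The following is a minimal sketch using only the standard library; is_importable is a hypothetical helper name, and the package name "PySiddhi" is taken from the import statements in the sample below. It only looks the package up on the import path and does not load Siddhi or the JVM.

```python
import importlib.util


def is_importable(package_name):
    """Return True if the named top-level package can be found on the import path."""
    return importlib.util.find_spec(package_name) is not None


if __name__ == "__main__":
    # "PySiddhi" is the package name used by the sample's import statements.
    print("PySiddhi importable:", is_importable("PySiddhi"))
```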

Sample

Let's try out the sample below to get a better understanding of how PySiddhi is used. This sample demonstrates how to write a streaming query that detects stock records with a volume less than 150.

Step 1 - Initialize libraries and imports

Add this file to the working directory to enable log4j logging. PrintEvent uses Log4j to generate its output.

from PySiddhi.DataTypes.LongType import LongType
from PySiddhi.core.SiddhiManager import SiddhiManager
from PySiddhi.core.query.output.callback.QueryCallback import QueryCallback
from PySiddhi.core.util.EventPrinter import PrintEvent
from time import sleep

Step 2 - Define filter using Siddhi Query

siddhiManager = SiddhiManager()
# Siddhi Query to filter events with volume less than 150 as output
siddhiApp = "define stream cseEventStream (symbol string, price float, volume long);" + \
            "@info(name = 'query1') " + \
            "from cseEventStream[volume < 150] " + \
            "select symbol, price " + \
            "insert into outputStream;"

# Generate runtime
siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp)

For more details on the Siddhi Query Language, refer to the Siddhi Query Language Guide.
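
To make the meaning of query1 concrete, here is a plain-Python sketch of the same filter and projection. It does not use Siddhi at all and is only an illustration of the query's semantics: the function name query1 and the tuple layout are assumptions made for this example, while the input rows are the events sent in Step 4.

```python
from typing import Iterable, Iterator, List, Tuple

Event = Tuple[str, float, int]  # (symbol, price, volume), mirroring cseEventStream


def query1(events: Iterable[Event], max_volume: int = 150) -> Iterator[List]:
    """Mimic: from cseEventStream[volume < 150] select symbol, price."""
    for symbol, price, volume in events:
        if volume < max_volume:      # the [volume < 150] filter
            yield [symbol, price]    # the "select symbol, price" projection


if __name__ == "__main__":
    sample = [
        ("IBM", 700.0, 100),
        ("WSO2", 60.5, 200),
        ("GOOG", 50, 30),
        ("IBM", 76.6, 400),
        ("WSO2", 45.6, 50),
    ]
    for row in query1(sample):
        print(row)
```

Only the rows with volumes 100, 30 and 50 pass the filter, and the volume column is dropped from the output because the query selects symbol and price only.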

Step 3 - Define a listener for filtered events

# Add listener to capture output events
class QueryCallbackImpl(QueryCallback):
    def receive(self, timestamp, inEvents, outEvents):
        PrintEvent(timestamp, inEvents, outEvents)

siddhiAppRuntime.addCallback("query1", QueryCallbackImpl())

Step 4 - Test filter query using sample input events

# Retrieving input handler to push events into Siddhi
inputHandler = siddhiAppRuntime.getInputHandler("cseEventStream")

# Starting event processing
siddhiAppRuntime.start()

# Sending events to Siddhi
inputHandler.send(["IBM", 700.0, LongType(100)])
inputHandler.send(["WSO2", 60.5, LongType(200)])
inputHandler.send(["GOOG", 50, LongType(30)])
inputHandler.send(["IBM", 76.6, LongType(400)])
inputHandler.send(["WSO2", 45.6, LongType(50)])

# Wait for response
sleep(0.1)
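
A fixed sleep either waits longer than necessary or risks returning before the callback has fired. One alternative is to let the callback signal the main thread once the expected number of events has arrived. The following is a minimal sketch of that idea using only the standard library. EventLatch is a hypothetical helper, not part of PySiddhi, and it assumes the callback runs on a different thread and that inEvents is a list or None. In the sample, receive() would call latch.record(inEvents) and the main thread would call latch.wait(timeout) in place of sleep.

```python
import threading


class EventLatch:
    """Counts delivered events and lets the main thread wait for a target count."""

    def __init__(self, expected):
        self._expected = expected
        self._count = 0
        self._lock = threading.Lock()
        self._done = threading.Event()

    def record(self, events):
        """Call from the callback's receive() with the list of events it was given."""
        with self._lock:
            self._count += len(events or [])
            if self._count >= self._expected:
                self._done.set()

    def wait(self, timeout):
        """Block until the target count is reached; return False on timeout."""
        return self._done.wait(timeout)


if __name__ == "__main__":
    latch = EventLatch(expected=3)
    # A timer thread stands in for the Siddhi callback thread in this demo.
    threading.Timer(0.05, latch.record, args=(["e1", "e2", "e3"],)).start()
    print("all events received:", latch.wait(timeout=5))
```

The timeout still bounds the wait, so a missing event cannot block the program forever.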

Clean Up - Remember to shut down the Siddhi Manager when you're done.

siddhiManager.shutdown()
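
If an exception is raised between creating the manager and reaching the shutdown call, the manager is never shut down. A context manager is one way to guarantee the call. This is a minimal sketch: shutting_down is a hypothetical helper, and FakeManager is a stand-in used so the example runs without Siddhi. The only assumption about the real object is that it has the shutdown() method shown above.

```python
from contextlib import contextmanager


@contextmanager
def shutting_down(manager):
    """Yield the manager and call its shutdown() on exit, even after an exception."""
    try:
        yield manager
    finally:
        manager.shutdown()


class FakeManager:
    """Hypothetical stand-in for SiddhiManager so the sketch runs without Siddhi."""

    def __init__(self):
        self.is_shut_down = False

    def shutdown(self):
        self.is_shut_down = True


if __name__ == "__main__":
    fake = FakeManager()
    with shutting_down(fake) as manager:
        pass  # create runtimes, send events, and wait for output here
    print("shut down:", fake.is_shut_down)
```

With the real library, the same pattern would wrap the SiddhiManager instance in place of FakeManager.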

Complete Siddhi App

The complete sample Siddhi app is given below.

from PySiddhi.DataTypes.LongType import LongType
from PySiddhi.core.SiddhiManager import SiddhiManager
from PySiddhi.core.query.output.callback.QueryCallback import QueryCallback
from PySiddhi.core.util.EventPrinter import PrintEvent
from time import sleep

siddhiManager = SiddhiManager()
# Siddhi Query to filter events with volume less than 150 as output
siddhiApp = "define stream cseEventStream (symbol string, price float, volume long); " + \
"@info(name = 'query1') from cseEventStream[volume < 150] select symbol,price insert into outputStream;"

# Generate runtime
siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp)

# Add listener to capture output events
class QueryCallbackImpl(QueryCallback):
    def receive(self, timestamp, inEvents, outEvents):
        PrintEvent(timestamp, inEvents, outEvents)

siddhiAppRuntime.addCallback("query1", QueryCallbackImpl())

# Retrieving input handler to push events into Siddhi
inputHandler = siddhiAppRuntime.getInputHandler("cseEventStream")

# Starting event processing
siddhiAppRuntime.start()

# Sending events to Siddhi
inputHandler.send(["IBM", 700.0, LongType(100)])
inputHandler.send(["WSO2", 60.5, LongType(200)])
inputHandler.send(["GOOG", 50, LongType(30)])
inputHandler.send(["IBM", 76.6, LongType(400)])
inputHandler.send(["WSO2", 45.6, LongType(50)])

# Wait for response
sleep(10)

siddhiManager.shutdown()

Expected Output

The three events with volume less than 150 are printed in the log.

INFO  EventPrinter - Events{ @timestamp = 1497708406678, inEvents = [Event{timestamp=1497708406678, id=-1, data=[IBM, 700.0], isExpired=false}], RemoveEvents = null }
INFO  EventPrinter - Events{ @timestamp = 1497708406685, inEvents = [Event{timestamp=1497708406685, id=-1, data=[GOOG, 50], isExpired=false}], RemoveEvents = null }
INFO  EventPrinter - Events{ @timestamp = 1497708406687, inEvents = [Event{timestamp=1497708406687, id=-1, data=[WSO2, 45.6], isExpired=false}], RemoveEvents = null }
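
When the output needs to be checked programmatically, the log lines above can be parsed with a regular expression. This is a minimal sketch that assumes the exact line layout shown above; parse_events and EVENT_PATTERN are hypothetical names for this example, all fields are returned as strings, and data values that themselves contain a comma or a closing bracket are not handled.

```python
import re

# Matches one Event{...} entry in the layout printed by the sample.
EVENT_PATTERN = re.compile(
    r"Event\{timestamp=(\d+), id=(-?\d+), data=\[(.*?)\], isExpired=(true|false)\}"
)


def parse_events(log_line):
    """Extract (timestamp, data fields) for each Event{...} found in one log line."""
    events = []
    for match in EVENT_PATTERN.finditer(log_line):
        timestamp = int(match.group(1))
        fields = [field.strip() for field in match.group(3).split(",")]
        events.append((timestamp, fields))
    return events


if __name__ == "__main__":
    line = (
        "INFO  EventPrinter - Events{ @timestamp = 1497708406678, "
        "inEvents = [Event{timestamp=1497708406678, id=-1, data=[IBM, 700.0], "
        "isExpired=false}], RemoveEvents = null }"
    )
    print(parse_events(line))
```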

Uninstall PySiddhi

If the library has been installed as explained above, it can be uninstalled using the following pip command.

pip uninstall pysiddhi

Refer here for more details about running Siddhi in Python.
