Long-running time-based Aggregations¶
In this guide, you will learn how to address one of the most common requirements in analytics: aggregating data.
Scenario - Generating alerts based on long-running summarization¶
Aggregation is a core requirement in almost any analytics system, as it allows users to easily spot trends and anomalies and act on them for the benefit of the organization or the business. Aggregated data can also be processed further to extract the information needed for business decisions.
What you'll build¶
Let's consider a real-world use case that implements the aggregation requirement. This will help you understand the Siddhi stream processing construct called Named Aggregations. Let's jump directly into the use case.
Let's assume you are a data analyst at the 'X' online shopping mall. 'X' offers daily deals to its customers to boost sales. Each day, past sales are analysed to recommend which products should be put up as Daily Deals and, more importantly, which sellers' items should be featured in them.
Let's use a simple logic to select Daily Deals: analyse the last 10 days of sales and pick the products with low sales, say below 100 units. For these products, select the sellers with the highest sales in the past month and generate an email listing the top 5 candidates whose products can be offered in the Daily Deal.
Now, let's understand how this could be implemented in the Siddhi engine. For this aggregation use case, we will use a specific feature of the Siddhi engine called Named Aggregations.
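To get a feel for the construct before diving into the full use case, below is a minimal sketch of a named aggregation. The stream and attribute names here are illustrative placeholders, not part of this use case.

-- Illustrative input stream (placeholder names)
define stream OrderStream (ts long, item string, qty int);

-- Named aggregation: Siddhi incrementally computes sum(qty) per item at
-- second, minute, hour, day, month, and year granularities, keyed by the
-- 'ts' event timestamp, and keeps the results available for later queries.
define aggregation OrderAggregation
from OrderStream
select item, sum(qty) as totalQty
group by item
aggregate by ts every sec...year;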
Prerequisites¶
The following prerequisites must be in place to implement the above use case.
Mandatory Requirements¶
- Siddhi tooling VM/Local distribution
- One of the Siddhi runner distributions
- VM/Local Runtime
- Docker Image
- K8S Operator (commands are given in deployment section)
- Java 8 or higher
Requirements needed to deploy Siddhi in Docker/Kubernetes¶
- Docker (to build and run the exported Siddhi image)
- A Kubernetes cluster such as Minikube, Docker for Mac/Windows, or GKE, with kubectl and helm configured (for the Kubernetes deployment)
Implementation¶
- When a user checks out a shopping cart on the 'X' online shopping mall's website, the website sends the sale information to the Siddhi runtime over HTTP.
- The Siddhi runtime aggregates the sales data in multiple granularities, from seconds to years, grouped by category, product code, and seller.
- At 11.00 PM on weekdays (as per the cron trigger in the app below), an alert is generated containing the Daily Deals product and seller recommendations.
Implement Streaming Queries¶
- Start the Siddhi tooling runtime and go to the editor UI at http://localhost:9390/editor.
  Follow the steps below to start the Siddhi tooling runtime:
  - Extract the downloaded zip and navigate to <TOOLING_HOME>/bin (TOOLING_HOME refers to the extracted folder).
  - Issue the following command in the command prompt (Windows) / terminal (Linux/Mac).
    For Windows: tooling.bat
    For Linux/Mac: ./tooling.sh
- Select the File -> New option. You can then use either the source view or the design view to write/build the Siddhi application.
- Let's write (develop) the Siddhi applications as given below; they implement the requirements mentioned above.
- Once the Siddhi apps are created, you can use the Event Simulator option in the editor to simulate events to the streams and perform developer testing.
Siddhi App 1: Responsible for Aggregating Data
@App:name("SalesSummarization")
@App:description('X online shopping mall - Summarization for sales by category, product and seller')
@Source(type = 'http', receiver.url = 'http://0.0.0.0:8080/sales', basic.auth.enabled = 'false',
@map(type = 'json'))
define stream SalesStream(timestamp long, categoryName string, productName string, sellerName string, quantity int);
-- Long-term summarization: incrementally aggregates sales at every granularity from second to year
@store(type = 'rdbms' ,
jdbc.url = '${MYSQL_URL}',
username = '${MYSQL_USERNAME}',
password = '${MYSQL_PASSWORD}',
jdbc.driver.name = 'com.mysql.jdbc.Driver',
pool.properties = "maximumPoolSize:1")
-- Purging is disabled so that the long-running aggregate data is retained
@purge(enable = 'false')
define aggregation SalesAggregation
from SalesStream
select categoryName, productName, sellerName, sum(quantity) as totalSales
group by categoryName, productName, sellerName
aggregate by timestamp every sec...year;
Siddhi App 2: Responsible for Sending Alerts
@App:name("DailyDealsCandidatesAlert")
@App:description("Alerts regarding daily deals candidates - 1 product with max 5 sellers who sold the most in past 30 days")
define trigger DailyTrigger at '0 00 23 ? * MON-FRI';
@Source(type = 'http', receiver.url = 'http://0.0.0.0:8080/dailyDealsAlert', basic.auth.enabled = 'false',
@map(type = 'json'))
define stream DailyDealsTriggerStream(emailToBeSent bool);
define stream SalesStream(timestamp long, categoryName string, productName string, sellerName string, quantity int);
@sink(type = 'email', address = "${SENDER_EMAIL_ADDRESS}", username = "${EMAIL_USERNAME}", password = "${EMAIL_PASSWORD}",
subject = "{{emailSubject}}", to = "${RECEIVER_EMAIL_ADDRESS}", host = "smtp.gmail.com", port = "465", content.type = 'text/html',
ssl.enable = "true", auth = "true",
@map(type = 'text',
@payload("""
Hi, <br/><br/>
Please find the seller candidates for Daily Deal {{dailyDealDate}} below, <br/><br/>
Product <strong>{{productName}}</strong>-{{categoryName}} sold {{last10DaysSales}} units for the last 10 days<br/><br/>
<table>
<tr>
<th> Seller Name </th>
<th> Total Sales for the last Month </th>
</tr>
{{candidateInfo}}
</table>
<br/><br/>
Thanks, <br/>
Analytics Team""")))
define stream EmailNotificationStream(dailyDealDate string, emailSubject string, productName string, categoryName string, last10DaysSales long, candidateInfo string);
@sink(type = 'log',
@map(type = 'json'))
define stream DailyDealsSellerCandidates (emailToBeSent bool, categoryName string, productName string, last10DaysSales long, sellerName string, sellersSales long);
-- Long-term summarization: the same aggregation definition (and store) as in SalesSummarization, redefined here so that this app can read the stored aggregates
@store(type = 'rdbms' , jdbc.url = '${MYSQL_URL}', username = '${MYSQL_USERNAME}', password = '${MYSQL_PASSWORD}', jdbc.driver.name = 'com.mysql.jdbc.Driver'
, pool.properties = "maximumPoolSize:1")
@purge(enable = 'false')
define aggregation SalesAggregation
from SalesStream
select categoryName, productName, sellerName, sum(quantity) as totalSales
group by categoryName, productName, sellerName
aggregate by timestamp every sec...year;
@info(name = 'calculateStartEndTimeForHTTPTrigger')
from DailyDealsTriggerStream
select emailToBeSent, time:timestampInMilliseconds(time:dateSub(time:currentDate(), 9, 'DAY', 'yyyy-MM-dd'), 'yyyy-MM-dd') as startTime, time:timestampInMilliseconds() as endTime
insert into JoinStream;
@info(name = 'calculateStartEndTimeForDailyTrigger')
from DailyTrigger
select true as emailToBeSent, time:timestampInMilliseconds(time:dateSub(time:currentDate(), 9, 'DAY', 'yyyy-MM-dd'), 'yyyy-MM-dd') as startTime, triggered_time as endTime
insert into JoinStream;
@info(name = 'findLowSalesProduct')
from JoinStream join SalesAggregation
within startTime, endTime
per 'seconds'
select endTime, emailToBeSent, categoryName, productName, sum(totalSales) as totalPastSales
group by categoryName, productName
having totalPastSales < 100
order by totalPastSales desc
limit 1
insert into PastSalesStream;
@info(name = 'findSellersWhoSoldHighestQuantity')
from PastSalesStream as PS join SalesAggregation as SA
on PS.categoryName == SA.categoryName and PS.productName == SA.productName
within time:timestampInMilliseconds(time:dateSub(time:currentDate(), 29, 'DAY', 'yyyy-MM-dd'), 'yyyy-MM-dd'), endTime
per 'seconds'
select emailToBeSent, PS.categoryName, PS.productName, totalPastSales as last10DaysSales, sellerName, sum(totalSales) as sellersSales
group by categoryName, productName, sellerName
order by sellersSales desc
limit 5
insert into DailyDealsSellerCandidates;
@info(name = 'emailFormatting')
from DailyDealsSellerCandidates[emailToBeSent == true]#window.batch()
select time:dateAdd(time:currentDate(), 1, 'DAY', 'yyyy-MM-dd') as dailyDealDate,
    str:concat("Daily Deals Seller Candidate for ", time:dateAdd(time:currentDate(), 1, 'DAY', 'yyyy-MM-dd')) as emailSubject,
    productName, categoryName, last10DaysSales,
    str:groupConcat(str:fillTemplate("<tr><td>{{1}}</td><td>{{2}}</td></tr>", sellerName, sellersSales), "") as candidateInfo
insert into EmailNotificationStream;
Source view of the Siddhi apps:
[Screenshot: Siddhi App 1 source view]
[Screenshot: Siddhi App 2 source view]
Below are the flow diagrams of the above Siddhi apps:
[Diagram: Siddhi App 1 flow]
[Diagram: Siddhi App 2 flow]
Testing¶
NOTE: The provided Siddhi apps use several environment variables (EMAIL_USERNAME, EMAIL_PASSWORD, SENDER_EMAIL_ADDRESS, and RECEIVER_EMAIL_ADDRESS) that must be set for the email alerts defined in the Siddhi queries to be sent. A MySQL database backend is also used to persist the aggregated data.
Hence, make sure to set the following environment variables with proper values in your system (a sample export snippet is given after the list):
- SENDER_EMAIL_ADDRESS: Email address of the account used to send email alerts. (e.g., 'siddhi.gke.user@gmail.com')
- EMAIL_USERNAME: Username of the email account used to send email alerts. (e.g., 'siddhi.gke.user')
- EMAIL_PASSWORD: Password of the email account used to send email alerts. (e.g., 'siddhi123')
- RECEIVER_EMAIL_ADDRESS: Email address of the account used to receive email alerts.
- MYSQL_URL: JDBC URL used to connect to the MySQL database. (e.g., 'jdbc:mysql://localhost:3306/testdb?useSSL=false')
- MYSQL_USERNAME: Username of the MySQL database used to persist aggregated data.
- MYSQL_PASSWORD: Password of the MySQL database used to persist aggregated data.
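For example, on Linux/Mac you can export these variables in the terminal session used to start the runtime. All values below are placeholders; substitute your own.

export SENDER_EMAIL_ADDRESS='siddhi.gke.user@gmail.com'
export EMAIL_USERNAME='siddhi.gke.user'
export EMAIL_PASSWORD='siddhi123'
export RECEIVER_EMAIL_ADDRESS='deal.alerts.receiver@gmail.com'
export MYSQL_URL='jdbc:mysql://localhost:3306/testdb?useSSL=false'
export MYSQL_USERNAME='root'
export MYSQL_PASSWORD='root'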
Setup MySQL¶
- Download and install the MySQL database as per the guidelines at https://www.mysql.com/downloads/
- Log in to the MySQL server and create a database called “testdb” (a sample command is given after this list)
- Download the MySQL client connector jar and add it to the jars directory (if it is non-OSGi) or the bundles directory (if it is an OSGi bundle) of the Siddhi distribution
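For example, assuming the mysql command-line client is installed and the server is running locally (-p prompts for the root password):

mysql -u root -p -e "CREATE DATABASE testdb;"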
Tryout¶
When you run the Siddhi apps in the editor, you will see logs like the ones shown below printed in the editor console.
- You can simply simulate some events directly into the streams and test your Siddhi apps in the editor itself.
- You can also send data for the past 5 days using the Siddhi mock data generator.
- Then, you can also simulate events through HTTP to test the application; a sample sales event is given below, and the following sections explain how to test the Siddhi app via HTTP using cURL.
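For instance, a single sales event can be posted to the HTTP source of the SalesSummarization app. The field values below are illustrative; Siddhi's default JSON mapping expects the attributes wrapped in an "event" object.

curl -X POST http://localhost:8080/sales \
  -H 'Content-Type: application/json' \
  -d '{"event": {"timestamp": 1567939200000, "categoryName": "Accessories", "productName": "Earring", "sellerName": "Harry", "quantity": 2}}'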
Run Mock Data Generator¶
In the provided Siddhi app, there is an HTTP source configured to receive sales data. For simplicity, you will mock this data with a generator. Please download the mock server jar and run the mock service by executing the following command.
java -jar siddhi-mock-data-generator-2.0.0.jar
Invoking the Siddhi App¶
As per the Siddhi app that you wrote in the 'Implementation' section, each weekday at 11.00 PM an email is generated with the candidates for the next day's Daily Deal.
Furthermore, the alert can be triggered on demand by sending a POST request to http://localhost:8080/dailyDealsAlert.
curl -X POST \
http://localhost:8080/dailyDealsAlert \
-k \
-H 'Content-Type: application/json' \
-d '{
"emailToBeSent": true
}'
If you invoke the above cURL request, the email alert is triggered. You can also observe the logs below, which contain the details sent in the email.
DailyDealsCandidatesAlert : DailyDealsSellerCandidates : [
{"event":{"emailToBeSent":true,"categoryName":"Accessories","productName":"Earring","last10DaysSales":80,"sellerName":"Malfoy","sellersSales":20}},
{"event":{"emailToBeSent":true,"categoryName":"Accessories","productName":"Earring","last10DaysSales":80,"sellerName":"George","sellersSales":20}},
{"event":{"emailToBeSent":true,"categoryName":"Accessories","productName":"Earring","last10DaysSales":80,"sellerName":"Harry","sellersSales":20}},
{"event":{"emailToBeSent":true,"categoryName":"Accessories","productName":"Earring","last10DaysSales":80,"sellerName":"Molly","sellersSales":10}},
{"event":{"emailToBeSent":true,"categoryName":"Accessories","productName":"Earring","last10DaysSales":80,"sellerName":"Fred","sellersSales":10}}
]
Note: The configurations provided in the email sink, along with the environment properties, will work for Gmail; if you use another mail server, make sure to change the config values accordingly.
Deployment¶
Once you are done with the development, export the Siddhi apps that you have developed using the 'File' -> 'Export File' option.
You can deploy the Siddhi app using any of the methods listed below.
Deploy on VM/ Bare Metal¶
Prerequisites¶
First, make sure that the necessary prerequisites are met, as given in the Testing section. MySQL is required to try out the use case.
Then, as given in the Setup MySQL section, download and install the MySQL database, and create a database called “testdb” in it.
The environment variables related to email and MySQL need to be exported (see the Testing section).
Siddhi Runner Configuration¶
- Download the latest Siddhi Runner distribution.
- Unzip the siddhi-runner-x.x.x.zip.
- Configure the necessary environment variables; refer to the Testing section.
- Start the Siddhi apps with the runner by executing the following command from the distribution directory.
  Linux/Mac: ./bin/runner.sh -Dapps=<siddhi-files-directory>
  Windows: bin\runner.bat -Dapps=<siddhi-files-directory>
- If you are pointing to a different (fresh) database, run the mock data generator to add sales data for the past 5 days. Download the mock data generator and execute the command below to run it.
  java -jar siddhi-mock-data-generator-2.0.0.jar
- Invoke the dailyDealsAlert service with the following cURL request. You can set emailToBeSent to false to skip sending the email and only observe the logs.
  curl -X POST \
    http://localhost:8080/dailyDealsAlert \
    -k \
    -H 'Content-Type: application/json' \
    -d '{
      "emailToBeSent": true
    }'
- You can see the output log in the console, where the alert log is printed similar to the sample shown in the Tryout section.
- At the same time, you will also receive the email alert if emailToBeSent is true.
Deploy on Docker¶
Prerequisites¶
MySQL is an external dependency for this use case. Hence, you can use the corresponding docker artifacts to test the requirement.
- First, create a docker network for the deployment as shown below.
  docker network create siddhi-tier --driver bridge
- Then, get the MySQL docker image and start it with the command below. We are going to use MySQL version 5.7.27.
  docker run -d --name mysql-server --network siddhi-tier -e MYSQL_DATABASE=testdb -e MYSQL_ROOT_PASSWORD=root mysql:5.7.27
  Info: Check whether the container started correctly by using docker ps. The above command starts a MySQL server with credentials root:root and creates a database testdb.
- Now, you have configured the necessary prerequisites required to run the use case (a quick sanity check is shown below).
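Optionally, as a quick sanity check, you can confirm that the testdb database exists using the MySQL client bundled in the container:

docker exec -it mysql-server mysql -uroot -proot -e "SHOW DATABASES;"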
Siddhi Docker Configuration¶
Siddhi docker artifacts can be exported from the editor UI as follows.
- Start the Siddhi tooling runtime and go to the editor UI at http://localhost:9390/editor.
  Follow the steps below to start the Siddhi tooling runtime:
  - Extract the downloaded zip and navigate to <TOOLING_HOME>/bin (TOOLING_HOME refers to the extracted folder).
  - Issue the following command in the command prompt (Windows) / terminal (Linux/Mac).
    For Windows: tooling.bat
    For Linux/Mac: ./tooling.sh
- Go to the Export -> For Docker option to export and push the docker image.
  - Step 2: There is no need to template the Siddhi apps, as they are already templated.
  - Step 4: Fill in the template variables.
    Setting MYSQL_URL: Since we are using a MySQL docker container, the MySQL URL should follow jdbc:mysql://<mysql docker container name>:3306/<created DB>?useSSL=false, i.e. jdbc:mysql://mysql-server:3306/testdb?useSSL=false by default.
- Then, run the Siddhi docker image that you created, attaching it to the network so that it can reach the MySQL dependency.
  docker run --network siddhi-tier -p 8080:8080 -it <docker-image-name>
  Info: Here, container port 8080 is bound to localhost:8080 so that the mock data generator can reach the HTTP source.
- Use the sample mock data generator to add sales data for the past 5 days. Download the mock data generator and execute the command below to run it.
  java -jar siddhi-mock-data-generator-2.0.0.jar
- Invoke the dailyDealsAlert service with the following cURL request. You can set emailToBeSent to false to skip sending the email and only observe the logs.
  curl -X POST \
    http://localhost:8080/dailyDealsAlert \
    -k \
    -H 'Content-Type: application/json' \
    -d '{
      "emailToBeSent": true
    }'
- You can see the output log in the console, where the alert log is printed similar to the sample shown in the Tryout section.
Deploy on Kubernetes¶
Prerequisites¶
MySQL is an external dependency for this use case. Hence, you can use the corresponding Kubernetes artifacts (the official MySQL helm chart) to test the requirement.
- It is advisable to create a dedicated namespace in Kubernetes before following the steps below.
  kubectl create ns agg-guide
- There are some prerequisites to meet before trying out the SiddhiProcess below, such as configuring a MySQL database in Kubernetes. Configure the MySQL server within the namespace created above, using the official helm chart provided for MySQL.
- First, install the MySQL helm chart as shown below.
  helm install --name mysql-server --namespace=agg-guide --set mysqlRootPassword=root,mysqlUser=root,mysqlDatabase=testdb stable/mysql
  Here, you define the root password used to connect to the MySQL database as well as the database name. Make sure to run helm init --tiller-namespace=agg-guide first if it has not been done yet. Verify that the pods are running with kubectl get pods --namespace=agg-guide.
- Then, set up port forwarding to the MySQL service, which allows you to connect from the host.
  kubectl port-forward svc/mysql-server 13300:3306 --namespace=agg-guide
- Then, you can log in to the MySQL server from your host machine through the forwarded port, for example with the MySQL CLI client: mysql -h 127.0.0.1 -P 13300 -u root -proot
- Afterwards, install the Siddhi Operator.
- To install the Siddhi Kubernetes operator, run the following commands.
  kubectl apply -f https://github.com/siddhi-io/siddhi-operator/releases/download/v0.2.1/00-prereqs.yaml --namespace=agg-guide
  kubectl apply -f https://github.com/siddhi-io/siddhi-operator/releases/download/v0.2.1/01-siddhi-operator.yaml --namespace=agg-guide
- You can verify the installation by making sure the siddhi-operator deployment is running in your Kubernetes cluster, for example with kubectl get deployments --namespace=agg-guide.
- Enable ingress.
  - Mandatory command for all clusters:
    kubectl apply -f https://raw.githubusercontent.com/kubernetes/ingress-nginx/master/deploy/static/mandatory.yaml
  - Docker for Mac/Docker for Windows:
    kubectl apply -f https://raw.githubusercontent.com/kubernetes/ingress-nginx/master/deploy/static/provider/cloud-generic.yaml
  - Minikube:
    minikube addons enable ingress
  - GKE:
    kubectl apply -f https://raw.githubusercontent.com/kubernetes/ingress-nginx/master/deploy/static/provider/cloud-generic.yaml
- Configure the Kubernetes cluster IP as the “siddhi” hostname in your /etc/hosts file.
  - Minikube: add the minikube IP to the /etc/hosts file as host “siddhi”. Run the “minikube ip” command in a terminal to get the minikube IP.
  - Docker for Mac: add 0.0.0.0 to the /etc/hosts file as host “siddhi”.
  - Docker for Windows: add the IP that resolves to host.docker.internal to the /etc/hosts file as host “siddhi”.
  - GKE: obtain the external IP (EXTERNAL-IP) of the ingress resources by listing the Kubernetes ingresses, as shown below.
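For example, on GKE the external IP appears in the ADDRESS column when listing the ingresses (assuming the agg-guide namespace used in this guide):

kubectl get ingress --namespace=agg-guide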
Siddhi Kubernetes configuration¶
- To deploy the Siddhi apps created above, we have to create a custom resource object yaml file (with the kind SiddhiProcess). The Kubernetes CRD can be exported from the Siddhi tooling runtime via the editor UI at http://localhost:9390/editor.
- Select the Export -> For Kubernetes option.
- Steps 1-5 are the same as for the Docker artifacts.
- In Step 6, push the docker image under a different name to the one used in the Docker deployment.
  Info: This docker image differs from the one pushed in the Docker deployment, since for the Kubernetes deployment only the base image with the additional bundles is pushed to the registry.
  Info: Ensure that the pushed docker image is public.
- Step 7: Let's create a non-distributed and stateless deployment.
- Extract siddhi-kubernetes.zip.
- Now, let's create the above resource in the Kubernetes cluster with the command below.
  kubectl --namespace=agg-guide apply -f <absolute-path>/siddhi-kubernetes/siddhi-process.yaml
  Once the Siddhi apps are successfully deployed, you can verify their health with Kubernetes commands such as kubectl get siddhiprocesses --namespace=agg-guide and kubectl get pods --namespace=agg-guide.
- Use the sample mock data generator to add sales data for the past 5 days. Download the mock data generator and execute the command below, passing the host of the HTTP source as an argument.
  java -jar siddhi-mock-data-generator-2.0.0.jar siddhi/siddhi-aggregation-guide-0/8080
  Hostname of the HTTP source: here the hostname includes the deployment name along with the port, following the syntax <IP>/<DEPLOYMENT_NAME>/<PORT>. The above sample is for artifacts generated with the process name siddhi-aggregation-guide in Step 6 of the export dialog.
- Invoke the dailyDealsAlert service with the following cURL request. You can set emailToBeSent to false to skip sending the email and only observe the logs.
  curl -X POST \
    http://siddhi/siddhi-aggregation-guide-1/8080/dailyDealsAlert \
    -k \
    -H 'Content-Type: application/json' \
    -d '{
      "emailToBeSent": true
    }'
  Hostname of the HTTP source: here the hostname includes the deployment name along with the port, following the syntax <IP>/<DEPLOYMENT_NAME>/<PORT>. The above sample is for artifacts generated with the process name siddhi-aggregation-guide in Step 6 of the export dialog.
- You can see the output log in the console, where the alert log is printed similar to the sample shown in the Tryout section.
Refer here to get more details about running Siddhi on Kubernetes.