Skip to content

Retrieve and Publish Data from/to Various Enterprise Systems

This guide illustrates how you can receive data from various enterprise systems, process it, and then send processed data into other systems.

Scenario

Paris becomes very busy in the rush hours. Most of the people trying to go to offices in the morning and children try to go to school. In the afternoon all those people trying to go back to their premises. The traffic becomes high in those hours. If there is a bad weather condition in those rush hours the traffic might increase up to a certain level.

Partis Transport Management Authority (Paris TMA) is accountable for handling all the traffic in Paris. They have noticed this increase in traffic due to the bad weather conditions in several areas. To reduce this traffic, they try to build a system that notifies all users if there are bad weather conditions in a particular area. Paris TMA uses a centralized database to receive notifications about bad weather conditions. Besides, they are tracking locations of all the vehicles in Paris city. If some incident happens, this system will notify all the users that are near to that area.

After completing this guide you will understand how to receive various inputs from heterogeneous systems (like NATS, RDBMS, MongoDB), process it, and send back processed outputs in various formats like Email notification.

What You'll Build

The implementation of the ParisTMA system represented in the following architecture diagram. The overall architecture can be expressed as the following subsystems.

Vehicle Locator Subsystem

The vehicle locator subsystem is responsible for receiving the location of each subscriber and update subscriber database collection with their current location. The process of this subsystem showed in step 01-03 in the architecture diagram. The location of a vehicle, input to the NATS cluster as a JSON object like below.

1
2
3
4
{
    "vehicleId": "v001",
    "currentArea": "a002"
}

The Siddhi app called ParisTMAVehicleLocator will listen to those messages that come to the NATS cluster and periodically update the Mongo database with those current locations. This Mongo database contained all the information relevant to subscribers along with their current location.

Notifier Subsystem

The process of the notifier subsystem illustrated in step 04-07 in the architecture diagram. The notifier subsystem triggers the process when our system receives weather update to the RDBMS as shown in step 04. The ParisTMANotifier Siddhi app will handle all the logical execution of this notification process. There exists a CDC source which listens to the changes of the RDBMS. When some weather notification inserts into the RDBMS, the ParisTMANotifier Siddhi app retrieves this insert event. That insert event will contain the area that this weather incident happened. Using that area, ParisTMANotifier filters out all the subscribers that are in that given area using the subscriber collection in the Mongo database. To all those subscribers ParisTMANotifier will send an email notification about this incident.

Architecture Diagram

Prerequisites

Mandatory Requirements

  1. Siddhi tooling VM/Local distribution
  2. One of the Siddhi runner distributions
    1. VM/Local Runtime
    2. Docker Image
    3. K8s Operator (commands are given in deployment section)
  3. MySQL database
  4. Mongo database
  5. NATS streaming server
  6. Java 8 or higher

Requirements needed to deploy Siddhi in Docker/Kubernetes

  1. Docker
  2. Kubernetes cluster
    1. Minikube
    2. Google Kubernetes Engine(GKE)
    3. Docker for Mac
  3. Helm
  4. Refer to this documentation about Configuring a Google Kubernetes Engine (GKE) Cluster to deploy Siddhi apps in GKE.

Implementation

As illustrated in the previous architecture diagram you will need Siddhi apps called ParisTMAVehicleLocator and ParisTMANotifier.

  1. Start the Siddhi tooling runtime and go to the editor UI in http://localhost:9390/editor
  2. Use the following steps to start the Siddhi tooling runtime. Extract the downloaded zip and navigate to /bin. (TOOLING_HOME refers to the extracted folder). Use the following command in the command prompt (Windows) / terminal (Linux/Mac).
    1
    2
    For Windows: tooling.bat
    For Linux/Mac: ./tooling.sh
    
  3. Select File -> New option, then you could either use the source view or design view to write/build the Siddhi Application. You can find the Siddhi Application bellow, that implements the requirements mentioned above.

Let’s write (develop) the Siddhi Application, as given below.

Once the Siddhi app is created, you can use the Event Simulator option in the editor to simulate events to streams and perform developer testing. The implementation of those apps is described in the following subtopics.

Siddhi Query Guide

The execution steps and the logic of the Siddhi query described as comments in the following Siddhi app. Therefore here we are not going to explain in detail here. For more details about Siddhi queries please refer Siddhi query guide.

Vehicle Locator

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@App:name("ParisTMAVehicleLocator")
@App:description("Listen to the events send by NATS about vehicle locations and periodically update the Mongo database.")

/*
Purpose:
    The ParisTMA has subscribers that subscribe to get the service. All the subscriber details are saved in the Mongo database. In that database ParisTMA stores the current location of subscriber vehicles. The events related to current locations come as TCP requests through NATS. This app listens to those events and eventually update the database with the current location of each subscriber.

Input:
        NATS event with JSON payload: 
        {
            "vehicleId": "v001",
            "currentArea": "a002"
        }

Output:
    Update the collection(subscriber) documents of MongoDB(parisTma).

*/

-- NATS source that listens to the events
@source(
    type='nats', 
    cluster.id='${STAN_CLUSTER_ID}', 
    destination='VehicleInputTunnel', 
    bootstrap.servers='${STAN_SERVER_URL}', 
    @map(type='json')
)
define stream CurrentVehicleAreaInputStream(vehicleId string, currentArea string);

-- MongoDB store
@store(type='mongodb', mongodb.uri='${MONGODB_URI}')
define table subscriber(vehicleId string, currentArea string);

-- Retrieves events from NATS source and update in MongoDB
@info(name='update-or-insert-subscriber-location') 
from CurrentVehicleAreaInputStream
update or insert into subscriber 
on subscriber.vehicleId==vehicleId;

Notifier

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@App:name("ParisTMANotifier")
@App:description("Listen to the CDC events about the weather updates and send notifications to the subscribed users.")

/*
Purpose:
    The ParisTMA has subscribers that subscribe to get the service. All the subscriber details are saved in the Mongo database. In that database ParisTMA stores the current location of subscriber vehicles. There is a separate system handle by the Paris weather forecast organization. They periodically update the MySQL database called 'paristma' with recent weather incidents. This app listens to those insertions using CDC and sends notifications to the subscribers who are in that particular area that incident happens.

Input:
    CDC insertions.

Output:
    Email to users that notifying about the incident. 
    Note: Even you can change this email notification to an SMS notification calling an SMS provider.
*/

-- CDC source that listens to insertions of the Incident table
@source(
    type='cdc',
    url='${MYSQL_URL}',
    username='siddhi_user',
    password='siddhiio',
    table.name='Incident',
    operation='insert',
    @map(
        type='keyvalue'
    )
)
define stream IncidentInputStream(incidentId int, incidentName string, incidentType string, incidentDetails string, incidentArea string);

-- MongoDB store of subscribers
@store(type='mongodb', mongodb.uri='${MONGODB_URI}')
define table subscriber(vehicleId string, currentArea string, subscriberName string, subscriberEmail string);

-- Email sink to send notifications
@sink(type='email', username="${PARIS_TMA_EMAIL}", address="${PARIS_TMA_EMAIL}", password="${PARIS_TMA_EMAIL_PASSWORD}", 
      subject="Weather Notification in {{incidentArea}}", to="{{subscriberEmail}}", host="smtp.gmail.com", port="465", 
      ssl.enable="true", auth="true", 
      @map(type='text', 
           @payload("""
            Hi {{subscriberName}}

            Recent reports state that there is {{incidentName}} in your area {{incidentArea}}. It is in {{incidentType}} state right now. 

            The report state that {{incidentDetails}}.

            Thanks, 
            Paris Transport Management Authority""")))
define stream UserNotificationStream (subscriberName string, subscriberEmail string, incidentName string, incidentType string, incidentDetails string, incidentArea string);

-- Listen to the insertions in IncidentInputStream and join it with subscriber collection in MongoDB to get user information. Send notifications using that user information.
@info(name='listen-and-notify') 
from IncidentInputStream#window.length(1) join subscriber
on IncidentInputStream.incidentArea==subscriber.currentArea
select 
    subscriber.subscriberName, 
    subscriber.subscriberEmail, 
    IncidentInputStream.incidentName, 
    IncidentInputStream.incidentType, 
    IncidentInputStream.incidentDetails, 
    IncidentInputStream.incidentArea
insert into UserNotificationStream;

Testing

Setup MongoDB

Refer this link to install MongoDB in your local machine. The ParisTMA system uses a Mongo database called parisTma and that parisTma collection contained a collection called the subscriber. In order to connect to the Mongo database, the Siddhi runtime will use a user called siddhi_user that identified by password siddhiio. You can create those MongoDB databases, collections, users and insert data using the following MongoDB commands.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
> use parisTma
> db.createCollection("subscriber")
> db.createUser(
   {    user: "siddhi_user",
      pwd: "siddhiio",
      roles:[{role: "userAdmin" , db:"parisTma"}]
   }
)

> db.subscriber.insert({
   vehicleId: 'v001',
   currentArea: 'a001',
   subscriberName: 'John Dus',
   subscriberEmail: 'b.wathsala.bw@gmail.com'
})

> db.subscriber.insert({
   vehicleId: 'v002',
   currentArea: 'a002',
   subscriberName: 'Natalie Jonson',
   subscriberEmail: 'b.wathsala.bw@gmail.com'
})

> db.subscriber.insert({
   vehicleId: 'v003',
   currentArea: 'a003',
   subscriberName: 'Mark Norman',
   subscriberEmail: 'buddhik@wso2.com'
})

Setup MySQL

After that, you have to create a MySQL database called paristma. It contained a table called Incident to store all the weather incidents happened recently. To access the MySQL database you also need to create a user called siddhi_user that identified by password siddhiio. The incident table will look like the following.

MySQL Incident Table

To connect to the MySQL database Siddhi tooling runtime needs the MySQL client library. You can download the MySQL JAR from here and then add the JAR to the ${TOOLING_HOME}/jars directory.

This example uses the Siddhi CDC in the default(listening) mode. The listening mode needs to change the configurations as described here. For example, you have to add the following configurations to the my.cnf file.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
[mysqld]
server-id         = 223344
log_bin           = mysql-bin
binlog_format     = row
binlog_row_image  = full
expire_logs_days  = 10
gtid_mode                 = on
enforce_gtid_consistency  = on
binlog_rows_query_log_events = on
log-error=/var/log/mysql/mysql.err
log-bin = /var/log/mysql/mysql-replication.log

Setup NATS Streaming Cluster

Now you need to install a NATS streaming server. For example, to install the NATS streaming cluster in MacOS use the following commands.

1
$ brew install nats-streaming-server

After the installation runs the NATS streaming cluster as below.

1
$ nats-streaming-server

The Siddhi app runtime will need the following JARs to connect to the NATS streaming server. Download those JARs using the following links and add those JARs to ${TOOLING_HOME}/jars directory.

It also needs the Protobuf bundle. Download the Protobuf bundle using the following link and add the bundle to ${TOOLING_HOME}/bundles directory.

Configurations

The above Siddhi apps receive the MySQL, MongoDB, and NATS connections details as environment variables. For local deployments for testing these apps, you have to set up those environmental variables as follows.

  1. STAN_CLUSTER_ID=test-cluster
  2. STAN_SERVER_URL=nats://localhost:4222
  3. MONGODB_URI=mongodb://127.0.0.1:27017/parisTma?&gssapiServiceName=mongodb
  4. MYSQL_URL=jdbc:mysql://127.0.0.1:3306/paristma
  5. PARIS_TMA_EMAIL=<TEST_EMAIL>
  6. PARIS_TMA_EMAIL_PASSWORD=<TEST_EMAIL_PASSWORD>

Tryout

After all those configurations and setups you will be able to starts the both Siddhi apps without any error. Testing the app can be done in two levels.

  1. Update the vehicle location periodically
  2. Send notifications

To test the vehicle location updates you have to send a request to the NATS streaming server. You can send the request using various NATS clients. In this use case, you can use NATS streaming Go language client to easily send requests. To do that you need to have to install Golang in your machine. Or else you can use other clients like (Java, JS, Python, or Ruby etc). You can use following Golang commands to send a request to the NATS streaming cluster.

1
2
$ go get github.com/nats-io/stan.go/
$ go run $GOPATH/src/github.com/nats-io/stan.go/examples/stan-pub/main.go -s localhost:4222 -c test-cluster VehicleInputTunnel "{\"vehicleId\": \"v001\", \"currentArea\":\"a003\"}"

To run the notification Siddhi app you just need to insert data entry to the Incident table. When you do that it the Siddhi app will simply send an email to the subscribers like below.

Email

Deployment

Deploy on VM/ Bare Metal

  1. First, you have to set up prerequisites as described in the previous sections.
    1. Setup MongoDB
    2. Setup MySQL
    3. Setup NATS Streaming
  2. Download the Siddhi runner distribution pack from here and unzip it.
  3. Download relevant JARs and add those JARs to ${RUNNER_HOME}/jars directory.
    1. MySQL
    2. Java NATS streaming
    3. JNATS
  4. Download Protobuf bundle and add the bundle to ${RUNNER_HOME}/bundles directory.
    1. Protobuf
  5. Copy your Siddhi file into <RUNNER_HOME>/wso2/runner/deployment/siddhi-files
  6. Start the following binary file to run the Siddhi runner server.
    1
    2
    For Linux/Mac: <RUNNER_HOME>/bin/runner.sh
    For Windows: <RUNNER_HOME>/bin/runner.bat
    
  7. Now you can try out the sample as described in this tryout section.

Deploy on Docker

Prerequisite

Install Docker in your machine.

Siddhi Docker Configurations

In the tooling editor itself, you can export your Siddhi app into a runnable docker artifact. You can go to Export->For Docker and it will give to a zip file that contained the following files.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
├── Dockerfile
├── bundles
│   └── protobuf-java-3.6.1.jar
├── configurations.yaml
├── jars
│   ├── java_nats_streaming_2.1.2.jar
│   ├── jnats_2.3.0.jar
│   └── mysql-connector-java-5.1.45-bin.jar
└── siddhi-files
    ├── ParisTMANotifier.siddhi
    └── ParisTMAVehicleLocator.siddhi

When you export the apps as a Docker, it will ask for the templated values to be filled. You have to use the following values for those templated values.

  1. STAN_CLUSTER_ID=test-cluster
  2. STAN_SERVER_URL=nats://nats:4222
  3. MONGODB_URI=mongodb://mongodb:27017/parisTma?&gssapiServiceName=mongodb
  4. MYSQL_URL=jdbc:mysql://mysqldb:3306/paristma
  5. PARIS_TMA_EMAIL=<TEST_EMAIL>
  6. PARIS_TMA_EMAIL_PASSWORD=<TEST_EMAIL_PASSWORD>

In order to change the configurations of MySQL Docker container you have create my.cnf file in your current directory and add the following content to that file. Also create directory called mysql.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
[mysqld]
server-id         = 223344
log_bin           = mysql-bin
binlog_format     = row
binlog_row_image  = full
expire_logs_days  = 10
gtid_mode                 = on
enforce_gtid_consistency  = on
binlog_rows_query_log_events = on
log-error=/var/log/mysql/mysql.err
log-bin = /var/log/mysql/mysql-replication.log

Now you need to have a Docker compose file like below to set up all the prerequisites. This compose file contains volume mounts to change configurations of the MySQL container.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
version: "3"
services:
    backend:
        container_name: paristma-notifier
        user: 802:802
        build:
          context: .
          dockerfile: ./Dockerfile
        links:
            - mysqldb
            - mongodb
            - nats
        networks:
            - default
        restart: on-failure

    mysqldb:
        image: 'mysql:5.7'
        container_name: paristma-mysqldb
        environment:
            MYSQL_USER: siddhi_user
            MYSQL_PASSWORD: siddhiio
            MYSQL_ROOT_PASSWORD: siddhiio
        ports:
            - "3304:3306"
        networks:
            - default
        restart: on-failure
        volumes:
            - ./my.cnf:/etc/my.cnf
            - ./mysql:/var/log/mysql
        command: --sql_mode=""

    mongodb:
        image: 'mongo:4.0.4'
        container_name: paristma-mongodb
        ports:
            - "27017-27019:27017-27019"
        networks:
            - default
        restart: on-failure

    nats:
        image: 'nats-streaming:0.16.2-linux'
        container_name: paristma-nats
        ports:
            - "4223:4223"
            - "8223:8223"
        networks:
            - default
        restart: on-failure

Now you can start each containers.

  1. First, you need to build the Docker composer.
    1
    docker-compose build
    
  2. Starts the MySQL container using the following command and set up the MySQL database as described above.
    1
    docker-compose up -d mysqldb
    
  3. Then, starts the MongoDB container using the following command and set up the MongoDB database as described above.
    1
    docker-compose up -d mongodb
    
  4. After that, starts the NATS container using the following command.
    1
    docker-compose up -d nats
    
  5. Finally, start the Siddhi backend runtime.
    1
    docker-compose up -d backend
    
    When you insert data entry to the Incident table, the Siddhi app will simply send an email to the subscribers like below.

Email

Deploy on Kubernetes

Prerequisites

  1. Kubernetes cluster
    1. Minikube
    2. Google Kubernetes Engine(GKE)
    3. Docker for Mac
  2. Install HELM

Siddhi Kubernetes Configurations

In the tooling editor itself, you can export your Siddhi app into a runnable Kubernetes artifact. You can go to Export->For Kubernetes and it will give to a zip file that contained the following files.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
├── Dockerfile
├── bundles
│   └── protobuf-java-3.6.1.jar
├── configurations.yaml
├── jars
│   ├── java_nats_streaming_2.1.2.jar
│   ├── jnats_2.3.0.jar
│   └── mysql-connector-java-5.1.45-bin.jar
├── siddhi-files
│   ├── ParisTMANotifier.siddhi
│   └── ParisTMAVehicleLocator.siddhi
└── siddhi-process.yaml

When you export the apps as a Kubernetes, it will ask for the templated values to be filled. You have to use the following values for those templated values.

  1. STAN_CLUSTER_ID=siddhi-stan
  2. STAN_SERVER_URL=nats://siddhi-nats:4222
  3. MONGODB_URI=mongodb://siddhi_user:siddhiio@paristma-mongodb:27017/parisTma
  4. MYSQL_URL=jdbc:mysql://mysqldb:3306/paristma
  5. PARIS_TMA_EMAIL=<TEST_EMAIL>
  6. PARIS_TMA_EMAIL_PASSWORD=<TEST_EMAIL_PASSWORD>

Setup MongoDB

Let’s install MongoDB in your Kubernetes cluster using Helm.

1
2
$ helm install stable/mongodb
$ helm install --name paristma --set volumePermissions.enabled=true,mongodbRootPassword=siddhiio,mongodbUsername=siddhi_user,mongodbPassword=siddhiio,mongodbDatabase=parisTma stable/mongodb 

Now you can access the MongoDB externally using following commands and set up the Mongo database as described previously.

1
2
3
$ export MONGODB_ROOT_PASSWORD=$(kubectl get secret --namespace default paristma-mongodb -o jsonpath="{.data.mongodb-root-password}" | base64 --decode)

$ kubectl run --namespace default paristma-mongodb-client --rm --tty -i --restart='Never' --image bitnami/mongodb --command -- mongo admin --host paristma-mongodb --authenticationDatabase admin -u root -p $MONGODB_ROOT_PASSWORD

Setup MySQL

In order to change the MySQL configurations for the Siddhi CDC extension, you need to create a file called values.yaml with including the following configurations.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
mysqlRootPassword: siddhiio
mysqlUser: siddhi_user
mysqlPassword: siddhiio
mysqlDatabase: paristma
configurationFiles:
    my.cnf: |-
        [mysqld]
        server-id         = 223344
        log_bin           = mysql-bin
        binlog_format     = row
        binlog_row_image  = full
        expire_logs_days  = 10
        gtid_mode                 = on
        enforce_gtid_consistency  = on
        binlog_rows_query_log_events = on
        log-error=/var/log/mysql/mysql.err
        log-bin = /var/log/mysql/mysql-replication.log

Now you can install MySQL in you Kubernetes cluster using Helm.

1
$ helm install --name mysqldb -f values.yaml stable/mysql

Use Kubernetes port forwading to access MySQL externally using 3307 port and create nessasary tables in the MySQL database as described previously.

1
$ kubectl port-forward svc/mysql-db 3307:3306

Setup NATS Streaming

Use the following kubectl commands to set up the NATS streaming cluster.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
$ kubectl apply -f https://github.com/nats-io/nats-operator/releases/download/v0.5.0/00-prereqs.yaml
$ kubectl apply -f https://github.com/nats-io/nats-operator/releases/download/v0.5.0/10-deployment.yaml
$ kubectl apply -f https://github.com/nats-io/nats-streaming-operator/releases/download/v0.2.2/default-rbac.yaml
$ kubectl apply -f https://github.com/nats-io/nats-streaming-operator/releases/download/v0.2.2/deployment.yaml

$ echo '
---
apiVersion: "nats.io/v1alpha2"
kind: "NatsCluster"
metadata:
  name: "siddhi-nats"
spec:
  size: 1
' | kubectl apply -f -

$ echo '
---
apiVersion: "streaming.nats.io/v1alpha1"
kind: "NatsStreamingCluster"
metadata:
  name: "siddhi-stan"
spec:
  size: 1
  natsSvc: "siddhi-nats"
' | kubectl apply -f -

For more details about the NATS streaming server, refer to this documentation.

Setup Siddhi Operator

Use the following kubectl commands to install the Siddhi operator in your Kubernetes cluster.

1
2
$ kubectl apply -f https://github.com/siddhi-io/siddhi-operator/releases/download/v0.2.0/00-prereqs.yaml
$ kubectl apply -f https://github.com/siddhi-io/siddhi-operator/releases/download/v0.2.0/01-siddhi-operator.yaml

Now you need to create your own docker image with including all the custom libraries and configuration changes that you have made. Use following command to build and push the docker image with the tag <DOCKER_HUB_USER_NAME>/siddhi-runner-alpine:latest.

1
2
$ docker build -t <DOCKER_HUB_USER_NAME>/siddhi-runner-alpine:latest .
$ docker push <DOCKER_HUB_USER_NAME>/siddhi-runner-alpine:latest
After the Kubernetes export now you already have this siddhi-process.yaml file.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
apiVersion: siddhi.io/v1alpha2
kind: SiddhiProcess
metadata:
  name: paris-tma

spec:
  apps:
  - script: |
      @App:name("ParisTMANotifier")
      @App:description("Listen to the CDC events about the weather updates and send notifications to the subscribed users.")

      /*
      Purpose:
          The ParisTMA has subscribers that subscribe to get the service. All the subscriber details are saved in the Mongo database. In that database ParisTMA stores the current location of subscriber vehicles. There is a separate system handle by the Paris weather forecast organization. They periodically update the MySQL database called 'paristma' with recent weather incidents. This app listens to those insertions using CDC and sends notifications to the subscribers who are in that particular area that incident happens.

      Input:
          CDC insertions.

      Output:
          Email to users that notifying about the incident.
          Note: Even you can change this email notification to an SMS notification calling an SMS provider.
      */

      -- CDC source that listens to insertions of the Incident table
      @source(
          type='cdc',
          url='${MYSQL_URL}',
          username='siddhi_user',
          password='siddhiio',
          table.name='Incident',
          operation='insert',
          @map(
              type='keyvalue'
          )
      )
      define stream IncidentInputStream(incidentId int, incidentName string, incidentType string, incidentDetails string, incidentArea string);

      -- MongoDB store of subscribers
      @store(type='mongodb', mongodb.uri='${MONGODB_URI}')
      define table subscriber(vehicleId string, currentArea string, subscriberName string, subscriberEmail string);

      -- Email sink to send notifications
      @sink(type='email', username="${PARIS_TMA_EMAIL}", address="${PARIS_TMA_EMAIL}", password="${PARIS_TMA_EMAIL_PASSWORD}",
            subject="Weather Notification in {{incidentArea}}", to="{{subscriberEmail}}", host="smtp.gmail.com", port="465",
            ssl.enable="true", auth="true",
            @map(type='text',
                 @payload("""
                  Hi {{subscriberName}}

                  Recent reports state that there is {{incidentName}} in your area {{incidentArea}}. It is in {{incidentType}} state right now.

                  The report state that {{incidentDetails}}.

                  Thanks,
                  Paris Transport Management Authority""")))
      define stream UserNotificationStream (subscriberName string, subscriberEmail string, incidentName string, incidentType string, incidentDetails string, incidentArea string);

      -- Listen to the insertions in IncidentInputStream and join it with subscriber collection in MongoDB to get user information. Send notifications using that user information.
      @info(name='listen-and-notify')
      from IncidentInputStream#window.length(1) join subscriber
      on IncidentInputStream.incidentArea==subscriber.currentArea
      select
          subscriber.subscriberName,
          subscriber.subscriberEmail,
          IncidentInputStream.incidentName,
          IncidentInputStream.incidentType,
          IncidentInputStream.incidentDetails,
          IncidentInputStream.incidentArea
      insert into UserNotificationStream;
  - script: |-
      @App:name("ParisTMAVehicleLocator")
      @App:description("Listen to the events send by NATS about vehicle locations and periodically update the Mongo database.")

      /*
      Purpose:
          The ParisTMA has subscribers that subscribe to get the service. All the subscriber details are saved in the Mongo database. In that database ParisTMA stores the current location of subscriber vehicles. The events related to current locations come as TCP requests through NATS. This app listens to those events and eventually update the database with the current location of each subscriber.

      Input:
              NATS event with JSON payload:
              {
                  "vehicleId": "v001",
                  "currentArea": "a002"
              }

      Output:
          Update the collection(subscriber) documents of MongoDB(parisTma).

      */

      -- NATS source that listens to the events
      @source(
          type='nats',
          cluster.id='${STAN_CLUSTER_ID}',
          destination='VehicleInputTunnel',
          bootstrap.servers='${STAN_SERVER_URL}',
          @map(type='json')
      )
      define stream CurrentVehicleAreaInputStream(vehicleId string, currentArea string);

      -- MongoDB store
      @store(type='mongodb', mongodb.uri='${MONGODB_URI}')
      define table subscriber(vehicleId string, currentArea string);

      -- Retrieves events from NATS source and update in MongoDB
      @info(name='update-or-insert-subscriber-location')
      from CurrentVehicleAreaInputStream
      update or insert into subscriber
      on subscriber.vehicleId==vehicleId;
  container:
    env:
    - name: MYSQL_URL
      value: jdbc:mysql://mysqldb:3306/paristma
    - name: MONGODB_URI
      value: mongodb://siddhi_user:siddhiio@paristma-mongodb:27017/parisTma
    - name: PARIS_TMA_EMAIL
      value: <TEST_EMAIL>
    - name: PARIS_TMA_EMAIL_PASSWORD
      value: <TEST_EMAIL_PASSWORD>
    - name: STAN_CLUSTER_ID
      value: siddhi-stan
    - name: STAN_SERVER_URL
      value: nats://siddhi-nats:4222
    image: <DOCKER_HUB_USER_NAME>/siddhi-runner-alpine:latest
  runner: |
    wso2.carbon:
      id: siddhi-runner
      name: Siddhi Runner Distribution
      ports:
        offset: 0
    transports:
      http:
        listenerConfigurations:
        - id: default
          host: 0.0.0.0
          port: 9090
        - id: msf4j-https
          host: 0.0.0.0
          port: 9443
          scheme: https
          keyStoreFile: ${carbon.home}/resources/security/wso2carbon.jks
          keyStorePassword: wso2carbon
          certPass: wso2carbon
        transportProperties:
        - name: server.bootstrap.socket.timeout
          value: 60
        - name: client.bootstrap.socket.timeout
          value: 60
        - name: latency.metrics.enabled
          value: true
    dataSources:
    - name: WSO2_CARBON_DB
      description: The datasource used for registry and user manager
      definition:
        type: RDBMS
        configuration:
          jdbcUrl: jdbc:h2:${sys:carbon.home}/wso2/${sys:wso2.runtime}/database/WSO2_CARBON_DB;DB_CLOSE_ON_EXIT=FALSE;LOCK_TIMEOUT=60000
          username: wso2carbon
          password: wso2carbon
          driverClassName: org.h2.Driver
          maxPoolSize: 10
          idleTimeout: 60000
          connectionTestQuery: SELECT 1
          validationTimeout: 30000
          isAutoCommit: false

Now you can install the SiddhiProcess using following kubectl command. Before you install the SiddhiProcess you have to add the docker image tag in the siddhi-process.yaml file. You have to add the docker image name(/siddhi-runner-alpine:latest) in the YAML entry spec.container.image.

1
$ kubectl apply -f siddhi-process.yaml

If all the Kubernetes artifacts deployed correctly, it will show the status like below.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
$ kubectl get deploy

NAME                      READY     UP-TO-DATE   AVAILABLE   AGE
bold-boxer-mongodb        1/1       1            1           2h
mysqldb                   1/1       1            1           2h
nats-operator             1/1       1            1           2h
nats-streaming-operator   1/1       1            1           2h
paris-tma-0               1/1       1            1           2h
paris-tma-1               1/1       1            1           2h
paristma-mongodb          1/1       1            1           2h
siddhi-operator           1/1       1            1           2h
tiller-deploy             1/1       1            1           2h

$ kubectl get svc

NAME                 TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                      AGE
bold-boxer-mongodb   ClusterIP   10.100.153.173   <none>        27017/TCP                    2h
kubernetes           ClusterIP   10.96.0.1        <none>        443/TCP                      21d
mysqldb              ClusterIP   10.106.112.140   <none>        3306/TCP                     2h
paristma-mongodb     ClusterIP   10.102.211.214   <none>        27017/TCP                    2h
siddhi-nats          ClusterIP   10.110.123.222   <none>        4222/TCP                     2h
siddhi-nats-mgmt     ClusterIP   None             <none>        6222/TCP,8222/TCP,7777/TCP   2h
siddhi-operator      ClusterIP   10.102.157.184   <none>        8383/TCP                     2h
tiller-deploy        ClusterIP   10.103.160.179   <none>        44134/TCP                    2h
$ kubectl get siddhi
NAME        STATUS    READY     AGE
paris-tma   Running   2/2       2h

When you insert data entry to the Incident table, the Siddhi app will simply send an email to the subscribers like below.

Email

Top