Siddhi 5.1 as a Kubernetes Microservice¶
This section provides information on running Siddhi Apps natively in Kubernetes via Siddhi Kubernetes Operator.
Siddhi can be configured using SiddhiProcess
kind and passed to the Siddhi operator for deployment.
Here, the Siddhi applications containing stream processing logic can be written inline in SiddhiProcess
yaml or passed as .siddhi
files via contig maps. SiddhiProcess
yaml can also be configured with the necessary system configurations. For more details about how to configure SiddhiProcess
YAML, refer to this configuration guide which describe the usage of all the YAML specifications.
Prerequisites¶
-
A Kubernetes cluster v1.10.11 or higher.
- Minikube
- Google Kubernetes Engine(GKE) Cluster
- Docker for Mac
- Or any other Kubernetes cluster
- Distributed deployment of Siddhi apps need NATS operator v0.5.0+ and NATS streaming operator v0.2.2+.
- Note that if your Kubernetes version is v1.16 or higher, then use the NATS streaming operator v0.3.0+ versions. If your Kubernetes version is less than v1.16, then you have to use NATS streaming operator v0.2.2 version. The reason for this version incompatibility is Kubernetes v1.16 was removed the apps/v1beta2 API group.
- Admin privileges to install Siddhi operator
Minikube
Siddhi operator automatically creates NGINX ingress. Therefore it to work we can either enable ingress on Minikube using the following command.
minikube addons enable ingressor disable Siddhi operator's automatically ingress creation.
Google Kubernetes Engine (GKE) Cluster
To install Siddhi operator, you have to give cluster admin permission to your account. In order to do that execute the following command (by replacing "your-address@email.com" with your account email address).
kubectl create clusterrolebinding user-cluster-admin-binding --clusterrole=cluster-admin --user=your-address@email.com
Docker for Mac
Siddhi operator automatically creates NGINX ingress. Therefore to enable ingress creation on Docker for Mac apply the prerequisites and Docker for Mac specific commands following the official documentation.
Users can also disable Siddhi Operator automatically creating ingress by following the documentation.
Port Forwarding for Testing & Debugging Purposes
Instead of creating ingress you can enable port forwarding (kubectl port-forward
) to access the application in the Kubernetes cluster. This will help a lot for TCP connections as well.
kubectl port-forward svc/mysql-db 13306:3306For more details please refer this Kubernetes official documentation
Install Siddhi Operator¶
To install the Siddhi Kubernetes operator run the following commands.
kubectl apply -f https://github.com/siddhi-io/siddhi-operator/releases/download/v0.2.2/00-prereqs.yaml
kubectl apply -f https://github.com/siddhi-io/siddhi-operator/releases/download/v0.2.2/01-siddhi-operator.yaml
You can verify the installation by making sure the following deployments are running in your Kubernetes cluster.
$ kubectl get deployment
NAME DESIRED CURRENT UP-TO-DATE AVAILABLE AGE
siddhi-operator 1 1 1 1 1m
Using a custom-built Siddhi runner image
If you need to use a custom-built siddhi-runner
image for all the SiddhiProcess
deployments, you have to configure siddhiRunnerImage
entry in siddhi-operator-config
config map.
Refer the documentation on creating custom Siddhi runner images bundling additional JARs here.
If you are pulling the custom-built image from a private Docker registry/repository, specify the corresponding kubernetes secret as siddhiRunnerImageSecret
entry in siddhi-operator-config
config map. For more details on using docker images from private registries/repositories refer this documentation.
Deploy and run Siddhi App¶
Siddhi applications can be deployed on Kubernetes using the Siddhi operator.
Here we will be creating a very simple Siddhi stream processing application that receives power consumption from several devices in a house. If the power consumption of dryer exceeds the consumption limit of 6000W then that Siddhi app sends an alert from printing a log.
This can be created using a SiddhiProcess YAML file as given below.
apiVersion: siddhi.io/v1alpha2
kind: SiddhiProcess
metadata:
name: power-surge-app
spec:
apps:
- script: |
@App:name("PowerSurgeDetection")
@App:description("App consume events from HTTP as a JSON message of { 'deviceType': 'dryer', 'power': 6000 } format and inserts the events into DevicePowerStream, and alerts the user if the power level is greater than or equal to 600 by printing a message in the log.")
/*
Input: deviceType string and powerConsuption int(Watt)
Output: Alert user from printing a log, if there is a power surge in the dryer. In other words, notify when power is greater than or equal to 600W.
*/
@source(
type='http',
receiver.url='${RECEIVER_URL}',
basic.auth.enabled='false',
@map(type='json')
)
define stream DevicePowerStream(deviceType string, power int);
@sink(type='log', prefix='LOGGER')
define stream PowerSurgeAlertStream(deviceType string, power int);
@info(name='power-filter')
from DevicePowerStream[deviceType == 'dryer' and power >= 600]
select deviceType, power
insert into PowerSurgeAlertStream;
container:
env:
-
name: RECEIVER_URL
value: "http://0.0.0.0:8080/checkPower"
image: "siddhiio/siddhi-runner-ubuntu:5.1.0"
Always listen on 0.0.0.0 with the Siddhi Application running inside a container environment.
If you listen on localhost inside the container, nothing outside the container can connect to your application.
Siddhi Tooling
You can also use the powerful Siddhi Editor to implement and test steam processing applications.
Configuring Siddhi
To configure databases, extensions, authentication, periodic state persistence, and statistics for Siddhi as Kubernetes Microservice refer Siddhi Config Guide.
To deploy the above Siddhi app in your Kubernetes cluster, copy above YAML to a file with name power-surge-app.yaml
and execute the following command.
kubectl create -f <absolute-yaml-file-path>/power-surge-app.yaml
TLS secret
Within the SiddhiProcess, a TLS secret named siddhi-tls
is configured. If a Kubernetes secret with the same name does not exist in the Kubernetes cluster, the NGINX will ignore it and use a self-generated certificate. Configuring a secret will be necessary for calling HTTPS endpoints, refer deploy and run Siddhi apps with HTTPS section for more details.
If the power-surge-app
is deployed successfully, it should create SiddhiProcess, deployment, service, and ingress as following.
$ kubectl get SiddhiProcesses
NAME STATUS READY AGE
power-surge-app Running 1/1 2m
$ kubectl get deployment
NAME READY UP-TO-DATE AVAILABLE AGE
power-surge-app-0 1/1 1 1 2m
siddhi-operator 1/1 1 1 2m
$ kubectl get service
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
kubernetes ClusterIP 10.96.0.1 <none> 443/TCP 2d
power-surge-app-0 ClusterIP 10.96.44.182 <none> 8080/TCP 2m
siddhi-operator ClusterIP 10.98.78.238 <none> 8383/TCP 2m
$ kubectl get ingress
NAME HOSTS ADDRESS PORTS AGE
siddhi siddhi 10.0.2.15 80 2m
Invoke Siddhi Applications
To invoke the Siddhi App, obtain the external IP of the ingress load balancer using kubectl get ingress
command as following.
$ kubectl get ingress
NAME HOSTS ADDRESS PORTS AGE
siddhi siddhi 10.0.2.15 80 2m
Then, add the host siddhi
and related external IP (ADDRESS
) to the /etc/hosts
file in your machine.
Minikube
For Minikube, you have to use Minikube IP as the external IP. Hence, run minikube ip
command to get the IP of the Minikube cluster.
Docker for Mac
For Docker for Mac, you have to use 0.0.0.0
as the external IP.
Use the following CURL command to send events to power-surge-app
deployed in Kubernetes.
curl -X POST \
http://siddhi/power-surge-app-0/8080/checkPower \
-H 'Accept: */*' \
-H 'Content-Type: application/json' \
-H 'Host: siddhi' \
-d '{
"deviceType": "dryer",
"power": 60000
}'
View Siddhi Process Logs
Since the output of power-surge-app
is logged, you can see the output by monitoring the associated pod's logs.
To find the power-surge-app
pod use the kubectl get pods
command. This will list down all the deployed pods.
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
power-surge-app-0-646c4f9dd5-rxzkq 1/1 Running 0 4m
siddhi-operator-6698d8f69d-6rfb6 1/1 Running 0 4m
Here, the pod starting with the SiddhiProcess name (in this case power-surge-app-
) is the pod we need to monitor.
To view the logs, run the kubectl logs <pod name>
command.
This will show all the Siddhi process logs, along with the filtered output events as given below.
$ kubectl logs power-surge-app-0-646c4f9dd5-rxzkq
...
[2019-07-12 07:12:48,925] INFO {org.wso2.transport.http.netty.contractimpl.listener.ServerConnectorBootstrap$HttpServerConnector} - HTTP(S) Interface starting on host 0.0.0.0 and port 9443
[2019-07-12 07:12:48,927] INFO {org.wso2.transport.http.netty.contractimpl.listener.ServerConnectorBootstrap$HttpServerConnector} - HTTP(S) Interface starting on host 0.0.0.0 and port 9090
[2019-07-12 07:12:48,941] INFO {org.wso2.carbon.kernel.internal.CarbonStartupHandler} - Siddhi Runner Distribution started in 6.853 sec
[2019-07-12 07:17:22,219] INFO {io.siddhi.core.stream.output.sink.LogSink} - LOGGER : Event{timestamp=1562915842182, data=[dryer, 60000], isExpired=false}
Get Siddhi process status¶
List Siddhi processes¶
List the Siddhi process using the kubectl get sps
or kubectl get SiddhiProcesses
commands as follows.
$ kubectl get sps
NAME STATUS READY AGE
power-surge-app Running 1/1 5m
$ kubectl get SiddhiProcesses
NAME STATUS READY AGE
power-surge-app Running 1/1 5m
View Siddhi process configs¶
Describe the Siddhi process configuration details using kubectl describe sp
command as follows.
$ kubectl describe sp power-surge-app
Name: power-surge-app
Namespace: default
Labels: <none>
Annotations: kubectl.kubernetes.io/last-applied-configuration={"apiVersion":"siddhi.io/v1alpha2","kind":"SiddhiProcess","metadata":{"annotations":{},"name":"power-surge-app","namespace":"default"},"spec":{"apps":[...
API Version: siddhi.io/v1alpha2
Kind: SiddhiProcess
Metadata:
Creation Timestamp: 2019-07-12T07:12:35Z
Generation: 1
Resource Version: 148205
Self Link: /apis/siddhi.io/v1alpha2/namespaces/default/siddhiprocesses/power-surge-app
UID: 6c6d90a4-a474-11e9-a05b-080027f4eb25
Spec:
Apps:
Script: @App:name("PowerSurgeDetection")
@App:description("App consume events from HTTP as a JSON message of { 'deviceType': 'dryer', 'power': 6000 } format and inserts the events into DevicePowerStream, and alerts the user if the power level is greater than or equal to 600 by printing a message in the log.")
/*
Input: deviceType string and powerConsuption int(Watt)
Output: Alert user from printing a log, if there is a power surge in the dryer. In other words, notify when power is greater than or equal to 600W.
*/
@source(
type='http',
receiver.url='${RECEIVER_URL}',
basic.auth.enabled='false',
@map(type='json')
)
define stream DevicePowerStream(deviceType string, power int);
@sink(type='log', prefix='LOGGER')
define stream PowerSurgeAlertStream(deviceType string, power int);
@info(name='power-filter')
from DevicePowerStream[deviceType == 'dryer' and power >= 600]
select deviceType, power
insert into PowerSurgeAlertStream;
Container:
Env:
Name: RECEIVER_URL
Value: http://0.0.0.0:8080/checkPower
Name: BASIC_AUTH_ENABLED
Value: false
Image: siddhiio/siddhi-runner-ubuntu:5.1.0
Status:
Nodes: <nil>
Ready: 1/1
Status: Running
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal DeploymentCreated 11m siddhiprocess-controller power-surge-app-0 deployment created successfully
Normal ServiceCreated 11m siddhiprocess-controller power-surge-app-0 service created successfully
View Siddhi process logs¶
To view the Siddhi process logs, first get the Siddhi process pods using the kubectl get pods
command as follows.
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
power-surge-app-0-646c4f9dd5-rxzkq 1/1 Running 0 4m
siddhi-operator-6698d8f69d-6rfb6 1/1 Running 0 4m
Then to retrieve the Siddhi process logs, run kubectl logs <pod name>
command. Here <pod name>
should be replaced with the name of the pod that starts with the relevant SiddhiProcess's name.
A sample output logs are of this command is as follows.
$ kubectl logs power-surge-app-0-646c4f9dd5-rxzkq
...
[2019-07-12 07:12:48,925] INFO {org.wso2.transport.http.netty.contractimpl.listener.ServerConnectorBootstrap$HttpServerConnector} - HTTP(S) Interface starting on host 0.0.0.0 and port 9443
[2019-07-12 07:12:48,927] INFO {org.wso2.transport.http.netty.contractimpl.listener.ServerConnectorBootstrap$HttpServerConnector} - HTTP(S) Interface starting on host 0.0.0.0 and port 9090
[2019-07-12 07:12:48,941] INFO {org.wso2.carbon.kernel.internal.CarbonStartupHandler} - Siddhi Runner Distribution started in 6.853 sec
[2019-07-12 07:17:22,219] INFO {io.siddhi.core.stream.output.sink.LogSink} - LOGGER : Event{timestamp=1562915842182, data=[dryer, 60000], isExpired=false}
Change the Default Configurations of Siddhi Runner¶
Siddhi runner use <SIDDHI_RUNNER_HOME>/conf/runner/deployment.yaml
file as the default configuration file. In the deployment.yaml
the file you can configure data sources that you planned to use, add refs, and enable state persistence, etc. To change the configurations of the deployment.yaml
, you can add runner
YAML spec like below to your SiddhiProcess YAML file. For example, the following config change will enable file system state persistence.
runner: |
statePersistence:
enabled: true
intervalInMin: 1
revisionsToKeep: 2
persistenceStore: io.siddhi.distribution.core.persistence.FileSystemPersistenceStore
config:
location: siddhi-app-persistence
Using a custom-built Siddhi runner image¶
You can change the custom built siddhi-runner
image in two ways.
- Change the image to all the SiddhiProcess deployments.
- Change the image only for a particular SiddhiProcess deployment.
To change the siddhi-runner
image for all the SiddhiProcess deployments you can use the siddhi-operator-config
config map. You can update siddhiImage
to change the image and if you are using a private docker registry/repository, then you can create a Kubernetes secret that contains credentials to the registry and specify that secret name siddhiImageSecret
spec. Refer the documentation on creating custom Siddhi runner images bundling additional JARs here. For more details on using docker images from private registries/repositories refer this documentation.
apiVersion: v1
kind: ConfigMap
metadata:
name: siddhi-operator-config
data:
siddhiHome: /home/siddhi_user/siddhi-runner/
siddhiProfile: runner
siddhiImage: <YOUR-CUSTOM-IMAGE>
autoIngressCreation: "true"
siddhiImageSecret: <SECRET>
If you need to use a custom-built siddhi-runner
image for a specific SiddhiProcess
deployment, you have to configure container.image
spec in the SiddhiProcess
. If you are pulling the custom-built image from a private Docker registry/repository, specify the corresponding Kubernetes secret as imagePullSecret
argument in the SiddhiProcess
YAML file.
container:
image: <YOUR-CUSTOM-IMAGE>
imagePullSecret: <SECRET>
Deploy and run Siddhi App using config maps¶
Siddhi operator allows you to deploy Siddhi app configurations via config maps instead of just adding them inline. Through this, you can also run multiple Siddhi Apps in a single SiddhiProcess.
This can be done by passing the config maps containing Siddhi app files to the SiddhiProcess's apps configuration as follows.
apps:
- configMap: power-surge-cm1
- configMap: power-surge-cm2
Sample on deploying and running Siddhi Apps via config maps
Here we will be creating a very simple Siddhi stream processing application that receives power consumption from several devices in a house. If the power consumption of dryer exceeds the consumption limit of 6000W then that Siddhi app sends an alert from printing a log.
@App:name("PowerSurgeDetection")
@App:description("App consume events from HTTP as a JSON message of { 'deviceType': 'dryer', 'power': 6000 } format and inserts the events into DevicePowerStream, and alerts the user if the power level is greater than or equal to 600 by printing a message in the log.")
/*
Input: deviceType string and powerConsuption int(Watt)
Output: Alert user from printing a log, if there is a power surge in the dryer. In other words, notify when power is greater than or equal to 600W.
*/
@source(
type='http',
receiver.url='${RECEIVER_URL}',
basic.auth.enabled='false',
@map(type='json')
)
define stream DevicePowerStream(deviceType string, power int);
@sink(type='log', prefix='LOGGER')
define stream PowerSurgeAlertStream(deviceType string, power int);
@info(name='power-filter')
from DevicePowerStream[deviceType == 'dryer' and power >= 600]
select deviceType, power
insert into PowerSurgeAlertStream;
Siddhi Tooling
You can also use the powerful Siddhi Editor to implement and test steam processing applications.
Save the above Siddhi App file as PowerSurgeDetection.siddhi
, and use this file to create a Kubernetes config map with the name power-surge-cm
.
This can be achieved by running the following command.
kubectl create configmap power-surge-cm --from-file=<absolute-file-path>/PowerSurgeDetection.siddhi
The created config map can be added to SiddhiProcess YAML under the apps
entry as follows.
apiVersion: siddhi.io/v1alpha2
kind: SiddhiProcess
metadata:
name: power-surge-app
spec:
apps:
- configMap: power-surge-cm
container:
env:
-
name: RECEIVER_URL
value: "http://0.0.0.0:8080/checkPower"
image: "siddhiio/siddhi-runner-ubuntu:5.1.0"
Save the YAML file as power-surge-app.yaml
, and use the following command to deploy the SiddhiProcess.
kubectl create -f <absolute-yaml-file-path>/power-surge-app.yaml
Using a config, created from a directory containing multiple Siddhi files
SiddhiProcess's apps.configMap
configuration also supports a config map that is created from a directory containing multiple Siddhi files.
Use kubectl create configmap siddhi-apps --from-file=<DIRECTORY_PATH>
command to create a config map from a directory.
Invoke Siddhi Applications
To invoke the Siddhi App, first obtain the external IP of the ingress load balancer using kubectl get ingress
command as follows.
$ kubectl get ingress
NAME HOSTS ADDRESS PORTS AGE
siddhi siddhi 10.0.2.15 80 2m
Then, add the host siddhi
and related external IP (ADDRESS
) to the /etc/hosts
file in your machine.
Minikube
For Minikube, you have to use Minikube IP as the external IP. Hence, run minikube ip
command to get the IP of the Minikube cluster.
Use the following CURL command to send events to power-surge-app
deployed in Kubernetes.
curl -X POST \
http://siddhi/power-surge-app-0/8080/checkPower \
-H 'Accept: */*' \
-H 'Content-Type: application/json' \
-H 'Host: siddhi' \
-H 'cache-control: no-cache' \
-d '{
"deviceType": "dryer",
"power": 60000
}'
View Siddhi Process Logs
Since the output of power-surge-app
is logged, you can see the output by monitoring the associated pod's logs.
To find the power-surge-app
pod use the kubectl get pods
command. This will list down all the deployed pods.
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
power-surge-app-0-646c4f9dd5-tns7l 1/1 Running 0 2m
siddhi-operator-6698d8f69d-6rfb6 1/1 Running 0 8m
Here, the pod starting with the SiddhiProcess name (in this case power-surge-app-
) is the pod we need to monitor.
To view the logs, run the kubectl logs <pod name>
command.
This will show all the Siddhi process logs, along with the filtered output events as given below.
$ kubectl logs power-surge-app-0-646c4f9dd5-tns7l
...
[2019-07-12 07:50:32,861] INFO {org.wso2.carbon.kernel.internal.CarbonStartupHandler} - Siddhi Runner Distribution started in 8.048 sec
[2019-07-12 07:50:32,864] INFO {org.wso2.transport.http.netty.contractimpl.listener.ServerConnectorBootstrap$HttpServerConnector} - HTTP(S) Interface starting on host 0.0.0.0 and port 9443
[2019-07-12 07:50:32,866] INFO {org.wso2.transport.http.netty.contractimpl.listener.ServerConnectorBootstrap$HttpServerConnector} - HTTP(S) Interface starting on host 0.0.0.0 and port 9090
[2019-07-12 07:51:42,488] INFO {io.siddhi.core.stream.output.sink.LogSink} - LOGGER : Event{timestamp=1562917902484, data=[dryer, 60000], isExpired=false}
Deploy Siddhi Apps without Ingress creation¶
By default, Siddhi operator creates an NGINX ingress and exposes your HTTP/HTTPS through that ingress. If you need to disable automatic ingress creation, you have to change the autoIngressCreation
value in the Siddhi siddhi-operator-config
config map to false
or null
as below.
# This config map used to parse configurations to the Siddhi operator.
apiVersion: v1
kind: ConfigMap
metadata:
name: siddhi-operator-config
data:
siddhiHome: /home/siddhi_user/siddhi-runner/
siddhiProfile: runner
siddhiImage: siddhiio/siddhi-runner-alpine:5.1.0
autoIngressCreation: "false"
Deploy and run Siddhi App with HTTPS¶
Configuring TLS will allow Siddhi ingress NGINX to expose HTTPS endpoints of your Siddhi Apps. To do so, create a Kubernetes secret(siddhi-tls
) and add that to the TLS configuration in siddhi-operator-config
config map as given below.
ingressTLS: siddhi-tls
Sample on deploying and running Siddhi App with HTTPS
First, you need to create a certificate using the following commands. For more details about the certificate creation refers this.
openssl req -x509 -nodes -days 365 -newkey rsa:2048 -keyout siddhi.key -out siddhi.crt -subj "/CN=siddhi/O=siddhi"
After that, create a kubernetes secret called siddhi-tls
, which we intended to add to the TLS configurations using the following command.
kubectl create secret tls siddhi-tls --key siddhi.key --cert siddhi.crt
The created secret then need to be added to the siddhi-operator-config
config map as follow.
apiVersion: v1
kind: ConfigMap
metadata:
name: siddhi-operator-config
data:
siddhiHome: /home/siddhi_user/siddhi-runner/
siddhiProfile: runner
siddhiImage: siddhiio/siddhi-runner-ubuntu:5.1.0
autoIngressCreation: "true"
ingressTLS: siddhi-tls
When this is done Siddhi operator will now enable TLS support via the NGINX ingress, and you will be able to access all the HTTPS endpoints.
Invoke Siddhi Applications
You can use now send the events to following HTTPS endpoint.
https://siddhi/power-surge-app-0/8080/checkPower
Further, you can use the following CURL command to send a request to the deployed Siddhi applications via HTTPS.
curl --cacert siddhi.crt -X POST \
https://siddhi/power-surge-app-0/8080/checkPower \
-H 'Accept: */*' \
-H 'Content-Type: application/json' \
-H 'Host: siddhi' \
-H 'cache-control: no-cache' \
-d '{
"deviceType": "dryer",
"power": 60000
}'
View Siddhi Process Logs
The output logs show the event that you sent using the previous CURL command.
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
power-surge-app-0-646c4f9dd5-kk5md 1/1 Running 0 2m
siddhi-operator-6698d8f69d-6rfb6 1/1 Running 0 10m
$ kubectl logs monitor-app-667c97c898-rrtfs
...
[2019-07-12 09:06:15,173] INFO {org.wso2.transport.http.netty.contractimpl.listener.ServerConnectorBootstrap$HttpServerConnector} - HTTP(S) Interface starting on host 0.0.0.0 and port 9443
[2019-07-12 09:06:15,184] INFO {org.wso2.transport.http.netty.contractimpl.listener.ServerConnectorBootstrap$HttpServerConnector} - HTTP(S) Interface starting on host 0.0.0.0 and port 9090
[2019-07-12 09:06:15,187] INFO {org.wso2.carbon.kernel.internal.CarbonStartupHandler} - Siddhi Runner Distribution started in 10.819 sec
[2019-07-12 09:07:50,098] INFO {io.siddhi.core.stream.output.sink.LogSink} - LOGGER : Event{timestamp=1562922470093, data=[dryer, 60000], isExpired=false}
Externally publish data to NATS from Siddhi¶
The default ingress creation of the Siddhi operator allows accessing HTTP/HTTPS endpoints externally. By default, it will not support TCP endpoints. Sometimes you may have some TCP endpoints to configure like NATS and Kafka sources and access those endpoints externally.
@source(type='nats', @map(type='text'), destination='SP_NATS_INPUT_TEST', bootstrap.servers='nats://localhost:4222',client.id='nats_client',server.id='test-cluster',queue.group.name = 'group_nats',durable.name = 'nats-durable',subscription.sequence = '100')
define stream inputStream (name string, age int, country string);
To access these TCP connections externally you can do it as in the following example.
First, you have to disable automatic ingress creation in the Siddhi operator. Then you have to manually create ingress and enable the TCP configurations. To enable TCP configurations in NGINX ingress please refer to this documentation.
To create NATS cluster you will need a NATS spec like below.
apiVersion: nats.io/v1alpha2
kind: NatsCluster
metadata:
name: nats-siddhi
spec:
size: 1
Save this yaml as nats-cluster.yaml
and deploy it using kubeclt
.
$ kubeclt apply -f nats-cluster.yaml
Likewise, create a nats streaming cluster as below.
apiVersion: streaming.nats.io/v1alpha1
kind: NatsStreamingCluster
metadata:
name: stan-siddhi
spec:
size: 1
natsSvc: nats-siddhi
Save this yaml as stan-cluster.yaml
and deploy it using kubeclt
.
$ kubeclt apply -f stan-cluster.yaml
Now you can deploy the following Siddhi app that contained a NATS source.
apiVersion: siddhi.io/v1alpha2
kind: SiddhiProcess
metadata:
name: power-consume-app
spec:
apps:
- script: |
@App:name("PowerConsumptionSurgeDetection")
@App:description("App consumes events from NATS as a text message of { 'deviceType': 'dryer', 'power': 6000 } format and inserts the events into DevicePowerStream, and alerts the user if the power consumption in 1 minute is greater than or equal to 10000W by printing a message in the log for every 30 seconds.")
/*
Input: deviceType string and powerConsuption int(Joules)
Output: Alert user from printing a log, if there is a power surge in the dryer within 1 minute period.
Notify the user in every 30 seconds when total power consumption is greater than or equal to 10000W in 1 minute time period.
*/
@source(
type='nats',
cluster.id='siddhi-stan',
destination = 'PowerStream',
bootstrap.servers='nats://siddhi-nats:4222',
@map(type='text')
)
define stream DevicePowerStream(deviceType string, power int);
@sink(type='log', prefix='LOGGER')
define stream PowerSurgeAlertStream(deviceType string, powerConsumed long);
@info(name='surge-detector')
from DevicePowerStream#window.time(1 min)
select deviceType, sum(power) as powerConsumed
group by deviceType
having powerConsumed > 10000
output every 30 sec
insert into PowerSurgeAlertStream;
container:
image: "siddhiio/siddhi-runner-ubuntu:5.1.0"
persistentVolumeClaim:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 1Gi
storageClassName: standard
volumeMode: Filesystem
runner: |
statePersistence:
enabled: true
intervalInMin: 1
revisionsToKeep: 2
persistenceStore: io.siddhi.distribution.core.persistence.FileSystemPersistenceStore
config:
location: siddhi-app-persistence
Save this yaml as power-consume-app.yaml
and deploy it using kubeclt
.
$ kubeclt apply -f power-consume-app.yaml
This commands will create Kubernetes artifacts like below.
$ kubectl get svc
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
kubernetes ClusterIP 10.96.0.1 <none> 443/TCP 12d
power-consume-app-0 ClusterIP 10.99.148.217 <none> 4222/TCP 5m
siddhi-nats ClusterIP 10.105.250.215 <none> 4222/TCP 5m
siddhi-nats-mgmt ClusterIP None <none> 6222/TCP,8222/TCP,7777/TCP 5m
siddhi-operator ClusterIP 10.102.251.237 <none> 8383/TCP 5m
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
nats-operator-b8f4977fc-8gnjd 1/1 Running 0 5m
nats-streaming-operator-64b565bcc7-r9rpw 1/1 Running 0 5m
power-consume-app-0-84f6774bd8-jl95w 1/1 Running 0 5m
siddhi-nats-1 1/1 Running 0 5m
siddhi-operator-6c6c5d8fcc-hvl7j 1/1 Running 0 5m
siddhi-stan-1 1/1 Running 0 5m
Now you have to create an ingress for the siddhi-nats
service.
apiVersion: networking.k8s.io/v1beta1
kind: Ingress
metadata:
name: siddhi-nats
annotations:
nginx.ingress.kubernetes.io/rewrite-target: /
spec:
rules:
- http:
paths:
- path: /nats
backend:
serviceName: siddhi-nats
servicePort: 4222
Save this yaml as siddhi-nats.yaml
and deploy it using kubeclt
.
$ kubeclt apply -f siddhi-nats.yaml
Now you can send messages directly to the NATS streaming server that running on your Kubernetes cluster. You have to send those messages to nats://<KUBERNETES_CLUSTER_IP>:4222
URI. To send messages to this NATS streaming cluster you can use a Siddhi app that has NATS sink or samples provided by NATS.
Minikube External TCP Access
The TCP configuration change that described in the ingress NGINX documentation occurred connection refused problems in Minikube. To configure TCP external access properly in Minikube please refer to the steps described in this comment.
Deploy and run Siddhi App in Distributed Mode¶
Siddhi apps can be in two different types.
- Stateless Siddhi apps
- Stateful Siddhi apps
The deployment of the stateful Siddhi apps follows distributed architecture to ensure high availability. The fully distributed deployment of Siddhi apps will be handle using Siddhi distributed annotations.
Without Messaging System | With Messaging System | |
---|---|---|
Without Distributed Annotations | Case 1: The given Siddhi app will be deployed in a stateless mode in a single kubernetes deployment. | Case 2: If given Siddhi app contains stateful queries then the Siddhi app divided into two partial Siddhi apps (passthrough and process) and deployed in two kubernetes deployments. Use the configured messaging system to communicate between two apps. |
With Distributed Annotations | Case 3: WIP(Work In Progress) | Case 4: WIP(Work In Progress) |
The previously described Siddhi app deployments fall under this Case 1 category. The following sample will cover the Siddhi app deployments which fall under Case 2.
Sample on deploying and running Siddhi App with a Messaging System
The Siddhi operator currently supports NATS as the messaging system. Therefore it is prerequisite to deploying NATS operator and NATS streaming operator in your kubernetes cluster before you install the Siddhi app.
- Refer this documentation to install NATS operator and NATS streaming operator.
- Install the Siddhi operator.
- Create a persistence volume in your cluster. Sample persistence volume can be created as follows.
echo ' --- kind: PersistentVolume apiVersion: v1 metadata: name: siddhi-pv labels: type: local spec: storageClassName: standard persistentVolumeReclaimPolicy: Recycle capacity: storage: 1Gi accessModes: - ReadWriteOnce hostPath: path: "/home/siddhi_user/" # For NFS in GKE use the following block # nfs: # server: <SERVER_IP> # path: <PATH> ' | kubectl apply -f -
Now we need a NATS cluster and NATS streaming cluster to run the Siddhi app deployment. For this, there are two cases handled by the operator.
-
User can create NATS cluster and NATS streaming cluster as described in this documentation. Specify cluster details in the YAML file like following.
messagingSystem: type: nats config: bootstrapServers: - "nats://example-nats:4222" clusterId: example-stan
-
If the user only specifies messaging system as NATS like below then Siddhi operator will automatically create NATS cluster(
siddhi-nats
) and NATS streaming cluster(siddhi-stan
), and connect two partial apps.messagingSystem: type: nats
Before installing a Siddhi app you have to check that all prerequisites(Siddhi-operator, nats-operator, and nats-streaming-operator) up and running perfectly like below.
$ kubectl get deployments
NAME READY UP-TO-DATE AVAILABLE AGE
nats-operator 1/1 1 1 5m
nats-streaming-operator 1/1 1 1 5m
siddhi-operator 1/1 1 1 5m
Now you need to specify a YAML file like below to create stateful Siddhi app deployment.
apiVersion: siddhi.io/v1alpha2
kind: SiddhiProcess
metadata:
name: power-consume-app
spec:
apps:
- script: |
@App:name("PowerConsumptionSurgeDetection")
@App:description("App consumes events from HTTP as a JSON message of { 'deviceType': 'dryer', 'power': 6000 } format and inserts the events into DevicePowerStream, and alerts the user if the power consumption in 1 minute is greater than or equal to 10000W by printing a message in the log for every 30 seconds.")
/*
Input: deviceType string and powerConsuption int(Joules)
Output: Alert user from printing a log, if there is a power surge in the dryer within 1 minute period.
Notify the user in every 30 seconds when total power consumption is greater than or equal to 10000W in 1 minute time period.
*/
@source(
type='http',
receiver.url='${RECEIVER_URL}',
basic.auth.enabled='false',
@map(type='json')
)
define stream DevicePowerStream(deviceType string, power int);
@sink(type='log', prefix='LOGGER')
define stream PowerSurgeAlertStream(deviceType string, powerConsumed long);
@info(name='power-consumption-window')
from DevicePowerStream#window.time(1 min)
select deviceType, sum(power) as powerConsumed
group by deviceType
having powerConsumed > 10000
output every 30 sec
insert into PowerSurgeAlertStream;
container:
env:
-
name: RECEIVER_URL
value: "http://0.0.0.0:8080/checkPower"
-
name: BASIC_AUTH_ENABLED
value: "false"
image: "siddhiio/siddhi-runner-ubuntu:5.1.0"
messagingSystem:
type: nats
persistentVolumeClaim:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 1Gi
storageClassName: standard
volumeMode: Filesystem
runner: |
statePersistence:
enabled: true
intervalInMin: 1
revisionsToKeep: 2
persistenceStore: io.siddhi.distribution.core.persistence.FileSystemPersistenceStore
config:
location: siddhi-app-persistence
Save this YAML as power-consume-app.yaml
as use kubectl
to deploy the app.
kubectl apply -f power-consume-app.yaml
This kubectl
execution in the Siddhi operator will do the following tasks.
- Create a NATS cluster and streaming cluster since the user did not specify it.
- Parse the given Siddhi app and create two partial Siddhi apps(passthrough and process). Then deploy both apps in separate deployments to distribute I/O time. Check health of the Siddhi runner and make deployments up and running.
- Create a service for passthrough app.
- Create an ingress rule that maps to passthrough service.
After a successful deployment, your kubernetes cluster should have these artifacts.
$ kubectl get SiddhiProcesses
NAME STATUS READY AGE
power-consume-app Running 2/2 5m
$ kubectl get deployments
NAME READY UP-TO-DATE AVAILABLE AGE
nats-operator 1/1 1 1 10m
nats-streaming-operator 1/1 1 1 10m
power-consume-app-0 1/1 1 1 5m
power-consume-app-1 1/1 1 1 5m
siddhi-operator 1/1 1 1 10m
$ kubectl get service
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
kubernetes ClusterIP 10.96.0.1 <none> 443/TCP 2d7h
power-consume-app-0 ClusterIP 10.105.67.227 <none> 8080/TCP 5m
siddhi-nats ClusterIP 10.100.205.21 <none> 4222/TCP 10m
siddhi-nats-mgmt ClusterIP None <none> 6222/TCP,8222/TCP,7777/TCP 10m
siddhi-operator ClusterIP 10.103.229.109 <none> 8383/TCP 10m
$ kubectl get ingress
NAME HOSTS ADDRESS PORTS AGE
siddhi siddhi 10.0.2.15 80 10m
$ kubectl get pv
NAME CAPACITY ACCESS MODES RECLAIM POLICY STATUS CLAIM STORAGECLASS REASON AGE
siddhi-pv 1Gi RWO Recycle Bound default/power-consume-app-1-pvc standard 10m
$ kubectl get pvc
NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS AGE
power-consume-app-1-pvc Bound siddhi-pv 1Gi RWO standard 5m
Here power-consume-app-0
is the passthrough deployment and power-consume-app-1
is the process deployment.
Now you can send an HTTP request to the passthrough app.
curl -X POST \
http://siddhi/power-consume-app-0/8080/checkPower \
-H 'Accept: */*' \
-H 'Content-Type: application/json' \
-H 'Host: siddhi' \
-d '{
"deviceType": "dryer",
"power": 60000
}'
The process app logs will show that event.
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
nats-operator-dd7f4945f-x4vf8 1/1 Running 0 10m
nats-streaming-operator-6fbb6695ff-9rmlx 1/1 Running 0 10m
power-consume-app-0-7486b87979-6tccx 1/1 Running 0 5m
power-consume-app-1-588996fcfb-prncj 1/1 Running 0 5m
siddhi-nats-1 1/1 Running 0 5m
siddhi-operator-6698d8f69d-w2kvj 1/1 Running 0 10m
siddhi-stan-1 1/1 Running 1 5m
$ kubectl logs power-consume-app-1-588996fcfb-prncj
JAVA_HOME environment variable is set to /opt/java/openjdk
CARBON_HOME environment variable is set to /home/siddhi_user/siddhi-runner
RUNTIME_HOME environment variable is set to /home/siddhi_user/siddhi-runner/wso2/runner
Picked up JAVA_TOOL_OPTIONS: -XX:+UnlockExperimentalVMOptions -XX:+UseCGroupMemoryLimitForHeap
[2019-07-12 14:09:16,648] INFO {org.wso2.carbon.launcher.extensions.OSGiLibBundleDeployerUtils updateOSGiLib} - Successfully updated the OSGi bundle information of Carbon Runtime: runner
...
[2019-07-12 14:12:04,969] INFO {io.siddhi.core.stream.output.sink.LogSink} - LOGGER : Event{timestamp=1562940716559, data=[dryer, 60000], isExpired=false}
Top