Source and Sink
Provides an introduction to sources and sinks, which are used to consume events from and publish events to external systems.
There are multiple source and sink types, but this example only covers the http source, the log sink, and the kafka sink. For more information, refer to the Siddhi Query Guide.
@source(type='http', receiver.url='http://0.0.0.0:8006/temp',
        @map(type='json'))
define stream TemperatureStream (sensorId string, temperature double);

@sink(type='log')
@sink(type='kafka', topic='temperature',
      bootstrap.servers='localhost:9092',
      @map(type='json',
           @payload("""{"temp":"{{temperature}}"}""")))
define stream TemperatureOnlyStream (temperature double);

@info(name = 'Simple-selection')
from TemperatureStream
select temperature
insert into TemperatureOnlyStream;
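To make the 'Simple-selection' query's behavior concrete, here is a minimal Python sketch of what it does, assuming each event is represented as a plain dict: it projects only the temperature attribute from every TemperatureStream event into a new stream.

```python
# Sketch of: from TemperatureStream select temperature
# insert into TemperatureOnlyStream;
# Events are modeled as dicts; only 'temperature' is kept.
def simple_selection(temperature_stream):
    return [{"temperature": e["temperature"]} for e in temperature_stream]

events = [{"sensorId": "aq-14", "temperature": 35.4}]
print(simple_selection(events))  # → [{'temperature': 35.4}]
```

This is an illustration of the query semantics only; in Siddhi the projection runs continuously on each arriving event rather than over a list.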
Input
When a JSON message in the following default format is sent to the URL http://0.0.0.0:8006/temp with content type application/json, it is automatically mapped to an event in the TemperatureStream stream.
{
  "event": {
    "sensorId": "aq-14",
    "temperature": 35.4
  }
}
To process custom input messages, please refer to the examples related to Input Data Mapping.
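A hypothetical client for the http source above can be sketched with Python's standard library: it builds the default-format JSON event and prepares a POST request with the expected content type. The endpoint only accepts requests while the Siddhi app is deployed and running, so the request is prepared here but not sent.

```python
import json
import urllib.request

# Default-format event payload for the http source.
payload = json.dumps({"event": {"sensorId": "aq-14", "temperature": 35.4}})

# POST request targeting the source's receiver.url with JSON content type.
req = urllib.request.Request(
    "http://0.0.0.0:8006/temp",
    data=payload.encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)
print(payload)
# With the app running, send it via: urllib.request.urlopen(req)
```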
Output
After processing, each event arriving at TemperatureOnlyStream is emitted via both the log and kafka sinks.
As the log sink uses the passThrough mapper by default, it logs the Siddhi event directly to the console, as follows:
Event{timestamp=1574515771712, data=[35.4], isExpired=false}
The kafka sink maps the event to a custom JSON message, shown below, and publishes it to the temperature topic.
{"temp":"35.4"}
To output messages using other message formats, please refer to the examples related to Output Data Mapping.