Source and Sink

Provides introduction to sources and sink that are used to consume and publish events to external systems.

There are multiple source and sink types, but this example only explains http source, log sink, and kafka sink. For more info refer the Siddhi query guide.

@source(type='http', receiver.url='http://0.0.0.0:8006/temp',
    @map(type='json'))
define stream TemperatureStream (
                  sensorId string, temperature double);

@sink(type='log')
@sink(type='kafka', topic='temperature',
      bootstrap.servers='localhost:9092',
      @map(type='json',
           @payload("""{"temp":"{{temperature}}"}""")))
define stream TemperatureOnlyStream (temperature double);

@info(name = 'Simple-selection')
from TemperatureStream
select temperature
insert into TemperatureOnlyStream;
@source(type='http', receiver.url='http://0.0.0.0:8006/temp',
    @map(type='json'))

HTTP source to consume JSON messages with default mapping via url http://0.0.0.0:8006/temp.

define stream TemperatureStream (
                  sensorId string, temperature double);

Defines TemperatureStream stream having sensorId and temperature attributes of types string and double.

@sink(type='log')

Log sink to log Siddhi events arriving via TemperatureOnlyStream stream.

@sink(type='kafka', topic='temperature',
      bootstrap.servers='localhost:9092',
      @map(type='json',
           @payload("""{"temp":"{{temperature}}"}""")))

Kafka sink to map events arriving via TemperatureOnlyStream stream as custom JSON events, and publish to temperature topic.

define stream TemperatureOnlyStream (temperature double);

Defines TemperatureOnlyStream stream having temperature attribute of type double.

@info(name = 'Simple-selection')
from TemperatureStream
select temperature
insert into TemperatureOnlyStream;

Input

When a JSON message in the following default message format is sent to url http://0.0.0.0:8006/temp with content type application/json. It will automatically get mapped to an event in the TemperatureStream stream.

{
   "event":{
      "sensorId":"aq-14",
      "temperature":35.4
   }
}

To process custom input messages, please refer the examples related to Input Data Mapping.

Output

After processing, the event arriving at TemperatureOnlyStream will be emitted via log and kafka sinks.

As log sink uses passThrough mapper by default, it directly logs the Siddhi Events to the console as following;

Event{timestamp=1574515771712, data=[35.4], isExpired=false}

The kafka sink maps the event to a custom JSON message as below and publishes it to the temperature topic.

{"temp":"35.4"}

To output messages using other message formats, pleases refer the examples related to Output Data Mapping.

Top