Wait & Retry

This example shows how errors are handled at the sink level using wait and retry mode.

In this mode, publishing threads wait in a back-off and retry loop and send events only once the connection is re-established. While waiting, the threads do not consume any new messages, which applies back pressure to the systems that publish to this one.
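The blocking behaviour described above can be sketched in a few lines of Python. This is only an illustration of the general wait-and-retry idea, not Siddhi's implementation: the function name publish_with_wait, the FlakyEndpoint stand-in, and the delay values (1 second doubling up to 60 seconds) are all assumptions made for this sketch.

```python
import time


def publish_with_wait(send, event, initial_delay=1.0, max_delay=60.0,
                      sleep=time.sleep):
    """Block the calling thread until send(event) succeeds.

    The delay between attempts doubles after each failure, capped at
    max_delay. All delay values here are illustrative assumptions.
    Returns the number of attempts that were needed.
    """
    delay = initial_delay
    attempts = 0
    while True:
        attempts += 1
        try:
            send(event)
            return attempts
        except ConnectionError:
            # The thread waits here and therefore consumes no new events,
            # which is what pushes back pressure onto upstream publishers.
            sleep(delay)
            delay = min(delay * 2, max_delay)


class FlakyEndpoint:
    """Hypothetical endpoint that rejects the first `failures` calls."""

    def __init__(self, failures):
        self.failures = failures
        self.received = []

    def __call__(self, event):
        if self.failures > 0:
            self.failures -= 1
            raise ConnectionError("endpoint unavailable")
        self.received.append(event)


endpoint = FlakyEndpoint(failures=3)
delays = []  # record the requested waits instead of actually sleeping
attempts = publish_with_wait(endpoint, {"sensorID": 10348},
                             sleep=delays.append)
print(attempts, delays, endpoint.received)
```

Passing delays.append as the sleep function keeps the demonstration instant while still showing which waits would have happened. The event is delivered exactly once, after the endpoint recovers.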

Refer to the Siddhi query guide for more information.

define stream GlucoseReadingStream (locationRoom string,
    locationBed string, timeStamp string, sensorID long,
    patientFirstName string, patientLastName string,
    sensorValue double);

@sink(type = 'http', on.error='wait',
    publisher.url = "http://localhost:8080/logger",
    method = "POST",
    @map(type = 'json'))
define stream AbnormalGlucoseReadingStream
    (timeStampInLong long, locationRoom string,
    locationBed string, sensorID long,
    patientFullName string, sensorReadingValue double);

@info(name='abnormal-reading-identifier')
from GlucoseReadingStream[sensorValue > 220]
select math:parseLong(timeStamp) as timeStampInLong,
    locationRoom, locationBed, sensorID,
    str:concat(patientFirstName, " ", patientLastName)
        as patientFullName,
        sensorValue as sensorReadingValue
insert into AbnormalGlucoseReadingStream;
define stream GlucoseReadingStream (locationRoom string,
    locationBed string, timeStamp string, sensorID long,
    patientFirstName string, patientLastName string,
    sensorValue double);

Defines the GlucoseReadingStream stream, which carries events related to glucose readings.

@sink(type = 'http', on.error='wait',
    publisher.url = "http://localhost:8080/logger",
    method = "POST",
    @map(type = 'json'))

If the HTTP endpoint is unavailable, the threads that deliver events through AbnormalGlucoseReadingStream wait in back-off and retry mode. Errors are handled gracefully by configuring the on.error parameter.

define stream AbnormalGlucoseReadingStream
    (timeStampInLong long, locationRoom string,
    locationBed string, sensorID long,
    patientFullName string, sensorReadingValue double);
@info(name='abnormal-reading-identifier')
from GlucoseReadingStream[sensorValue > 220]
select math:parseLong(timeStamp) as timeStampInLong,
    locationRoom, locationBed, sensorID,

Identifies an abnormal glucose reading when sensorValue > 220.

    str:concat(patientFirstName, " ", patientLastName)
        as patientFullName,

Concatenates the string attributes patientFirstName and patientLastName, separated by a space.

        sensorValue as sensorReadingValue
insert into AbnormalGlucoseReadingStream;

The above is a simple example that publishes abnormal glucose reading events to an unavailable HTTP endpoint, with the error handled by wait and retry mode.

Prerequisites

Download the mock logger service from here.

Input & Output

  • The event below is sent to the GlucoseReadingStream stream:

    ['Get-1024', 'Level2', '1576829362', 10348, 'Alex', 'John', 250]

    A ConnectException is printed because the logger service is unavailable.

  • Then, execute the command below to start the mock logger service.

    java -jar logservice-1.0.0.jar

  • Now, the event sent in the first step is logged in the logger service console, as shown below.

    LoggerService:42 - {event={timeStampInLong=1.576829362E9, locationRoom=Get-1024, locationBed=Level2, sensorID=10348.0, patientFullName=Alex John, sensorReadingValue=250.0}}
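To make the mapping from the input event to the published event easier to follow, the query's filter and projection can be mirrored in plain Python. This is a rough sketch rather than how Siddhi evaluates the query: the function name to_abnormal_reading is hypothetical, and the {"event": ...} wrapper is assumed from the shape of the logged output above.

```python
import json

ABNORMAL_THRESHOLD = 220  # same threshold as the query filter


def to_abnormal_reading(event):
    """Mirror the 'abnormal-reading-identifier' query for one event.

    `event` follows the attribute order of GlucoseReadingStream.
    Returns None when the reading does not pass the filter.
    """
    (location_room, location_bed, time_stamp, sensor_id,
     first_name, last_name, sensor_value) = event
    if not sensor_value > ABNORMAL_THRESHOLD:
        return None
    return {
        # math:parseLong(timeStamp) as timeStampInLong
        "timeStampInLong": int(time_stamp),
        "locationRoom": location_room,
        "locationBed": location_bed,
        "sensorID": sensor_id,
        # str:concat(patientFirstName, " ", patientLastName)
        "patientFullName": f"{first_name} {last_name}",
        # sensorValue is declared as double in the stream definition
        "sensorReadingValue": float(sensor_value),
    }


sample = ['Get-1024', 'Level2', '1576829362', 10348, 'Alex', 'John', 250]
reading = to_abnormal_reading(sample)
# Assumed payload shape: the mapped event wrapped in an "event" object.
payload = json.dumps({"event": reading})
print(payload)
```

The logged output shows numeric values such as sensorID as floating-point numbers, which presumably reflects how the logger service parses the JSON rather than a change made by the query.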
