HTTP Service Integration

This application demonstrates how to achieve HTTP service integration (request-response) while processing events in the realtime. There could be use cases which need to integrate with an external service to make some decision when processing events. The below example demonstrates such a requirement.

@sink(type='http-call',
    publisher.url='http://localhost:8005/validate-loan',
    method='POST', sink.id='loan-validation',
    @map(type='json'))
define stream LoanValidationStream (clientId long,
                    name string, amount double);

@source(type='http-call-response', sink.id='loan-validation',
    http.status.code='2\d+',
    @map(type='json', @attributes(customerName='trp:name',
        clientId='trp:clientId', loanAmount='trp:amount',
        interestRate='validation-response.rate',
        totalYears='validation-response.years-approved')))
define stream SuccessLoanRequestStream(clientId long,
           customerName string, loanAmount double,
           interestRate double, totalYears int);

@source(type='http-call-response', sink.id='loan-validation',
    http.status.code='400',
    @map(type='json', @attributes(customerName='trp:name',
        clientId='trp:clientId',
        failureReason='validation-response.reason')))
define stream FailureLoanRequestStream(clientId long,
                customerName string, failureReason string);

define stream LoanRequestStream (clientId long, name string,
                amount double);

@sink(type='log') 
define stream LoanResponseStream(clientId long,
                customerName string, message string);

from LoanRequestStream
select clientId, name, amount
insert into LoanValidationStream;

from SuccessLoanRequestStream
select clientId, customerName,
    "Loan Request is accepted for processing" as message
insert into LoanResponseStream;

from FailureLoanRequestStream
select clientId, customerName,
        str:concat("Loan Request is rejected due to ",
            failureReason) as message
insert into LoanResponseStream;
@sink(type='http-call',
    publisher.url='http://localhost:8005/validate-loan',
    method='POST', sink.id='loan-validation',
    @map(type='json'))

http-call sink publishes messages to endpoints via HTTP or HTTPS protocols. and consume responses through its corresponding http-call-response source

define stream LoanValidationStream (clientId long,
                    name string, amount double);

Defines LoanValidationStream to forward events to the loan validation purposes.

@source(type='http-call-response', sink.id='loan-validation',

The http-call-response source receives the responses for the calls made by its corresponding http-call sink. sink.id used to map the corresponding http-call sink.

    http.status.code='2\d+',

To handle messages with different HTTP status codes having different formats, multiple http-call-response sources are allowed to associate with a single http-call sink. In this case, it only handles HTTP responses which come with 2xx response codes.

    @map(type='json', @attributes(customerName='trp:name',
        clientId='trp:clientId', loanAmount='trp:amount',
        interestRate='validation-response.rate',
        totalYears='validation-response.years-approved')))

Attributes of the event that initiated the call, and the response headers and properties via transport properties in the format trp:<attribute name> and trp:<header/property> respectively.

define stream SuccessLoanRequestStream(clientId long,
           customerName string, loanAmount double,
           interestRate double, totalYears int);

Defines SuccessLoanRequestStream to process loan requests which are validated successfully.

@source(type='http-call-response', sink.id='loan-validation',
    http.status.code='400',
    @map(type='json', @attributes(customerName='trp:name',
        clientId='trp:clientId',
        failureReason='validation-response.reason')))

The http-call-response source which handles the responses which come with 400 HTTP response code.

define stream FailureLoanRequestStream(clientId long,
                customerName string, failureReason string);

Defines FailureLoanRequestStream to process loan requests which are rejected.

define stream LoanRequestStream (clientId long, name string,
                amount double);

Defines LoanRequestStream which is the input stream that contains the initial loan request events.

@sink(type='log') 
define stream LoanResponseStream(clientId long,
                customerName string, message string);

Defines LoanResponseStream which contains the events with the loan request responses.

from LoanRequestStream
select clientId, name, amount
insert into LoanValidationStream;

Project attributes clientId, name and amount to perform the http request to validate-loan service.

from SuccessLoanRequestStream
select clientId, customerName,
    "Loan Request is accepted for processing" as message
insert into LoanResponseStream;

Process the successful loan requests and generate a response message.

from FailureLoanRequestStream
select clientId, customerName,
        str:concat("Loan Request is rejected due to ",
            failureReason) as message
insert into LoanResponseStream;

Process the rejected loan requests and generate a response message.

Input and Output

Let’s assume there is an external HTTP service that performs the loan request validation.

When an event with values [002345, “David Warner”, 200000] is sent to LoanRequestStream stream then the respective event is sent to the loan validation service.

If the loan request is valid then Siddhi gets a response as shown below from the loan validation service. In this situation, the response code would be 200.

{
    "validation-response": {
        "isValid": true,
        "rate": 12.5,
        "years-approved": 5
    }
}

If the loan request is rejected then the loan validation service sent a response with response code 400 with below payload.

{
    "validation-response": {
        "isValid": false,
        "reason": "Not a registered user"
    }
}

Then Siddhi logs the response message accordingly. If the loan request is accepted then log message would be something similar as below.

LoanRequestProcessor : LoanResponseStream : Event{timestamp=1575293895348, data=[002345, David Warner, Loan Request is accepted for processing], isExpired=false

Top