gRPC Service Integration

This application demonstrates how to achieve gRPC service integration (request-response) while processing events in the realtime. There could be use cases which need to integrate with an external gRPC service to make some decision when processing events. The below example demonstrates such a requirement.

define stream TicketBookingStream (name string, phoneNo string,
        movie string, ticketClass string, qty int,
        bookingTime long);

@sink(type='grpc-call',
    publisher.url =
    'grpc://localhost:5003/org.wso2.grpc.EventService/process',
    sink.id= 'ticket-price', @map(type='json'))
define stream TicketPriceFinderStream (name string,
        phoneNo string, movie string, ticketClass string,
        qty int, bookingTime long);

@source(type='grpc-call-response',
    receiver.url =
    'grpc://localhost:9763/org.wso2.grpc.EventService/process',
    sink.id= 'ticket-price',
    @map(type='json', @attributes(customerName='trp:name',
        phoneNo='trp:phoneNo', movie='trp:movie',
        qty='trp:qty', bookingTime='trp:bookingTime',
        ticketPrice='price')))

define stream TicketPriceResponseStream (customerName string,
        phoneNo string, movie string, qty int,
        ticketPrice double, bookingTime long);

@sink(type='log')
define stream TotalTicketPaymentStream (customerName string,
        phoneNo string, movie string, totalAmount double,
        bookingTime long);

@info(name = 'filter-basic-ticket-bookings')
from TicketBookingStream[ticketClass == "BASIC"]
select name as customerName, phoneNo, movie,
    qty * 20.0 as totalAmount, bookingTime
insert into TotalTicketPaymentStream;

@info(name = 'filter-non-basic-tickets')
from TicketBookingStream[ticketClass != "BASIC"]
select *
insert into TicketPriceFinderStream;

@info(name = 'total-price-calculator')
from TicketPriceResponseStream
select customerName, phoneNo, movie,
    (qty * ticketPrice) as totalAmount, bookingTime
insert into TotalTicketPaymentStream;
define stream TicketBookingStream (name string, phoneNo string,
        movie string, ticketClass string, qty int,
        bookingTime long);

Defines TicketBookingStream which is the input stream that contains the ticket booking events.

@sink(type='grpc-call',
    publisher.url =
    'grpc://localhost:5003/org.wso2.grpc.EventService/process',
    sink.id= 'ticket-price', @map(type='json'))

The grpc-call sink is used for scenarios where we send a request out and expect a response back. In default mode this will use EventService process method. grpc-call-response source is used to receive the responses. A unique sink.id is used to correlate between the sink and its corresponding source.

define stream TicketPriceFinderStream (name string,
        phoneNo string, movie string, ticketClass string,
        qty int, bookingTime long);

Defines TicketPriceFinderStream to forward events to the gRPC endpoint.

@source(type='grpc-call-response',
    receiver.url =
    'grpc://localhost:9763/org.wso2.grpc.EventService/process',
    sink.id= 'ticket-price',

This grpc-call-response source receives responses received from gRPC server for requests sent from a grpc-call sink. The source will receive responses for sink with the same sink.id.

    @map(type='json', @attributes(customerName='trp:name',
        phoneNo='trp:phoneNo', movie='trp:movie',
        qty='trp:qty', bookingTime='trp:bookingTime',
        ticketPrice='price')))

Attributes of the event that initiated the call, and the response headers and properties via transport properties in the format trp:<attribute name> and trp:<header/property> respectively.

define stream TicketPriceResponseStream (customerName string,
        phoneNo string, movie string, qty int,
        ticketPrice double, bookingTime long);

Defines TicketPriceResponseStream which contains the response events for the ticket price requests.

@sink(type='log')
define stream TotalTicketPaymentStream (customerName string,
        phoneNo string, movie string, totalAmount double,
        bookingTime long);

Defines TotalTicketPaymentStream which contains the events with the total payment amount for tickets.

@info(name = 'filter-basic-ticket-bookings')
from TicketBookingStream[ticketClass == "BASIC"]
select name as customerName, phoneNo, movie,
    qty * 20.0 as totalAmount, bookingTime
insert into TotalTicketPaymentStream;

Filter the ticket bookings of class Basic and apply the default ticket price as 20 USD.

@info(name = 'filter-non-basic-tickets')
from TicketBookingStream[ticketClass != "BASIC"]
select *
insert into TicketPriceFinderStream;

Filter the ticket bookings other than the class Basic and route them to stream called TicketPriceFinderStream.

@info(name = 'total-price-calculator')
from TicketPriceResponseStream
select customerName, phoneNo, movie,
    (qty * ticketPrice) as totalAmount, bookingTime

Calculate the total ticket payment amount based on the price amount received from the gRPC service.

insert into TotalTicketPaymentStream;

Input and Output

Let’s assume there is an external gRPC service that responds with the ticket price based on the gRPC request.

When an event with values [“Mohan”, “+181234579212”, “Iron Man”, “Gold”, 4, 0130] is sent to TicketBookingStream stream then a gRPC request is sent to the loan gRPC service to find the ticket price if ticket class is not basic. Then, gRPC server responds with the ticket price as shown below.

{
    "price": 25
}

There is a grpc-call-response source configured to consume the response from the gRPC server and those responses will be mapped to the stream called TicketPriceResponseStream. Then Siddhi calculates the total ticket payment amount accordingly and pushes it to a stream called TotalTicketPaymentStream. In this example, those events are logged to the console. Sample console log is give below,

TotalPurchaseCalculator : TotalTicketPaymentStream : Event{timestamp=1575449841536, data=[Mohan, +181234579212, Iron Man, 100.0, 0130], isExpired=false}

Top