gRPC Service Integration
This application demonstrates how to achieve gRPC service integration (request-response) while processing events in the realtime. There could be use cases which need to integrate with an external gRPC service to make some decision when processing events. The below example demonstrates such a requirement.
define stream TicketBookingStream (name string, phoneNo string,
movie string, ticketClass string, qty int,
bookingTime long);
@sink(type='grpc-call',
publisher.url =
'grpc://localhost:5003/org.wso2.grpc.EventService/process',
sink.id= 'ticket-price', @map(type='json'))
define stream TicketPriceFinderStream (name string,
phoneNo string, movie string, ticketClass string,
qty int, bookingTime long);
@source(type='grpc-call-response',
receiver.url =
'grpc://localhost:9763/org.wso2.grpc.EventService/process',
sink.id= 'ticket-price',
@map(type='json', @attributes(customerName='trp:name',
phoneNo='trp:phoneNo', movie='trp:movie',
qty='trp:qty', bookingTime='trp:bookingTime',
ticketPrice='price')))
define stream TicketPriceResponseStream (customerName string,
phoneNo string, movie string, qty int,
ticketPrice double, bookingTime long);
@sink(type='log')
define stream TotalTicketPaymentStream (customerName string,
phoneNo string, movie string, totalAmount double,
bookingTime long);
@info(name = 'filter-basic-ticket-bookings')
from TicketBookingStream[ticketClass == "BASIC"]
select name as customerName, phoneNo, movie,
qty * 20.0 as totalAmount, bookingTime
insert into TotalTicketPaymentStream;
@info(name = 'filter-non-basic-tickets')
from TicketBookingStream[ticketClass != "BASIC"]
select *
insert into TicketPriceFinderStream;
@info(name = 'total-price-calculator')
from TicketPriceResponseStream
select customerName, phoneNo, movie,
(qty * ticketPrice) as totalAmount, bookingTime
insert into TotalTicketPaymentStream;
|
Defines |
|
The |
|
Defines |
|
This |
|
Attributes of the event that initiated the call, and the response headers and properties via transport properties in
the format |
|
Defines |
|
Defines |
|
|
|
Filter the ticket bookings of class |
|
|
|
Filter the ticket bookings other than the class |
|
|
|
Calculate the total ticket payment amount based on the |
|
Input and Output
Let’s assume there is an external gRPC service that responds with the ticket price based on the gRPC request.
When an event with values [“Mohan”, “+181234579212”, “Iron Man”, “Gold”, 4, 0130] is sent to TicketBookingStream
stream then a gRPC request is sent to the loan gRPC service to find the ticket price if ticket class is not basic
.
Then, gRPC server responds with the ticket price as shown below.
{
"price": 25
}
There is a grpc-call-response
source configured to consume the response from the gRPC server and those responses will be mapped to the stream called TicketPriceResponseStream
.
Then Siddhi calculates the total ticket payment amount accordingly and pushes it to a stream called TotalTicketPaymentStream
. In this example, those events are logged to the console.
Sample console log is give below,
TotalPurchaseCalculator : TotalTicketPaymentStream : Event{timestamp=1575449841536, data=[Mohan, +181234579212, Iron Man, 100.0, 0130], isExpired=false}