Scatter and Gather (JSON)

Provides an example of performing scatter and gather on JSON values.

For more information on performing scatter and gather on strings, refer to the other examples under the data pipelining section.

-- Input stream: `order` carries the purchase as a JSON string, `store` the store code.
define stream PurchaseStream (order string, store string);

-- Scatter: emit one event per element found under $.order.items,
-- carrying the order id and store along with each tokenized item.
@info(name = 'Scatter-query')
from PurchaseStream#json:tokenize(order, '$.order.items')
select
    json:getString(order, '$.order.id') as orderId,
    jsonElement as item,
    store
insert into TokenizedItemStream;

-- Transform: items named "cake" get their price lowered by 5 and are
-- converted back to a string with json:toString; every other item is
-- forwarded unchanged.
@info(name = 'Transform-query')
from TokenizedItemStream
select
    orderId,
    ifThenElse(
        json:getString(item, 'name') == "cake",
        json:toString(
            json:setElement(item, 'price', json:getDouble(item, 'price') - 5)
        ),
        item
    ) as item,
    store
insert into DiscountedItemStream;

-- Gather: the batch() window collects the events that travel together as
-- one batch, and json:group combines their `item` values into a single
-- JSON array.
@info(name = 'Gather-query')
from DiscountedItemStream#window.batch()
select
    orderId,
    json:group(item) as items,
    store
insert into GroupedItemStream;

-- Format: build the final JSON document from the grouped order.
-- Template placeholders are positional over the arguments that follow the
-- template: {{1}} = orderId, {{2}} = items, {{3}} = store.
@info(name = 'Format-query')
from GroupedItemStream
select str:fillTemplate("""
    {"discountedOrder":
        {"id":"{{1}}", "store":"{{3}}", "items":{{2}} }
    }""", orderId, items, store) as discountedOrder
insert into DiscountedOrderStream;
define stream PurchaseStream
                (order string, store string);
@info(name = 'Scatter-query')
from PurchaseStream#json:tokenize(order, '$.order.items')
select json:getString(order, '$.order.id') as orderId,
       jsonElement as item,
       store

Scatter the elements under $.order.items into separate events.

insert into TokenizedItemStream;
@info(name = 'Transform-query')
from TokenizedItemStream
select orderId,
       ifThenElse(json:getString(item, 'name') == "cake",
                  json:toString(
                    json:setElement(item, 'price',
                      json:getDouble(item, 'price') - 5
                    )
                  ),
                  item) as item,
       store

Provide a $5 discount on cakes.

insert into DiscountedItemStream;
@info(name = 'Gather-query')
from DiscountedItemStream#window.batch()

Collect the events traveling together as a batch using the batch() window.

select orderId, json:group(item) as items, store

Combine the item from every event in the batch into a single JSON array.

insert into GroupedItemStream;
@info(name = 'Format-query')
from GroupedItemStream
select str:fillTemplate("""
    {"discountedOrder":
        {"id":"{{1}}", "store":"{{3}}", "items":{{2}} }
    }""", orderId, items, store) as discountedOrder

Format the final JSON by combining orderId, items, and store.

insert into DiscountedOrderStream;

Input

The event below is sent to PurchaseStream:

[{
   "order":{
      "id":"501",
      "items":[{"name":"cake", "price":25.0},
               {"name":"cookie", "price":15.0},
               {"name":"bun", "price":20.0}
      ]
   }
}, 'CA']

Output

After processing, the following events arrive at TokenizedItemStream:

['501', '{"name":"cake","price":25.0}', 'CA'],
['501', '{"name":"cookie","price":15.0}', 'CA'],
['501', '{"name":"bun","price":20.0}', 'CA']

The events arriving at DiscountedItemStream will be as follows:

['501', '{"name":"cake","price":20.0}', 'CA'],
['501', '{"name":"cookie","price":15.0}', 'CA'],
['501', '{"name":"bun","price":20.0}', 'CA']

The event arriving at GroupedItemStream will be as follows:

['501', '[{"price":20.0,"name":"cake"},{"price":15.0,"name":"cookie"},{"price":20.0,"name":"bun"}]', 'CA']

The event arriving at DiscountedOrderStream will be as follows:

['{"discountedOrder": {"id":"501", "store":"CA", "items":[{"price":20.0,"name":"cake"},{"price":15.0,"name":"cookie"},{"price":20.0,"name":"bun"}] } }']

Top