Scatter and Gather (JSON)
Provides an example of performing scatter and gather on JSON values.
For more information on performing scatter and gather on strings, refer to the other examples under the Data Pipelining section.
define stream PurchaseStream
                (order string, store string);

@info(name = 'Scatter-query')
from PurchaseStream#json:tokenize(order, '$.order.items')
select json:getString(order, '$.order.id') as orderId,
       jsonElement as item,
       store
insert into TokenizedItemStream;

@info(name = 'Transform-query')
from TokenizedItemStream
select orderId,
       ifThenElse(json:getString(item, 'name') == "cake",
                  json:toString(
                    json:setElement(item, 'price',
                      json:getDouble(item, 'price') - 5
                    )
                  ),
                  item) as item,
       store
insert into DiscountedItemStream;

@info(name = 'Gather-query')
from DiscountedItemStream#window.batch()
select orderId, json:group(item) as items, store
insert into GroupedItemStream;

@info(name = 'Format-query')
from GroupedItemStream
select str:fillTemplate("""
    {"discountedOrder":
        {"id":"{{1}}", "store":"{{3}}", "items":{{2}} }
    }""", orderId, items, store) as discountedOrder
insert into DiscountedOrderStream;
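
Scatter the elements under $.order.items into separate events.
Provide a discount of 5 on the price of cake items.
Collect events traveling as a batch via the batch() window.
Combine item from all events in the batch into a single JSON array.
Format the final JSON by combining orderId, items, and store.
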
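The four queries above can be traced outside Siddhi with a small plain-Python emulation. This is only a sketch for following the data flow, not how Siddhi executes the app: the function names `scatter`, `transform`, `gather`, and `format_order` are hypothetical, and `gather` assumes every event in the batch carries the same order id and store, taking those two values from the last event.

```python
import json

# Sample order matching the event sent to PurchaseStream in this example.
PURCHASE_ORDER = json.dumps({
    "order": {
        "id": "501",
        "items": [
            {"name": "cake", "price": 25.0},
            {"name": "cookie", "price": 15.0},
            {"name": "bun", "price": 20.0},
        ],
    }
})


def scatter(order_json, store):
    """Emit one (orderId, item, store) event per element of $.order.items."""
    order = json.loads(order_json)["order"]
    return [(order["id"], json.dumps(item), store) for item in order["items"]]


def transform(event, discount=5):
    """Subtract the discount from the price of items named 'cake'."""
    order_id, item_json, store = event
    item = json.loads(item_json)
    if item["name"] == "cake":
        item["price"] = item["price"] - discount
    return (order_id, json.dumps(item), store)


def gather(events):
    """Collapse a batch of item events into one event holding a JSON array.

    Assumption: all events in the batch share one order id and store,
    so both are read from the last event.
    """
    order_id, _, store = events[-1]
    items = [json.loads(item_json) for _, item_json, _ in events]
    return (order_id, json.dumps(items), store)


def format_order(event):
    """Wrap the gathered fields in the final discountedOrder document."""
    order_id, items_json, store = event
    return json.dumps({
        "discountedOrder": {
            "id": order_id,
            "store": store,
            "items": json.loads(items_json),
        }
    })


tokenized = scatter(PURCHASE_ORDER, "CA")
discounted = [transform(event) for event in tokenized]
grouped = gather(discounted)
print(format_order(grouped))
```

Each function corresponds to one query, so the intermediate lists play the role of TokenizedItemStream, DiscountedItemStream, and GroupedItemStream.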
Input
The event below is sent to PurchaseStream:
[{
   "order":{
      "id":"501",
      "items":[{"name":"cake", "price":25.0},
               {"name":"cookie", "price":15.0},
               {"name":"bun", "price":20.0}
              ]
   }
}, 'CA']
Output
After processing, the following events arrive at TokenizedItemStream:
['501', '{"name":"cake","price":25.0}', 'CA'],
['501', '{"name":"cookie","price":15.0}', 'CA'],
['501', '{"name":"bun","price":20.0}', 'CA']
The events arriving at DiscountedItemStream will be as follows:
['501', '{"name":"cake","price":20.0}', 'CA'],
['501', '{"name":"cookie","price":15.0}', 'CA'],
['501', '{"name":"bun","price":20.0}', 'CA']
The event arriving at GroupedItemStream will be as follows:
['501', '[{"price":20.0,"name":"cake"},{"price":15.0,"name":"cookie"},{"price":20.0,"name":"bun"}]', 'CA']
The event arriving at DiscountedOrderStream will be as follows:
['{"discountedOrder":
    {"id":"501",
     "store":"CA",
     "items":[{"price":20.0,"name":"cake"},
              {"price":15.0,"name":"cookie"},
              {"price":20.0,"name":"bun"}] }
  }'
]
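
In Format-query, the placeholders {{1}}, {{2}}, and {{3}} refer to the arguments orderId, items, and store by position, and {{2}} is left unquoted so that the items array is embedded as JSON rather than as a string. The sketch below imitates that positional substitution in plain Python to make the mapping explicit. It is not the implementation of str:fillTemplate, and the helper name `fill_template` is hypothetical.

```python
import json
import re


def fill_template(template, *args):
    """Replace each {{n}} with the n-th argument (1-based), converted to str."""
    def substitute(match):
        return str(args[int(match.group(1)) - 1])
    return re.sub(r"\{\{(\d+)\}\}", substitute, template)


# Same placeholder layout as in Format-query: {{2}} is not wrapped in quotes.
template = '{"discountedOrder": {"id":"{{1}}", "store":"{{3}}", "items":{{2}} } }'
items = '[{"price":20.0,"name":"cake"},{"price":15.0,"name":"cookie"}]'

filled = fill_template(template, "501", items, "CA")
order = json.loads(filled)["discountedOrder"]
print(order["id"], order["store"], len(order["items"]))
```

Because the result parses as JSON, the items value comes back as a list of objects, which is the effect of leaving {{2}} unquoted in the template.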