Scatter and Gather (String)

This example shows how to perform scatter and gather on string values.

For information on performing scatter and gather on JSON values, refer to the other examples in the data pipelining section.

define stream PurchaseStream
                (userId string, items string, store string);

@info(name = 'Scatter-query')
from PurchaseStream#str:tokenize(items, ',', true)
select userId, token as item, store
insert into TokenizedItemStream;

@info(name = 'Transform-query')
from TokenizedItemStream
select userId, str:concat(store, "-", item) as itemKey
insert into TransformedItemStream;

@info(name = 'Gather-query')
from TransformedItemStream#window.batch()
select userId, str:groupConcat(itemKey, ",") as itemKeys
insert into GroupedPurchaseItemStream;

The same queries, annotated step by step:

define stream PurchaseStream
                (userId string, items string, store string);

-- Scatter the value of items into separate events, splitting on ','.
-- The third argument (true) keeps only distinct tokens.
@info(name = 'Scatter-query')
from PurchaseStream#str:tokenize(items, ',', true)
select userId, token as item, store
insert into TokenizedItemStream;

-- Concatenate each tokenized item with its store.
@info(name = 'Transform-query')
from TokenizedItemStream
select userId, str:concat(store, "-", item) as itemKey
insert into TransformedItemStream;

-- Collect the events traveling together as a batch via the batch() window,
-- then concatenate all item keys in the batch, separated by ','.
@info(name = 'Gather-query')
from TransformedItemStream#window.batch()
select userId, str:groupConcat(itemKey, ",") as itemKeys
insert into GroupedPurchaseItemStream;

Input

The following event, whose items attribute is a comma-separated string, is sent to PurchaseStream:

['501', 'cake,cookie,bun,cookie', 'CA']

Output

After processing, the events arriving at TokenizedItemStream will be as follows:

['501', 'cake', 'CA'], ['501', 'cookie', 'CA'], ['501', 'bun', 'CA']
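
Note that cookie appears twice in the input but only once here. The following is a minimal plain-Python sketch of the scatter step, not Siddhi code: the helper names tokenize and scatter are hypothetical, and it assumes that distinct tokens keep the order of their first occurrence, as the sample output suggests.

```python
import re


def tokenize(input_string, regex, distinct=False):
    """Split input_string on regex; optionally drop repeated tokens."""
    tokens = re.split(regex, input_string)
    if distinct:
        # dict keys preserve insertion order, so each token keeps
        # the position of its first occurrence (an assumption here).
        tokens = list(dict.fromkeys(tokens))
    return tokens


def scatter(event):
    """Turn one purchase event into one event per distinct item."""
    user_id, items, store = event
    return [[user_id, token, store]
            for token in tokenize(items, ",", distinct=True)]


tokenized = scatter(["501", "cake,cookie,bun,cookie", "CA"])
print(tokenized)
```

Calling tokenize with distinct=False in this sketch would instead yield four events, one of them a second cookie.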

The events arriving at TransformedItemStream will be as follows:

['501', 'CA-cake'], ['501', 'CA-cookie'], ['501', 'CA-bun']

The event arriving at GroupedPurchaseItemStream will be as follows:

['501', 'CA-cake,CA-cookie,CA-bun']
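
The transform and gather steps can be sketched in plain Python in the same spirit. This is an illustration of the data flow rather than of how Siddhi implements it: the function names transform and gather are hypothetical, and the sketch assumes that the whole batch is non-empty and shares one userId, which holds for the sample input.

```python
def transform(event):
    """Build the item key by joining store and item with '-'."""
    user_id, item, store = event
    return [user_id, store + "-" + item]


def gather(batch, separator=","):
    """Collapse a batch of [userId, itemKey] events into one event."""
    # Assumption: every event in the batch carries the same userId,
    # so taking it from the last event is safe for this example.
    user_id = batch[-1][0]
    item_keys = separator.join(item_key for _, item_key in batch)
    return [user_id, item_keys]


tokenized = [["501", "cake", "CA"], ["501", "cookie", "CA"], ["501", "bun", "CA"]]
transformed = [transform(event) for event in tokenized]
grouped = gather(transformed)
print(transformed)
print(grouped)
```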
