Scatter and Gather (String)
Provides example on performing scatter and gather on string values.
For more information on performing scatter and gather on json, refer other examples under data pipelining section.
define stream PurchaseStream
(userId string, items string, store string);
@info(name = 'Scatter-query')
from PurchaseStream#str:tokenize(items, ',', true)
select userId, token as item, store
insert into TokenizedItemStream;
@info(name = 'Transform-query')
from TokenizedItemStream
select userId, str:concat(store, "-", item) as itemKey
insert into TransformedItemStream;
@info(name = 'Gather-query')
from TransformedItemStream#window.batch()
select userId, str:groupConcat(itemKey, ",") as itemKeys
insert into GroupedPurchaseItemStream;
|
|
|
|
|
Scatter value of |
|
|
|
|
|
Concat tokenized |
|
|
|
|
|
Collect events traveling as a batch via |
|
Concat all events in a batch separating them by |
|
Input
Below event containing a JSON string is sent to PurchaseStream
,
['501'
, 'cake,cookie,bun,cookie'
, 'CA'
]
Output
After processing, the events arriving at TokenizedItemStream
will be as follows:
['501'
, 'cake'
, 'CA'
], ['501'
, 'cookie'
, 'CA'
], ['501'
, 'bun'
, 'CA'
]
The events arriving at TransformedItemStream
will be as follows:
['501'
, 'CA-cake'
], ['501'
, 'CA-cookie'
], ['501'
, 'CA-bun'
]
The event arriving at GroupedPurchaseItemStream
will be as follows:
['501'
, 'CA-cake,CA-cookie,CA-bun'
]