Partition Events by Value

Provides an example of partitioning events by attribute values.

For more information on partitioning events based on value ranges, refer to the other examples under the data pipelining section. For more information on partitions, refer to the Siddhi query guide.

define stream LoginStream
        ( userID string, loginSuccessful bool);

@purge(enable='true', interval='10 sec',
       idle.period='1 hour')
partition with ( userID of LoginStream )
begin
    @info(name='Aggregation-query')
    from LoginStream#window.length(3)
    select userID, loginSuccessful, count() as attempts
    group by loginSuccessful
    insert into #LoginAttempts;

    @info(name='Alert-query')
    from #LoginAttempts[loginSuccessful==false and attempts==3]
    select userID, "3 consecutive login failures!" as message
    insert into UserSuspensionStream;
end;

define stream LoginStream
        ( userID string, loginSuccessful bool);

@purge(enable='true', interval='10 sec',
       idle.period='1 hour')

Optional purging configuration that checks every 10 seconds and removes partition instances that have not received events for 1 hour.
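The purging idea can be pictured with a small Python model. This is only an illustrative sketch, not Siddhi's implementation: the `PartitionRegistry` class, its method names, and the choice to purge once the idle time reaches the limit are all assumptions made for this example.

```python
from dataclasses import dataclass, field


@dataclass
class PartitionRegistry:
    """Toy model (hypothetical) of per-key partition instances with idle purging."""
    idle_period: float  # seconds an instance may stay idle, 3600 for '1 hour'
    last_seen: dict = field(default_factory=dict)  # partition key -> time of last event

    def on_event(self, key: str, now: float) -> None:
        # Creates the instance on first use and refreshes its idle timer.
        self.last_seen[key] = now

    def purge(self, now: float) -> list:
        # Intended to run once per purge interval (every 10 seconds in the example).
        idle = [k for k, t in self.last_seen.items() if now - t >= self.idle_period]
        for k in idle:
            del self.last_seen[k]
        return idle


registry = PartitionRegistry(idle_period=3600)
registry.on_event("1001", now=0)
registry.on_event("1002", now=3000)
purged = registry.purge(now=3600)  # only '1001' has been idle for a full hour
print(purged, list(registry.last_seen))
```

In this model a purged key simply starts again with fresh state if another event for it arrives later.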

partition with ( userID of LoginStream )

Partitions the events based on userID.

begin
    @info(name='Aggregation-query')
    from LoginStream#window.length(3)
    select userID, loginSuccessful, count() as attempts
    group by loginSuccessful

Calculates the number of successful and failed login attempts among the last 3 events of each userID.

    insert into #LoginAttempts;

Inserts the results into the #LoginAttempts inner stream, which is accessible only within the partition instance.

    @info(name='Alert-query')
    from #LoginAttempts[loginSuccessful==false and attempts==3]
    select userID, "3 consecutive login failures!" as message
    insert into UserSuspensionStream;

Consumes events from the inner stream, and suspends userIDs that have 3 consecutive login failures.

end;

Partition Behavior

When events are sent to the LoginStream stream, the following events are generated at the #LoginAttempts inner stream by the Aggregation-query query, and at the UserSuspensionStream stream by the Alert-query query.

Input to LoginStream | At #LoginAttempts  | Output at UserSuspensionStream
['1001', false]      | ['1001', false, 1] | -
['1002', true]       | ['1002', true, 1]  | -
['1002', false]      | ['1002', false, 1] | -
['1002', false]      | ['1002', false, 2] | -
['1001', false]      | ['1001', false, 2] | -
['1001', true]       | ['1001', true, 1]  | -
['1001', false]      | ['1001', false, 2] | -
['1002', false]      | ['1002', false, 3] | ['1002', '3 consecutive login failures!']
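To trace this behavior by hand, the partition can be approximated in plain Python. The sketch below is a simplified model and does not use the Siddhi runtime or any Siddhi API. The `replay` function and `WINDOW_LENGTH` constant are hypothetical names introduced here, and the model assumes only current events (not expired ones) reach the inner stream.

```python
from collections import defaultdict, deque

WINDOW_LENGTH = 3  # mirrors window.length(3)


def replay(events):
    """Replay (userID, loginSuccessful) pairs through a toy model of the partition.

    Returns (inner, alerts): rows modeled for #LoginAttempts and for
    UserSuspensionStream.
    """
    # One sliding window per userID, like one partition instance per key.
    windows = defaultdict(lambda: deque(maxlen=WINDOW_LENGTH))
    inner, alerts = [], []
    for user_id, success in events:
        window = windows[user_id]
        window.append(success)  # the oldest event drops out once the window is full
        # count() grouped by loginSuccessful: events in the window sharing this flag
        attempts = sum(1 for flag in window if flag == success)
        inner.append((user_id, success, attempts))
        # Alert-query filter: loginSuccessful == false and attempts == 3
        if not success and attempts == 3:
            alerts.append((user_id, "3 consecutive login failures!"))
    return inner, alerts


events = [
    ("1001", False), ("1002", True), ("1002", False), ("1002", False),
    ("1001", False), ("1001", True), ("1001", False), ("1002", False),
]
inner, alerts = replay(events)
for row in inner:
    print(row)
print(alerts)
```

In this model the final '1002' event fills that user's window with three failures, so the count reaches 3, which is the value the Alert-query filter requires. User '1001' never triggers an alert because a successful login always sits among its last 3 events.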