Hi,
1) Is there any way to reset an int32 variable such as totalCount to zero every 10 minutes? I have an Aggregate operator with some int32 totalCount variables, and these variables keep increasing. I also have a tumbling window with a time-based eviction policy of 600 (600 seconds, or 10 minutes). I want to reset all of my totalCount variables to zero whenever the tumbling window's eviction policy is triggered.
2) Or is there any way to insert a punctuation mark every 10 minutes by using the Punctor operator? That way I could change my eviction policy to punctuation mode and add onPunct in the logic clause to reset those variables to zero whenever I detect a punctuation mark. Is this a valid alternative? Would both methods give me the same outcome?
Thanks in advance!
Answer by hnasgaard (1441) | Nov 25, 2015 at 06:17 AM
Have you considered using the Custom aggregation function? With that you can control what happens at eviction time. Here is some sample code:

type CountContext = int32 sum;
boolean init(mutable CountContext c) {
    c.sum = 0;
    return false;
}

boolean process(int32 i, mutable CountContext c) {
    c.sum += i;
    return false;
}

int32 result(mutable CountContext c) {
    int32 count = c.sum;
    c.sum = 0;
    return count;
}

composite Main {
    graph
        stream<int32 i> Beat = Beacon() {
            param period : .1;
            output Beat : i = 1;
        }

        stream<int32 count> Agg = Aggregate(Beat) {
            logic state : {
                mutable CountContext countContext;
            }
            window Beat : tumbling, time(60.0);
            output Agg : count=Custom(init(countContext), process(i, countContext), result(countContext));
        }

        () as Out = Custom(Agg as IPort0) {
            logic onTuple IPort0 : println(IPort0);
        }
}
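
To make the reset behavior easier to see, here is a rough model of the init/process/result cycle written in Python rather than SPL. It is only a sketch under assumptions: it assumes the operator calls init when a window starts, process once per tuple, and result when the window flushes. The helper name tumbling_counts and the (timestamp, value) event format are made up for illustration and are not part of Streams. The sample above uses time(60.0); the model takes the window length as a parameter, so the 10 minutes from the question would presumably be 600.

```python
from dataclasses import dataclass


@dataclass
class CountContext:
    total: int = 0


def init(c: CountContext) -> None:
    # Start every window from zero.
    c.total = 0


def process(i: int, c: CountContext) -> None:
    # Add one tuple's value to the running total.
    c.total += i


def result(c: CountContext) -> int:
    # Hand back the total and clear it, as the SPL result() above does.
    count = c.total
    c.total = 0
    return count


def tumbling_counts(events, window_seconds):
    """Model of a time-based tumbling window (hypothetical helper).

    events: iterable of (timestamp_seconds, value), sorted by timestamp.
    Returns one total per flushed window. A window with no tuples
    yields 0 in this model; the real operator may behave differently.
    """
    ctx = CountContext()
    outputs = []
    window_end = None
    for ts, value in events:
        if window_end is None:
            # The first tuple opens the first window.
            window_end = ts + window_seconds
            init(ctx)
        while ts >= window_end:
            # Window expired: emit its total, then start the next one.
            outputs.append(result(ctx))
            init(ctx)
            window_end += window_seconds
        process(value, ctx)
    if window_end is not None:
        # Flush the last (possibly partial) window for demonstration.
        outputs.append(result(ctx))
    return outputs


if __name__ == "__main__":
    # One tuple per second for 30 minutes, 10-minute windows.
    events = [(t, 1) for t in range(1800)]
    print(tumbling_counts(events, 600))
```

In this model each window's total starts again from zero, which is the behavior asked for in 1): the count never carries over from one 10-minute window into the next.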