У меня следующая проблема: я получаю сообщения, которые должны быть сгруппированы, и каждая группа сообщений должна быть обработана.Я могу обнаружить только первое сообщение каждой группы.После этого конкретного первого сообщения следующие сообщения принадлежат этой группе, пока не будет обнаружено первое сообщение следующей группы.
Мой подход к решению этой проблемы заключался в написании пользовательского триггера, который возвращает FIRE_PURGE, когда он обнаруживаетпервое сообщение группы (путем переопределения onElement).Моя цель состояла в том, чтобы назначить все сообщения одной группы одному окну.
Проблема этого подхода состоит в том, что первое сообщение каждой группы всегда назначается окну предыдущей группы.
Что я получаю: [aaaaaaab], [bbbbbbbbc] ... Что я хочу: [aaaaaaa], [bbbbbbbb] ...
Соответствующий код из основной функции:
esRawInputStream.filter(new FilterFunction<JsonNode>() {
@Override
public boolean filter(JsonNode doc) throws Exception {
return // some condition
}
}).keyBy(new KeySelector<JsonNode, String>() {
@Override
public String getKey(JsonNode doc) throws Exception {
return doc.findValue("meta_charge_point_id").asText();
}
}).window(GlobalWindows.create())
.trigger(new CustomEventTrigger<JsonNode, GlobalWindow>())
.fold(new SessionBucket(), new FoldFunction<JsonNode, SessionBucket>() {
@Override
public SessionBucket fold(SessionBucket b, JsonNode msg) throws Exception {
b.addMessage(msg);
return b;
}
}).addSink(new FileSink<SessionBucket>());
Триггер:
public class CustomEventTrigger<T, W extends Window> extends Trigger {
private String currentSessionId = "foo";
@Override
public TriggerResult onElement(Object element, long timestamp, Window window, TriggerContext ctx) throws Exception {
JsonNode jsonElement = null;
if (element instanceof JsonNode) {
jsonElement = (JsonNode) element;
} else {
// raise
}
TriggerResult res = TriggerResult.CONTINUE;
String elementSessionId = jsonElement.findValue("ocpp_session_id").asText();
if (!elementSessionId.equals(currentSessionId)) {
currentSessionId = elementSessionId;
res = TriggerResult.FIRE_AND_PURGE;
}
return res;
}
@Override
public TriggerResult onProcessingTime(long time, Window window, TriggerContext ctx) throws Exception {
return null;
}
@Override
public TriggerResult onEventTime(long time, Window window, TriggerContext ctx) throws Exception {
return null;
}
@Override
public void clear(Window window, TriggerContext ctx) throws Exception {
}
}