Как назначить группы сообщений окнам, обнаружив первое сообщение группы? - PullRequest
0 голосов
/ 12 декабря 2018

У меня следующая проблема: я получаю сообщения, которые должны быть сгруппированы, и каждая группа сообщений должна быть обработана.Я могу обнаружить только первое сообщение каждой группы.После этого конкретного первого сообщения следующие сообщения принадлежат этой группе, пока не будет обнаружено первое сообщение следующей группы.

Мой подход к решению этой проблемы заключался в написании пользовательского триггера, который возвращает FIRE_PURGE, когда он обнаруживаетпервое сообщение группы (путем переопределения onElement).Моя цель состояла в том, чтобы назначить все сообщения одной группы одному окну.

Проблема этого подхода состоит в том, что первое сообщение каждой группы всегда назначается окну предыдущей группы.

Что я получаю: [aaaaaaab], [bbbbbbbbc] ... Что я хочу: [aaaaaaa], [bbbbbbbb] ...

Соответствующий код из основной функции:

            esRawInputStream.filter(new FilterFunction<JsonNode>() {
                @Override
                public boolean filter(JsonNode doc) throws Exception {
                    return // some condition
                }
            }).keyBy(new KeySelector<JsonNode, String>() {
                @Override
                public String getKey(JsonNode doc) throws Exception {
                    return doc.findValue("meta_charge_point_id").asText();
                }
            }).window(GlobalWindows.create())
                    .trigger(new CustomEventTrigger<JsonNode, GlobalWindow>())
                    .fold(new SessionBucket(), new FoldFunction<JsonNode, SessionBucket>() {
                        @Override
                        public SessionBucket fold(SessionBucket b, JsonNode msg) throws Exception {
                            b.addMessage(msg);
                            return b;
                        }
                    }).addSink(new FileSink<SessionBucket>());

Триггер:

public class CustomEventTrigger<T, W extends Window> extends Trigger {
    private String currentSessionId = "foo";

    @Override
    public TriggerResult onElement(Object element, long timestamp, Window window, TriggerContext ctx) throws Exception {
        JsonNode jsonElement = null;
        if (element instanceof JsonNode) {
            jsonElement = (JsonNode) element;

        } else {
            // raise
        }
        TriggerResult res = TriggerResult.CONTINUE;
        String elementSessionId = jsonElement.findValue("ocpp_session_id").asText();
        if (!elementSessionId.equals(currentSessionId)) {
            currentSessionId = elementSessionId;
            res = TriggerResult.FIRE_AND_PURGE;
        }
        return res;
    }

    @Override
    public TriggerResult onProcessingTime(long time, Window window, TriggerContext ctx) throws Exception {
        return null;
    }

    @Override
    public TriggerResult onEventTime(long time, Window window, TriggerContext ctx) throws Exception {
        return null;
    }

    @Override
    public void clear(Window window, TriggerContext ctx) throws Exception {

    }
} 

1 Ответ

0 голосов
/ 15 декабря 2018

Этот вариант использования не очень хорошо подходит для оконного API Флинка.Позвольте мне предложить альтернативу, которая заключается в том, чтобы сделать это с помощью функции flatmap с состоянием.

Вот пример того, как это может выглядеть:

public class Segmenting {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.fromElements(1, 2, 2, 3, 3, 3, 1, 4, 4, 4, 4, 2, 2)
            // key the stream so we can used keyed state
            .keyBy(event -> 1)
            .flatMap(new RichFlatMapFunction<Integer, List<Integer>>() {
                private transient ValueState<Integer> currentValue;
                private transient ListState<Integer> list;

                @Override
                public void open(Configuration parameters) throws Exception {
                    currentValue = getRuntimeContext().getState(new ValueStateDescriptor<>("currentValue", Integer.class));
                    list = getRuntimeContext().getListState(new ListStateDescriptor<>("list", Integer.class));
                }

                @Override
                public void flatMap(Integer event, Collector<List<Integer>> collector) throws Exception {
                    Integer value = currentValue.value();

                    if (value == event) {
                        list.add(event);
                    } else {
                        if (value != null) {
                            List<Integer> result = new ArrayList<>();
                            list.get().forEach(result::add);
                            collector.collect(result);
                        }
                        currentValue.update(event);
                        list.clear();
                        list.add(event);
                    }
                }
            })
            .print();

        env.execute();
    }
}

Вывод

[1]
[2, 2]
[3, 3, 3]
[1]
[4, 4, 4, 4]

Кстати, я предполагаю, что данные в порядкеи избегаю параллельной обработки, чтобы поддерживать ее в порядке.Для большинства приложений потоковой обработки это было бы нереалистичным предположением.Если ваши данные будут не в порядке, вы можете использовать это в качестве отправной точки, но окончательное решение будет более сложным.

...