Почему итеративные операции генерируют ошибки в функциях Apache Beam? - PullRequest
0 голосов
/ 02 октября 2018

Я вызываю эту функцию в Iterable коллекции Java, полученной из функции GroupByKey:

static class FindCompleteOrder extends DoFn<KV<String, Iterable<Order>>, Order> {
    String COMPLETE_EVENT_NAME = "COMPLETE";

    @ProcessElement
    public void processElement(ProcessContext c) {
        Iterable<Order> orders = c.element().getValue();
        boolean complete = false;

        do {
            try {
                Order order = orders.iterator().next();

                if (order.getEventName().equals(COMPLETE_EVENT_NAME)) {
                    complete = true;
                    order.setComplete(complete);
                    c.output(order);
                }
            } catch (Exception e) {
                LOG.error(e.getMessage());
            }
        } while (complete == false && orders.iterator().hasNext());
    }
}

Функция выполняет итерацию списка Orders и выводит первый экземпляр, соответствующийуказанное eventName свойство.Цикл заканчивается, если найден либо Order, либо итерация всей коллекции.

Случайные Order экземпляры генерируются в восходящем направлении и публикуются в экземпляре Pub / Sub ,со скоростью 2 в секунду, где они потребляются экземпляром DataFlow , из которого вызывается эта функция.Прибл.Через 15 минут после начала операции начинают появляться предупреждения:

Обработка, застрявшая в шаге Поиск заказа, по крайней мере на 15 мсек без вывода или завершения

Предупреждение выдается в результатеспорадический сбой либо iterator().hasNext(), либо iterator().next().Конечным результатом является то, что весь трубопровод останавливается.Связанный этап конвейера никогда не генерирует выходной сигнал.

Замена цикла стандартным циклом for решает проблему.Тем не менее, это означает итерацию всей коллекции;Я бы предпочел завершить цикл, когда найден соответствующий элемент, следовательно, цикл do-while.

Мне интересно знать, почему операции iterator вызывают остановку канала.FAIA коллекция Iterable является неизменной и не изменяется другими процессами.

Я использую Java 8 и Apache Beam 2.6 на Windows .

1 Ответ

0 голосов
/ 02 октября 2018

Каждый раз, когда вы вызываете orders.iterator(), вы создаете новый итератор, начиная с первого заказа.Это означает, что вы обрабатываете один и тот же заказ снова и снова в цикле.Ваш звонок на hasNext() всегда будет верным, если есть более одного заказа.Таким образом, если у вас более одного ордера или ваш первый ордер не установлен complete, цикл будет работать вечно, поэтому вы попадаете в тайм-аут.

Вместо этого вам следует один раз вызвать iterator() исохраните итератор вместо итерируемого, используя его для цикла:

static class FindCompleteOrder extends DoFn<KV<String, Iterable<Order>>, Order> {
    String COMPLETE_EVENT_NAME = "COMPLETE";

    @ProcessElement
    public void processElement(ProcessContext c) {
        Iterator<Order> orders = c.element().getValue().iterator();
        boolean complete = false;

        do {
            try {
                Order order = orders.next();

                if (order.getEventName().equals(COMPLETE_EVENT_NAME)) {
                    complete = true;
                    order.setComplete(complete);
                    c.output(order);
                }
            } catch (Exception e) {
                LOG.error(e.getMessage());
            }
        } while (complete == false && orders.hasNext());
    }
}
...