Я вызываю эту функцию в Iterable
коллекции Java, полученной из функции GroupByKey
:
static class FindCompleteOrder extends DoFn<KV<String, Iterable<Order>>, Order> {
String COMPLETE_EVENT_NAME = "COMPLETE";
@ProcessElement
public void processElement(ProcessContext c) {
Iterable<Order> orders = c.element().getValue();
boolean complete = false;
do {
try {
Order order = orders.iterator().next();
if (order.getEventName().equals(COMPLETE_EVENT_NAME)) {
complete = true;
order.setComplete(complete);
c.output(order);
}
} catch (Exception e) {
LOG.error(e.getMessage());
}
} while (complete == false && orders.iterator().hasNext());
}
}
Функция выполняет итерацию списка Orders
и выводит первый экземпляр, соответствующийуказанное eventName
свойство.Цикл заканчивается, если найден либо Order
, либо итерация всей коллекции.
Случайные Order
экземпляры генерируются в восходящем направлении и публикуются в экземпляре Pub / Sub ,со скоростью 2 в секунду, где они потребляются экземпляром DataFlow , из которого вызывается эта функция.Прибл.Через 15 минут после начала операции начинают появляться предупреждения:
Обработка, застрявшая в шаге Поиск заказа, по крайней мере на 15 мсек без вывода или завершения
Предупреждение выдается в результатеспорадический сбой либо iterator().hasNext()
, либо iterator().next()
.Конечным результатом является то, что весь трубопровод останавливается.Связанный этап конвейера никогда не генерирует выходной сигнал.
Замена цикла стандартным циклом for решает проблему.Тем не менее, это означает итерацию всей коллекции;Я бы предпочел завершить цикл, когда найден соответствующий элемент, следовательно, цикл do-while.
Мне интересно знать, почему операции iterator
вызывают остановку канала.FAIA коллекция Iterable
является неизменной и не изменяется другими процессами.
Я использую Java 8 и Apache Beam 2.6 на Windows .