У меня есть фрагмент кода, который группирует мои данные, но при выводе выдает исключение.
Этот класс используется как ключ в KV
class CKey {
private Long id;
private Long subId;
}
Это часть моего Задание потока данных
TupleTag<CItem> itemsTuple = //...
TupleTag<CMeta> metaTuple = //...
//...
PCollection<KV<CKey, CItem>> items = null;
PCollection<KV<CKey, CMeta>> meta;
KeyedPCollectionTuple.of(itemsTuple, items).and(metaTuple, meta.next())
.apply(CoGroupByKey.create())
.apply(new CustomGroupPairsFn());
Пользовательская функция для объединения данных
class CustomGroupPairsFn extends DoFn<KV<CKey, CoGbkResult>, MyCustomObject> {
@ProcessElement
public void processElement(@Element KV<CKey, CoGbkResult> element, OutputReceiver<MyCustomObject> out) {
CoGbkResult pair = element.getValue();
Iterator<CItem> citem = pair.getAll(ITEMS).iterator();
Iterator<CMeta> cmeta = pair.getAll(METADATA).iterator();
try {
out.output(new MyCustomObject(citem.next(), cmeta));
} catch (Exception e) {
log.error("Error occurred", e);
}
}
}
В try
есть только 1 строка кода, и исключение выдается внутри, исключение:
![enter image description here](https://i.stack.imgur.com/u3wgD.png)
Как решить проблему?