Avro исключение "не открыто" при записи общих записей с использованием Apache Beam - PullRequest
0 голосов
/ 16 ноября 2018

Я использую AvroIO.<MyCustomType>writeCustomTypeToGenericRecords() для записи общих записей в GCS внутри задания потоковой передачи данных. В первые несколько минут все работает нормально, однако примерно через 10 минут задание выдает следующую ошибку:

java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: org.apache.avro.AvroRuntimeException: not open
        com.google.cloud.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:183)
        com.google.cloud.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:102)
        org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1057)
        org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:438)
        org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125)
        org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060)
        org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:768)
        com.google.cloud.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:95)
        com.google.cloud.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
        com.google.cloud.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
        com.google.cloud.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
        org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
        com.google.cloud.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:133)
        com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:43)
        com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:48)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:200)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
        com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1227)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:136)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.beam.sdk.util.UserCodeException: org.apache.avro.AvroRuntimeException: not open
        org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
        org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown Source)
        org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
        org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:237)
        com.google.cloud.dataflow.worker.StreamingSideInputDoFnRunner.processElement(StreamingSideInputDoFnRunner.java:72)
        com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:324)
        com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:43)
        com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:48)
        com.google.cloud.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:181)
        com.google.cloud.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:102)
        org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1057)
        org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:438)
        org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125)
        org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060)
        org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:768)
        com.google.cloud.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:95)
        com.google.cloud.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
        com.google.cloud.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
        com.google.cloud.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
        org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
        com.google.cloud.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:133)
        com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:43)
        com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:48)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:200)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
        com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1227)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:136)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.avro.AvroRuntimeException: not open
        org.apache.avro.file.DataFileWriter.assertOpen(DataFileWriter.java:82)
        org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:299)
        org.apache.beam.sdk.io.AvroSink$AvroWriter.write(AvroSink.java:123)
        org.apache.beam.sdk.io.WriteFiles.writeOrClose(WriteFiles.java:550)
        org.apache.beam.sdk.io.WriteFiles.access$1000(WriteFiles.java:112)
        org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:718)

Задание потока данных продолжает работать нормально, хотя. Просто, чтобы дать некоторое представление о работе потоковой передачи: он извлекает сообщения из Pub / Sub, создает фиксированное окно в 5 минут с триггером из 10 000 сообщений (в зависимости от того, что наступит раньше), обрабатывает сообщения и, наконец, записывает в корзину GCP, посредством чего каждый конкретный Тип сообщения отправляется в определенную папку в зависимости от типа сообщения, используя .to(new AvroEventDynamicDestinations(avroBaseDir, schemaView)).

ОБНОВЛЕНИЕ 1: Глядя на временную метку этой ошибки, кажется, что она появляется точно с интервалом в 10 секунд, то есть 6 в минуту.

1 Ответ

0 голосов
/ 22 ноября 2018

У меня было точно такое же исключение. Моя проблема возникла из-за неправильной схемы, точнее нулевой схемы (не найдена в реестре схем)

...