Получите неожиданное исключение при использовании потока данных - PullRequest
0 голосов
/ 17 февраля 2020

когда я использую поток данных с этими шагами: - Чтение из bigquery - Преобразование строки таблицы в json строку - Вставка вasticsearch (7.5.2) Выглядит отлично, работает с ~ 100 тыс. Записей, но в реальном (8 млн. Записей ~ 65 ГБ) ) исключение потока данных после 300k вставки.

Error message from worker: java.lang.RuntimeException: unexpected org.apache.beam.runners.dataflow.worker.util.common.worker.CachingShuffleBatchReader.read(CachingShuffleBatchReader.java:104) org.apache.beam.runners.dataflow.worker.util.common.worker.BatchingShuffleEntryReader$ShuffleReadIterator.fillEntries(BatchingShuffleEntryReader.java:125) org.apache.beam.runners.dataflow.worker.util.common.worker.BatchingShuffleEntryReader$ShuffleReadIterator.fillEntriesIfNeeded(BatchingShuffleEntryReader.java:119) org.apache.beam.runners.dataflow.worker.util.common.worker.BatchingShuffleEntryReader$ShuffleReadIterator.hasNext(BatchingShuffleEntryReader.java:84) org.apache.beam.runners.dataflow.worker.util.common.ForwardingReiterator.hasNext(ForwardingReiterator.java:63) org.apache.beam.runners.dataflow.worker.util.common.worker.GroupingShuffleEntryIterator.advance(GroupingShuffleEntryIterator.java:109) org.apache.beam.runners.dataflow.worker.GroupingShuffleReader$GroupingShuffleReaderIterator.advance(GroupingShuffleReader.java:272) org.apache.beam.runners.dataflow.worker.GroupingShuffleReader$GroupingShuffleReaderIterator.start(GroupingShuffleReader.java:266) org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361) org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194) org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159) org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77) org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:411) org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:380) org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:305) org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140) org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120) org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107) java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) java.base/java.lang.Thread.run(Thread.java:834) Caused by: java.util.concurrent.ExecutionException: java.io.IOException: DEADLINE_EXCEEDED: (g)RPC timed out when customerdataworker-khanhn-02170116-o318-harness-8pqr talking to customerdataworker-khanhn-02170116-o318-harness-f8zt:12346. Server unresponsive (ping error: Deadline Exceeded, {"created":"@1581934551.886578453","description":"Deadline Exceeded","file":"third_party/grpc/src/core/ext/filters/deadline/deadline_filter.cc","file_line":69,"grpc_status":4}). Typically one can self manage this issue, please read: https://cloud.google.com/dataflow/docs/guides/common-errors#tsg-rpc-timeout org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:531) org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:492) org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:83) org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:196) org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2312) org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278) org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154) org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044) org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952) org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958) org.apache.beam.runners.dataflow.worker.util.common.worker.CachingShuffleBatchReader.read(CachingShuffleBatchReader.java:101) ... 21 more Caused by: java.io.IOException: DEADLINE_EXCEEDED: (g)RPC timed out when customerdataworker-khanhn-02170116-o318-harness-8pqr talking to customerdataworker-khanhn-02170116-o318-harness-f8zt:12346. Server unresponsive (ping error: Deadline Exceeded, {"created":"@1581934551.886578453","description":"Deadline Exceeded","file":"third_party/grpc/src/core/ext/filters/deadline/deadline_filter.cc","file_line":69,"grpc_status":4}). Typically one can self manage this issue, please read: https://cloud.google.com/dataflow/docs/guides/common-errors#tsg-rpc-timeout org.apache.beam.runners.dataflow.worker.ApplianceShuffleReader.readIncludingPosition(Native Method) org.apache.beam.runners.dataflow.worker.ChunkingShuffleBatchReader.read(ChunkingShuffleBatchReader.java:58) org.apache.beam.runners.dataflow.worker.util.common.worker.CachingShuffleBatchReader$1.load(CachingShuffleBatchReader.java:70) org.apache.beam.runners.dataflow.worker.util.common.worker.CachingShuffleBatchReader$1.load(CachingShuffleBatchReader.java:66) org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528) org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277) ... 27 more

Я также использую эти конфиги в качестве рекомендаций при (увеличение размера диска, количество рабочих мест): https://cloud.google.com/dataflow/docs/guides/common-errors#tsg -rp c -timeout но, похоже, все еще не работает. Мои текущие конфиги:

--runner=DataflowRunner \
      --numWorkers=10 \
      --maxNumWorkers=20 \
      --diskSizeGb=150 \
      --workerMachineType=n1-standard-1 \
      --region=asia-east1" && \

--- Обновление 1: Войдите в систему в стеке драйвера введите описание изображения здесь

1 Ответ

1 голос
/ 20 февраля 2020

Спасибо за ответы, я решил проблему. Причина root заключается в том, что экземпляры виртуальных машин, созданные потоком данных, имеют правила брандмауэра, поэтому мы должны разрешить соединения между ними.

...