OutOfMemoryError при выполнении операции соединения в Flink - PullRequest
0 голосов
/ 22 ноября 2018

Мы конвертируем один из наших конвейеров свиноводства в мерцание, используя луч Apache.Конвейер Pig считывает два разных набора данных (R1 и R2) из ​​hdfs, обогащает их, присоединяет их и возвращает обратно в hdfs.Набор данных R1 искажен.В некотором смысле, у него есть несколько ключей с большим количеством записей.Когда мы преобразовали конвейер PIG в Apache Beam и запустили его с помощью Flink на кластере производственной пряжи, мы получили следующую ошибку

2018-11-21 16:52:25,307 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  GroupReduce (GroupReduce at CoGBK/GBK) (25/100)
java.lang.RuntimeException: Emitting the record caused an I/O exception: Failed to serialize element. Serialized size (> 1136656562 bytes) exceeds JVM heap space
        at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:69)
        at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
        at org.apache.beam.runners.flink.translation.functions.SortingFlinkCombineRunner.combine(SortingFlinkCombineRunner.java:140)
        at org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction.reduce(FlinkReduceFunction.java:85)
        at org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:111)
        at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Failed to serialize element. Serialized size (> 1136656562 bytes) exceeds JVM heap space
        at org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:323)
        at org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
        at org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
        at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
        at java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
        at java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
        at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:170)
        at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:50)
        at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
        at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:71)
        at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:58)
        at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
        at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
        at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
        at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
        at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:71)
        at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:520)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:480)
        at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:83)
        at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
        at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
        at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
        ... 9 more
Caused by: java.lang.OutOfMemoryError: Java heap space
        at org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
        at org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
        at org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
        at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
        at java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
        at java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
        at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:170)
        at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:50)
        at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
        at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:71)
        at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:58)
        at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
        at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
        at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
        at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
        at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:71)
        at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:520)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:480)
        at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:83)
        at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
        at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
        at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)

Из представления исключений на панели управления Flink Job Manager мы увидели, что этопроисходит при операции соединения.
Когда я говорю, что набор данных R1 перекошен, есть некоторые ключи с числом вхождений до 8 000 000, в то время как большинство ключей встречаются только один раз.В наборе данных R2 есть записи, ключи которых встречаются не более одного раза.
Кроме того, если исключить такие ключи с большим числом вхождений, конвейер работает абсолютно нормально, что доказывает, что это происходит только благодаря этим нескольким ключам.

Версия Hadoop: 2.7.1 Версия луча: 2.8.0 Версия Flink Runner: 2.8.0

Дайте мне знать, какую дополнительную информацию я должен получить и опубликовать здесь, чтобы вы могли помочь мне решить эту проблему.

...