Задача висит в Spark Streaming от кафки - PullRequest
0 голосов
/ 31 октября 2019

Я использую kafka 0.10 для получения сообщений в режиме реального времени и использую spark 2.2 в качестве обработчика для анализа записей и сохранения их в файле данных Hadoop.

1) Я использую пакетный интервал в 120 секунд и получаю 110записей / сек.

2) Использование динамического выделения памяти в spark с включением spark.dynamicallocation.enable = true

3) Потоковое задание способно успешно завершить пакеты первоначально от 100 до 120 пакетов.

4) И затем из следующего пакета некоторые задачи в задании продолжают работать в течение 6 часов, 8 часов и задача завершается. Когда я вижу stderr журнала, его код отображения выглядит как

Генерируетсякод (строка 195 звонков stopEarly()).

/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator
{
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private boolean agg_initAgg;
/* 010 */   private boolean agg_bufIsNull;
/* 011 */   private double agg_bufValue;
/* 012 */   private boolean agg_bufIsNull1;
/* 013 */   private long agg_bufValue1;
/* 014 */   private agg_FastHashMap agg_fastHashMap;
/* 015 */   private org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow> agg_fastHashMapIter;
/* 016 */   private org.apache.spark.unsafe.KVIterator agg_mapIter;
/* 017 */   private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap;
/* 018 */   private org.apache.spark.sql.execution.UnsafeKVExternalSorter agg_sorter;
/* 019 */   private scala.collection.Iterator inputadapter_input;
/* 020 */   private boolean agg_agg_isNull11;
/* 021 */   private boolean agg_agg_isNull25;
/* 022 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[] agg_mutableStateArray1
= new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[2];
/* 023 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] agg_mutableStateArray2
= new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[2];
/* 024 */   private UnsafeRow[] agg_mutableStateArray = new UnsafeRow[2];
/* 025 */
/* 026 */   public GeneratedIteratorForCodegenStage1(Object[] references) {
/* 027 */     this.references = references;
/* 028 */   }
/* 029 */
/* 030 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 031 */     partitionIndex = index;
/* 032 */     this.inputs = inputs;
/* 033 */
/* 034 */     agg_fastHashMap = new agg_FastHashMap(((org.apache.spark.sql.execution.aggregate.HashAggregateExec)
references[0] /* plan */).getTaskMemoryManager(), ((org.apache.spark.sql.execution.aggregate.HashAggregateExec)
references[0] /* plan */).getEmptyAggregationBuffer());
/* 035 */     agg_hashMap = ((org.apache.spark.sql.execution.aggregate.HashAggregateExec)
references[0] /* plan */).createHashMap();
/* 036 */     inputadapter_input = inputs[0];
/* 037 */     agg_mutableStateArray[0] = new UnsafeRow(1);
/* 038 */     agg_mutableStateArray1[0] = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_mutableStateArray[0],
32);
/* 039 */     agg_mutableStateArray2[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_mutableStateArray1[0],
1);
/* 040 */     agg_mutableStateArray[1] = new UnsafeRow(3);
/* 041 */     agg_mutableStateArray1[1] = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_mutableStateArray[1],
32);
/* 042 */     agg_mutableStateArray2[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_mutableStateArray1[1],
3);

etc etc..

 - Did anyone face these type of issue? If so can please help!!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...