Я использую kafka 0.10 для получения сообщений в режиме реального времени и использую spark 2.2 в качестве обработчика для анализа записей и сохранения их в файле данных Hadoop.
1) Я использую пакетный интервал в 120 секунд и получаю 110записей / сек.
2) Использование динамического выделения памяти в spark с включением spark.dynamicallocation.enable = true
3) Потоковое задание способно успешно завершить пакеты первоначально от 100 до 120 пакетов.
4) И затем из следующего пакета некоторые задачи в задании продолжают работать в течение 6 часов, 8 часов и задача завершается. Когда я вижу stderr журнала, его код отображения выглядит как
Генерируетсякод (строка 195 звонков stopEarly()
).
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator
{
/* 007 */ private Object[] references;
/* 008 */ private scala.collection.Iterator[] inputs;
/* 009 */ private boolean agg_initAgg;
/* 010 */ private boolean agg_bufIsNull;
/* 011 */ private double agg_bufValue;
/* 012 */ private boolean agg_bufIsNull1;
/* 013 */ private long agg_bufValue1;
/* 014 */ private agg_FastHashMap agg_fastHashMap;
/* 015 */ private org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow> agg_fastHashMapIter;
/* 016 */ private org.apache.spark.unsafe.KVIterator agg_mapIter;
/* 017 */ private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap;
/* 018 */ private org.apache.spark.sql.execution.UnsafeKVExternalSorter agg_sorter;
/* 019 */ private scala.collection.Iterator inputadapter_input;
/* 020 */ private boolean agg_agg_isNull11;
/* 021 */ private boolean agg_agg_isNull25;
/* 022 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[] agg_mutableStateArray1
= new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[2];
/* 023 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] agg_mutableStateArray2
= new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[2];
/* 024 */ private UnsafeRow[] agg_mutableStateArray = new UnsafeRow[2];
/* 025 */
/* 026 */ public GeneratedIteratorForCodegenStage1(Object[] references) {
/* 027 */ this.references = references;
/* 028 */ }
/* 029 */
/* 030 */ public void init(int index, scala.collection.Iterator[] inputs) {
/* 031 */ partitionIndex = index;
/* 032 */ this.inputs = inputs;
/* 033 */
/* 034 */ agg_fastHashMap = new agg_FastHashMap(((org.apache.spark.sql.execution.aggregate.HashAggregateExec)
references[0] /* plan */).getTaskMemoryManager(), ((org.apache.spark.sql.execution.aggregate.HashAggregateExec)
references[0] /* plan */).getEmptyAggregationBuffer());
/* 035 */ agg_hashMap = ((org.apache.spark.sql.execution.aggregate.HashAggregateExec)
references[0] /* plan */).createHashMap();
/* 036 */ inputadapter_input = inputs[0];
/* 037 */ agg_mutableStateArray[0] = new UnsafeRow(1);
/* 038 */ agg_mutableStateArray1[0] = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_mutableStateArray[0],
32);
/* 039 */ agg_mutableStateArray2[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_mutableStateArray1[0],
1);
/* 040 */ agg_mutableStateArray[1] = new UnsafeRow(3);
/* 041 */ agg_mutableStateArray1[1] = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_mutableStateArray[1],
32);
/* 042 */ agg_mutableStateArray2[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_mutableStateArray1[1],
3);
etc etc..
- Did anyone face these type of issue? If so can please help!!