мои входные данные хранятся в Cassandra, и я использую таблицу, первичным ключом которой является год, месяц, день и час, в качестве источника для агрегации Spark.Приложение My Spark
- объединяет две таблицы
- принимает объединенные таблицы и выбирает данные по часам
- объединяет выбранные блоки по часам
- выполняет агрегирование поНабор данных результата и сохранение в Cassandra
Упрощение
val ds1 = spark.read.cassandraFormat(table1, keyspace).load().as[T]
val ds2 = spark.read.cassandraFormat(table2, keyspace).load().as[T]
val dsInput = ds1.join(ds2).coalesce(150)
val dsUnion = for (x <- hours) yield dsInput.select( where hour = x)
val dsResult = mySparkAggregation( dsUnion.reduce(_.union(_)).coalesce(10) )
dsResult.saveToCassadnra
`
Диаграмма результата выглядит следующим образом (для 3 часов / объединений)
Все работает нормально, когда я делаю только пару союзов, например, 24 (за один день), но когда я начал выполнять это задание Spark в течение 1 месяца (720 союзов), чем яначал получать такую ошибку
Общий размер сериализованных результатов 1126 задач (1024,8 МБ) больше, чем spark.driver.maxResultSize (1024,0 МБ)
Другая тревогаДело в том, что задание создает ~ 100 тыс. заданий, и один из этапов (тот, который вызвал ошибку выше) содержит 74400 заданий, а при обработке 1125 происходит сбой из-за maxResultSize.Более того, кажется, что он должен перетасовывать данные за каждый час (объединение).
Я пытался объединить число задач после объединения - чем он говорит, что задача слишком большая.
Буду очень признателен за любую помощь, предложения?У меня ощущение, что я делаю что-то не так.
Я провел некоторое исследование и получил некоторый вывод. Допустим, у нас есть две таблицы
cb.people CREATE TABLE cb.people (id text PRIMARYКЛЮЧ, текст имени)
и cb.address CREATE TABLE cb.address (текст people_id ПЕРВИЧНЫЙ КЛЮЧ, текст имени)
со следующими данными
cassandra@cqlsh> select * from cb.people;
id | name
----+---------
3 | Mariusz
2 | Monica
1 | John
cassandra@cqlsh> select * from cb.address;
people_id | name
-----------+--------
3 | POLAND
2 | USA
1 | USA
Теперь яхотел бы получить объединенный результат для идентификаторов 1 и 2. Есть два возможных решения.
Объединение два выберите для идентификаторов 1 и 2 из таблицы people, а затем объедините с таблицей адресов
scala> val people = spark.read.cassandraFormat("people", "cb").load()
scala> val usPeople = people.where(col("id") === "1") union people.where(col("id") === "2")
scala> val address = spark.read.cassandraFormat("address", "cb").load()
scala> val joined = usPeople.join(address, address.col("people_id") === usPeople.col("id"))
Соедините две таблицы, а затем объедините два и выберите для идентификатора 1 и 2
scala> val peopleAddress = address.join(usPeople, address.col("people_id") === usPeople.col("id"))
scala> val joined2 = peopleAddress.where(col("id") === "1") union peopleAddress.where(col("id") === "2")
оба возвращают одинаковый результат
+---------+----+---+------+
|people_id|name| id| name|
+---------+----+---+------+
| 1| USA| 1| John|
| 2| USA| 2|Monica|
+---------+----+---+------+
Но, глядя на объяснение, я вижу большую разницу
scala> joined.explain
== Physical Plan ==
*SortMergeJoin [people_id#10], [id#0], Inner
:- *Sort [people_id#10 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(people_id#10, 200)
: +- *Filter (((people_id#10 = 1) || (people_id#10 = 2)) && isnotnull(people_id#10))
: +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@3077e4aa [people_id#10,name#11] PushedFilters: [Or(EqualTo(people_id,1),EqualTo(people_id,2)), IsNotNull(people_id)], ReadSchema: struct<people_id:string,name:string>
+- *Sort [id#0 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(id#0, 200)
+- Union
:- *Filter isnotnull(id#0)
: +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@6846e4e8 [id#0,name#1] PushedFilters: [IsNotNull(id), *EqualTo(id,1)], ReadSchema: struct<id:string,name:string>
+- *Filter isnotnull(id#0)
+- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@6846e4e8 [id#0,name#1] PushedFilters: [IsNotNull(id), *EqualTo(id,2)], ReadSchema: struct<id:string,name:string>
scala> joined2.explain
== Physical Plan ==
Union
:- *SortMergeJoin [people_id#10], [id#0], Inner
: :- *Sort [people_id#10 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(people_id#10, 200)
: : +- *Filter isnotnull(people_id#10)
: : +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@3077e4aa [people_id#10,name#11] PushedFilters: [*EqualTo(people_id,1), IsNotNull(people_id)], ReadSchema: struct<people_id:string,name:string>
: +- *Sort [id#0 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#0, 200)
: +- Union
: :- *Filter isnotnull(id#0)
: : +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@6846e4e8 [id#0,name#1] PushedFilters: [IsNotNull(id), *EqualTo(id,1)], ReadSchema: struct<id:string,name:string>
: +- *Filter (isnotnull(id#0) && (id#0 = 1))
: +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@6846e4e8 [id#0,name#1] PushedFilters: [IsNotNull(id), *EqualTo(id,2), EqualTo(id,1)], ReadSchema: struct<id:string,name:string>
+- *SortMergeJoin [people_id#10], [id#0], Inner
:- *Sort [people_id#10 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(people_id#10, 200)
: +- *Filter isnotnull(people_id#10)
: +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@3077e4aa [people_id#10,name#11] PushedFilters: [IsNotNull(people_id), *EqualTo(people_id,2)], ReadSchema: struct<people_id:string,name:string>
+- *Sort [id#0 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(id#0, 200)
+- Union
:- *Filter (isnotnull(id#0) && (id#0 = 2))
: +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@6846e4e8 [id#0,name#1] PushedFilters: [IsNotNull(id), *EqualTo(id,1), EqualTo(id,2)], ReadSchema: struct<id:string,name:string>
+- *Filter isnotnull(id#0)
+- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@6846e4e8 [id#0,name#1] PushedFilters: [IsNotNull(id), *EqualTo(id,2)], ReadSchema: struct<id:string,name:string>
Теперь для меня совершенно ясно, что я сделал эту версию join2, когда в цикле для каждого объединения был назван join,Я думал, что Spark был бы достаточно умен, чтобы свести это к первой версии ...
Теперь текущий график выглядит намного лучше
Я надеюсь, что другиелюди не сделают ту же ошибку, что и я :) К сожалению, я покрыл искру своим уровнем абстракции, который покрывает эту простую проблему, поэтому spark-shell очень помогла смоделировать проблему.