Размер результата Spark слишком велик - используйте - PullRequest
0 голосов
/ 10 мая 2018

мои входные данные хранятся в Cassandra, и я использую таблицу, первичным ключом которой является год, месяц, день и час, в качестве источника для агрегации Spark.Приложение My Spark

  1. объединяет две таблицы
  2. принимает объединенные таблицы и выбирает данные по часам
  3. объединяет выбранные блоки по часам
  4. выполняет агрегирование поНабор данных результата и сохранение в Cassandra

Упрощение

val ds1 = spark.read.cassandraFormat(table1, keyspace).load().as[T]
val ds2 = spark.read.cassandraFormat(table2, keyspace).load().as[T]
val dsInput = ds1.join(ds2).coalesce(150)
val dsUnion = for (x <- hours) yield dsInput.select( where hour = x) 
val dsResult = mySparkAggregation( dsUnion.reduce(_.union(_)).coalesce(10) )
dsResult.saveToCassadnra

`

Диаграмма результата выглядит следующим образом (для 3 часов / объединений)

enter image description here

Все работает нормально, когда я делаю только пару союзов, например, 24 (за один день), но когда я начал выполнять это задание Spark в течение 1 месяца (720 союзов), чем яначал получать такую ​​ошибку

Общий размер сериализованных результатов 1126 задач (1024,8 МБ) больше, чем spark.driver.maxResultSize (1024,0 МБ)

Другая тревогаДело в том, что задание создает ~ 100 тыс. заданий, и один из этапов (тот, который вызвал ошибку выше) содержит 74400 заданий, а при обработке 1125 происходит сбой из-за maxResultSize.Более того, кажется, что он должен перетасовывать данные за каждый час (объединение).

Я пытался объединить число задач после объединения - чем он говорит, что задача слишком большая.

Буду очень признателен за любую помощь, предложения?У меня ощущение, что я делаю что-то не так.


Я провел некоторое исследование и получил некоторый вывод. Допустим, у нас есть две таблицы

cb.people CREATE TABLE cb.people (id text PRIMARYКЛЮЧ, текст имени)

и cb.address CREATE TABLE cb.address (текст people_id ПЕРВИЧНЫЙ КЛЮЧ, текст имени)

со следующими данными

cassandra@cqlsh> select * from cb.people;

 id | name
----+---------
  3 | Mariusz
  2 |  Monica
  1 |    John



cassandra@cqlsh> select * from cb.address;

 people_id | name
-----------+--------
         3 | POLAND
         2 |    USA
         1 |    USA

Теперь яхотел бы получить объединенный результат для идентификаторов 1 и 2. Есть два возможных решения.

  1. Объединение два выберите для идентификаторов 1 и 2 из таблицы people, а затем объедините с таблицей адресов

    scala> val people =  spark.read.cassandraFormat("people", "cb").load()
    scala>  val usPeople = people.where(col("id") === "1") union people.where(col("id") === "2")
    scala> val address =  spark.read.cassandraFormat("address", "cb").load()
    scala> val joined = usPeople.join(address, address.col("people_id") === usPeople.col("id"))
    
  2. Соедините две таблицы, а затем объедините два и выберите для идентификатора 1 и 2

    scala> val peopleAddress = address.join(usPeople, address.col("people_id") === usPeople.col("id"))
    scala> val joined2 = peopleAddress.where(col("id") === "1") union peopleAddress.where(col("id") === "2")
    

оба возвращают одинаковый результат

+---------+----+---+------+                                                     
|people_id|name| id|  name|
+---------+----+---+------+
|        1| USA|  1|  John|
|        2| USA|  2|Monica|
+---------+----+---+------+

Но, глядя на объяснение, я вижу большую разницу

scala> joined.explain
== Physical Plan ==
*SortMergeJoin [people_id#10], [id#0], Inner
:- *Sort [people_id#10 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(people_id#10, 200)
:     +- *Filter (((people_id#10 = 1) || (people_id#10 = 2)) && isnotnull(people_id#10))
:        +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@3077e4aa [people_id#10,name#11] PushedFilters: [Or(EqualTo(people_id,1),EqualTo(people_id,2)), IsNotNull(people_id)], ReadSchema: struct<people_id:string,name:string>
+- *Sort [id#0 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#0, 200)
      +- Union
         :- *Filter isnotnull(id#0)
         :  +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@6846e4e8 [id#0,name#1] PushedFilters: [IsNotNull(id), *EqualTo(id,1)], ReadSchema: struct<id:string,name:string>
         +- *Filter isnotnull(id#0)
            +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@6846e4e8 [id#0,name#1] PushedFilters: [IsNotNull(id), *EqualTo(id,2)], ReadSchema: struct<id:string,name:string>

scala> joined2.explain
== Physical Plan ==
Union
:- *SortMergeJoin [people_id#10], [id#0], Inner
:  :- *Sort [people_id#10 ASC NULLS FIRST], false, 0
:  :  +- Exchange hashpartitioning(people_id#10, 200)
:  :     +- *Filter isnotnull(people_id#10)
:  :        +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@3077e4aa [people_id#10,name#11] PushedFilters: [*EqualTo(people_id,1), IsNotNull(people_id)], ReadSchema: struct<people_id:string,name:string>
:  +- *Sort [id#0 ASC NULLS FIRST], false, 0
:     +- Exchange hashpartitioning(id#0, 200)
:        +- Union
:           :- *Filter isnotnull(id#0)
:           :  +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@6846e4e8 [id#0,name#1] PushedFilters: [IsNotNull(id), *EqualTo(id,1)], ReadSchema: struct<id:string,name:string>
:           +- *Filter (isnotnull(id#0) && (id#0 = 1))
:              +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@6846e4e8 [id#0,name#1] PushedFilters: [IsNotNull(id), *EqualTo(id,2), EqualTo(id,1)], ReadSchema: struct<id:string,name:string>
+- *SortMergeJoin [people_id#10], [id#0], Inner
   :- *Sort [people_id#10 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(people_id#10, 200)
   :     +- *Filter isnotnull(people_id#10)
   :        +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@3077e4aa [people_id#10,name#11] PushedFilters: [IsNotNull(people_id), *EqualTo(people_id,2)], ReadSchema: struct<people_id:string,name:string>
   +- *Sort [id#0 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#0, 200)
         +- Union
            :- *Filter (isnotnull(id#0) && (id#0 = 2))
            :  +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@6846e4e8 [id#0,name#1] PushedFilters: [IsNotNull(id), *EqualTo(id,1), EqualTo(id,2)], ReadSchema: struct<id:string,name:string>
            +- *Filter isnotnull(id#0)
               +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@6846e4e8 [id#0,name#1] PushedFilters: [IsNotNull(id), *EqualTo(id,2)], ReadSchema: struct<id:string,name:string>

Теперь для меня совершенно ясно, что я сделал эту версию join2, когда в цикле для каждого объединения был назван join,Я думал, что Spark был бы достаточно умен, чтобы свести это к первой версии ...

Теперь текущий график выглядит намного лучше enter image description here

Я надеюсь, что другиелюди не сделают ту же ошибку, что и я :) К сожалению, я покрыл искру своим уровнем абстракции, который покрывает эту простую проблему, поэтому spark-shell очень помогла смоделировать проблему.

...