Spark 2.2.1: array_contains в условии соединения вызывает ошибку «больше, чем spark.driver.maxResultSize» - PullRequest
0 голосов
/ 27 января 2019

Я начал видеть следующую ошибку после развертывания некоторых изменений в SQL-запросе Spark в среде AWS Glue Spark 2.2.1:

org.apache.spark.SparkException: задание прервано из-за сбоя этапа: Общий размер сериализованных результатов 164 задач (1031,4 МБ) больше, чем spark.driver.maxResultSize (1024,0 МБ)

Я попытался отключить широковещательные соединения с помощью set ("spark.sql.autoBroadcastJoinThreshold",«-1») и увеличение maxResultSize, которое вызвало другие ошибки, но проблема сохранялась до тех пор, пока я не заменил следующее соединение

X left outer join Y on array_contains(X.ids, Y.id)

на

val sqlDF = spark.sql("select * from X lateral view explode(ids) t as id")
sqlDF.createOrReplaceTempView("X_exploded")
...
X_exploded left outer join Y on X_exploded.id = Y.id

Я использую среду управления AWS Glue и нене имеют доступа к плану запросов.Однако мне любопытно, почему объединение в массиве array_contains может привести к тому, что в драйвер будет доставлено больше данных, чем взорвется и будет использовано точное совпадение?

Обратите внимание, что таблица X содержит 350 КБ данных в формате json / gzip, а таблица Y содержитоколо 50 ГБ json / zip.

Спасибо!

Ответы [ 2 ]

0 голосов
/ 28 января 2019

Вы можете использовать командную строку, --conf spark.driver.maxResultSize = 4g, чтобы увеличить максимальный размер результата.

0 голосов
/ 28 января 2019

Похоже, что ваш предыдущий подход возвращает все значения из Y, если функция array_contains возвращает true.

В последующем подходе Explode создает новую строку для каждого элемента и, таким образом, устраняет любые дубликаты и в конечном итоге уменьшает количество возвращаемых строк.

...