У меня есть миллионы файлов изображений, сохраненных в GCS. Когда я пытаюсь получить к ним доступ с помощью sc.binaryFiles(...)
в pyspark, узел драйвера перегружен: все четыре ядра ЦП работают с максимальной нагрузкой 100%, а память - с 16 ГБ. Однако все рабочие узлы простаивают: в основном, загрузка ЦП 0% и минимальное влияние на память.
Мне удалось свести проблему к следующим строчкам:
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
# Spark boilerplate
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.getOrCreate()
images_rdd = sc.binaryFiles('gs://my_bucket/')
images_rdd.count()
I начать pyspark, как это. У меня 52 машины по 4 ядра в каждой. Я пробовал то же самое с 52 исполнителями, и я получаю тот же эффект, поэтому я думаю, что проблема связана с GCS или binaryFiles(...)
.
pyspark --num-executors 104 --executor-cores 4 --py-files ~/dependencies.zip,~/model.h5
В конечном итоге pyspark умирает с этим сообщением:
: java.io.IOException: Failed to listFileInfo for 'gs://my_bucket/'
at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.listFileInfo(GoogleCloudStorageFileSystem.java:1095)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.listStatus(GoogleHadoopFileSystemBase.java:999)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1804)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1849)
at org.apache.hadoop.fs.FileSystem$4.<init>(FileSystem.java:2014)
at org.apache.hadoop.fs.FileSystem.listLocatedStatus(FileSystem.java:2013)
at org.apache.hadoop.fs.FileSystem.listLocatedStatus(FileSystem.java:1996)
at org.apache.hadoop.mapred.LocatedFileStatusFetcher$ProcessInputDirCallable.call(LocatedFileStatusFetcher.java:226)
at org.apache.hadoop.mapred.LocatedFileStatusFetcher$ProcessInputDirCallable.call(LocatedFileStatusFetcher.java:203)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: GC overhead limit exceeded
at com.google.cloud.hadoop.repackaged.gcs.com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:552)
at com.google.cloud.hadoop.repackaged.gcs.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:513)
at com.google.cloud.hadoop.repackaged.gcs.com.google.common.util.concurrent.FluentFuture$TrustedFuture.get(FluentFuture.java:82)
at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.LazyExecutorService$ExecutingFutureImpl$Delegated.get(LazyExecutorService.java:529)
at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.LazyExecutorService$ExecutingFutureImpl$Created.get(LazyExecutorService.java:420)
at com.google.cloud.hadoop.repackaged.gcs.com.google.common.util.concurrent.ForwardingFuture.get(ForwardingFuture.java:62)
at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.listFileInfo(GoogleCloudStorageFileSystem.java:1075)
... 12 more
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
Почему Spark делает драйвер узким местом и не распределяет какую-либо работу на своих рабочих машинах? Я просто делаю простой подсчет, и я ничего не возвращаю водителю. Я не понимаю, как это может иметь проблемы с производительностью.
Я использую Spark версии 2.4.4.
Обновление
Как проверить, хотя это не помогло бы моему коду, я попробовал другой метод, который читает из GCS, и вижу те же проблемы.
images_rdd = sc.wholeTextFiles('gs://my_bucket/')
Возможно ли, что интеграция Spark с GCS включает в себя отправку всех имена файлов обратно на узел драйвера перед загрузкой их рабочим?