Почему AWS отклоняет мои подключения, когда я использую wholeTextFiles () с pyspark? - PullRequest
2 голосов
/ 22 мая 2019

Я использую

sc.wholeTextFiles(",".join(fs), minPartitions=200)

для загрузки 6k XML-файлов из S3 (каждый файл 50 МБ) на один узел обработки данных с 96cpus.Когда у меня minPartitions = 200, AWS отклоняет мои подключения, но когда я использую minPartitions = 50, все в порядке.Почему?

Некоторые журналы от Spark:

(...)
19/05/22 14:11:17 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:17 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:26 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:26 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:30 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:30 ERROR org.apache.spark.api.python.PythonRunner: Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 362, in main
    eval_type = read_int(infile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 717, in read_int
    raise EOFError
EOFError

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.InterruptedIOException: getFileStatus on s3a://uni-swim-firehose/tfms/2019/04/03/10/SWIM-TFMS-2-2019-04-03-10-51-52-0fd9f05a-cbc5-4c1c-aef2-aa275ee3c404.gz: com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool```

1 Ответ

3 голосов
/ 23 мая 2019

com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool

wholeTextfiles каждый файл имеет отдельное клиентское соединение с s3 в зависимости от количества разделов, которые у вас есть.и по умолчанию это 50 .

Следовательно, у вас нет икоты на 50 патентов.

Если вы попытались увеличить до 200, вы получили указанное выше исключение.

Решение:

см. Документацию amazon: Как устранить ошибку «Тайм-аут ожидания подключения из пула» в Amazon EMR?

fs.s3.maxConnections в файле конфигурации emrfs-site.xml.По умолчанию он равен 50.

, так как вы используете s3a с искрой, вы можете попробовать подключиться ниже максимума, как 200 , как показано в примере.


путь питона:

def create_spark_session(aws_access_key, aws_secret_key, app_name):
    try:

        spark = SparkSession.builder. \
            config("fs.s3a.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem"). \
            config("fs.s3a.awsAccessKeyId", aws_access_key). \
            config("fs.s3a.awsSecretAccessKey", aws_secret_key). \
            config("fs.s3a.fast.upload", "true"). \
            config("fs.s3a.multipart.size", "1G"). \
            config("fs.s3a.fast.upload.buffer", "disk"). \
            config("fs.s3a.connection.maximum", 200. \
            config("fs.s3a.attempts.maximum", 20). \
            config("fs.s3a.connection.timeout", 30). \
            config("fs.s3a.threads.max", 10). \
            config("fs.s3a.buffer.dir", "hdfs:///user/hadoop/temporary/s3a"). \
            appName(app_name). \
            getOrCreate()

        return spark
    except Exception as e:
        logging.error(e)
        sys.exit(-1)

Для пользователей Scala:

/**
      * example getSparkSessionForS3
      * @return
      */
    def getSparkSessionForS3():SparkSession = {
  val conf = new SparkConf()
    .setAppName("testS3File")
    .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .set("spark.hadoop.fs.s3a.endpoint", "yourendpoint")
    .set("spark.hadoop.fs.s3a.connection.maximum", "200")
    .set("spark.hadoop.fs.s3a.fast.upload", "true")
    .set("spark.hadoop.fs.s3a.connection.establish.timeout", "500")
    .set("spark.hadoop.fs.s3a.connection.timeout", "5000")
    .set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
    .set("spark.hadoop.com.amazonaws.services.s3.enableV4", "true")
    .set("spark.hadoop.com.amazonaws.services.s3.enforceV4", "true")

  val spark = SparkSession
    .builder()
    .config(conf)
    .getOrCreate()
  spark
}

Дополнительная литература:

  1. amazon-s3 - лучшая практика и настройка для hadoopspark-in-the-cloud ---- номер слайда38
  2. https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/#aTimeout_waiting_for_connection_from_pool_when_writing_to_S3A

В # 2 обсуждались все эти исключения

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...