pyspark на emr с boto3, копия результата объекта s3 с тайм-аутом фьючерса после [100000 миллисекунд] - PullRequest
0 голосов
/ 09 октября 2019

У меня есть приложение pyspark, которое преобразует csv в паркет, и перед этим я копирую некоторый объект S3 из корзины в другое.

pyspark со свечой 2.4, emr 5.27, maximizeResourceAllocation установлен в true

У меня есть различные размеры файлов CSV, от 80 КБ до 500 МБ.

Тем не менее, мой кластер EMR (он не выходит из строя на локальном с помощью spark-submit) завершается с ошибкой на 70% для файла, которыйсоставляет 166 МБ (предыдущий на 480 МБ успешно).

Работа проста:

def organise_adwords_csv():
    s3 = boto3.resource('s3')
    bucket = s3.Bucket(S3_ORIGIN_RAW_BUCKET)
    for obj in bucket.objects.filter(Prefix=S3_ORIGIN_ADWORDS_RAW + "/"):
        key = obj.key

        copy_source = {
            'Bucket': S3_ORIGIN_RAW_BUCKET,
            'Key': key
        }

        key_tab = obj.key.split("/")
        if len(key_tab) < 5:
            print("continuing from length", obj)
            continue
        file_name = ''.join(key_tab[len(key_tab)-1:len(key_tab)])
        if file_name == '':
            print("continuing", obj)
            continue

        table = file_name.split("_")[1].replace("-", "_")
        new_path = "{0}/{1}/{2}".format(S3_DESTINATION_ORDERED_ADWORDS_RAW_PATH, table, file_name)
        print("new_path", new_path) <- the last print will end here
        try:
            s3.meta.client.copy(copy_source, S3_DESTINATION_RAW_BUCKET, new_path)
            print("copy done")
        except Exception as e:
            print(e)
            print("an exception occured while copying")

if __name__=='__main__':
    organise_adwords_csv()

    print("copy Final done") <- never printed

    spark = SparkSession.builder.appName("adwords_transform") \
    ...

, но в стандартном выводе ошибки / исключения не отображаются.

ВЖурналы stderr:

19/10/09 16:16:57 INFO ApplicationMaster: Waiting for spark context initialization...
19/10/09 16:18:37 ERROR ApplicationMaster: Uncaught exception: 
java.util.concurrent.TimeoutException: Futures timed out after [100000 milliseconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:220)
    at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:468)
    at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$runImpl(ApplicationMaster.scala:305)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply$mcV$sp(ApplicationMaster.scala:245)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:245)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:245)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:779)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
    at org.apache.spark.deploy.yarn.ApplicationMaster.doAsUser(ApplicationMaster.scala:778)
    at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:244)
    at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:803)
    at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
19/10/09 16:18:37 INFO ApplicationMaster: Final app status: FAILED, exitCode: 13, (reason: Uncaught exception: java.util.concurrent.TimeoutException: Futures timed out after [100000 milliseconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:220)
    at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:468)
    at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$runImpl(ApplicationMaster.scala:305)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply$mcV$sp(ApplicationMaster.scala:245)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:245)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:245)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:779)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
    at org.apache.spark.deploy.yarn.ApplicationMaster.doAsUser(ApplicationMaster.scala:778)
    at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:244)
    at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:803)
    at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
)
19/10/09 16:18:37 INFO ShutdownHookManager: Shutdown hook called

Я полностью слепой, я не понимаю, что терпит неудачу / почему. Как я могу понять это? На локальном он работает как шарм (но, конечно, очень медленно)

Редактировать: После многих попыток я могу подтвердить, что функция:

s3.meta.client.copy(copy_source, S3_DESTINATION_RAW_BUCKET, new_path)

делает EMRТайм-аут кластера, даже если он уже обработал 80% файлов. У кого-нибудь есть рекомендации по этому поводу?

1 Ответ

0 голосов
/ 10 октября 2019
s3.meta.client.copy(copy_source, S3_DESTINATION_RAW_BUCKET, new_path)

Это не удастся для любого исходного объекта размером более 5 ГБ. пожалуйста, используйте многочастную загрузку в AWS. Смотри https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#multipartupload

...