Чтение данных из S3 с использованием pyspark throws java .lang.NumberFormatException: для входной строки: "100M" - PullRequest
1 голос
/ 11 февраля 2020

Я использую следующий код для чтения некоторых json данных из S3:

df = spark_sql_context.read.json("s3a://test_bucket/test.json")
df.show()

Приведенный выше код выдает следующее исключение:

py4j.protocol.Py4JJavaError: An error occurred while calling o64.json.
: java.lang.NumberFormatException: For input string: "100M"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Long.parseLong(Long.java:589)
    at java.lang.Long.parseLong(Long.java:631)
    at org.apache.hadoop.conf.Configuration.getLong(Configuration.java:1538)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:248)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3303)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3352)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3320)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:479)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:547)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:545)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.immutable.List.flatMap(List.scala:355)
    at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:545)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:359)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:391)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

Я прочитал несколько других ТАК сообщения на эту топи c (как этот или этот ) и сделали все, что они упомянули, но, кажется, ничто не решает мою проблему.

Я используя spark-2.4.4-bin-without-hadoop и hadoop-3.1.2. Что касается файлов JAR, у меня есть:

  • aws - java -sdk-bundle-1.11.199.jar
  • has oop - aws -3.0.0.jar
  • has oop -common-3.0.0.jar

Кроме того, для запуска кода используйте следующую команду spark-submit:

/opt/spark-2.4.4-bin-without-hadoop/bin/spark-submit 
--conf spark.app.name=read_json --master yarn --deploy-mode client --num-executors 2 
--executor-cores 2 --executor-memory 2G --driver-cores 2 --driver-memory 1G 
--jars /home/my_project/jars/aws-java-sdk-bundle-1.11.199.jar,
/home/my_project/jars/hadoop-aws-3.0.0.jar,/home/my_project/jars/hadoop-common-3.0.0.jar 
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.rpc.askTimeout=600s" /home/my_project/read_json.py

Что-нибудь, чего мне здесь не хватает?

Ответы [ 2 ]

1 голос
/ 11 февраля 2020

Из трассировки стека выдается ошибка, когда он пытается прочитать один из параметров конфигурации, поэтому проблема связана с одним из параметров конфигурации по умолчанию, который теперь требует формат чисел c.

В моем Если ошибка была устранена после добавления следующего параметра конфигурации в команду spark-submit:

--conf fs.s3a.multipart.size=104857600

См. Настройка загрузки S3A .

0 голосов
/ 12 февраля 2020

Я публикую то, что в итоге сделал, чтобы устранить проблему для тех, кто может увидеть такое же исключение:

Я добавил hadoop-aws к HADOOP_OPTIONAL_TOOLS в has oop -env. sh , Я также удалил все конфигурации в спарк для s3a, кроме доступа / секрета, и все работало. Мой код до изменений:

# Setup the Spark Process
conf = SparkConf() \
       .setAppName(app_name) \
       .set("spark.hadoop.mapred.output.compress", "true") \
       .set("spark.hadoop.mapred.output.compression.codec", "true") \
       .set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec") \
       .set("spark.hadoop.mapred.output.compression.`type", "BLOCK") \
       .set("spark.speculation", "false")\
       .set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider")\
       .set("com.amazonaws.services.s3.enableV4", "true")

# Some other configs

spark_context._jsc.hadoopConfiguration().set(
            "fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"
)

spark_context._jsc.hadoopConfiguration().set(
            "fs.s3a.access.key", s3_key
)

spark_context._jsc.hadoopConfiguration().set(
            "fs.s3a.secret.key", s3_secret
)

spark_context._jsc.hadoopConfiguration().set(
            "fs.s3a.multipart.size", "104857600"
)

И после:

# Setup the Spark Process
conf = SparkConf() \
       .setAppName(app_name) \
       .set("spark.hadoop.mapred.output.compress", "true") \
       .set("spark.hadoop.mapred.output.compression.codec", "true") \
       .set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec") \
       .set("spark.hadoop.mapred.output.compression.`type", "BLOCK") \
       .set("spark.speculation", "false")

# Some other configs

spark_context._jsc.hadoopConfiguration().set(
            "fs.s3a.access.key", s3_key
)

spark_context._jsc.hadoopConfiguration().set(
            "fs.s3a.secret.key", s3_secret
)

Это, вероятно, означает, что это была проблема пути к классу. hadoop-aws не добавлялся к пути к классам, и поэтому под прикрытием он по умолчанию использовался для какой-либо другой реализации S3AFileSystem.java. * oop и искра - огромная боль в этой области, потому что есть так много разных мест и способов загрузки вещей, и java также касается порядка, потому что, если это не произойдет в правильном порядке, это будет просто go с тем, что было загружено последним. Надеюсь, что это поможет другим, сталкивающимся с той же проблемой.

...