pyspark - reading a malformed .gz file - PullRequest
August 30, 2018

I am reading a gzip-compressed .gz file with pyspark on EMR. The file is malformed (it is a JSON file in which different lines have a different number of columns), and reading it raises the exception below. Can anyone give hints on how to read a malformed .gz file in pyspark?
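
To make "a JSON file with a different number of columns per line" concrete, here is a minimal local sketch using only the Python standard library, not Spark. It assumes the file is JSON Lines (one JSON object per line). `read_jsonl_gz` is a hypothetical helper: it collects the union of all keys, fills missing columns with `None`, and keeps lines that fail to parse verbatim under `_corrupt_record` (the default column name Spark's JSON reader uses for such lines) instead of aborting.

```python
import gzip
import json


def read_jsonl_gz(path, corrupt_field="_corrupt_record"):
    """Parse a gzipped JSON Lines file whose lines may have different keys.

    Returns (columns, rows): `columns` is the sorted union of all keys seen,
    `rows` is a list of dicts that all carry every column (missing -> None).
    Lines that are not valid JSON objects are stored as raw text under
    `corrupt_field` rather than raising.
    """
    records = []
    columns = set()
    with gzip.open(path, "rt", encoding="utf-8") as fh:
        for line in fh:
            line = line.rstrip("\n")
            if not line.strip():
                continue  # skip blank lines
            try:
                obj = json.loads(line)  # JSONDecodeError is a ValueError
                if not isinstance(obj, dict):
                    raise ValueError("line is not a JSON object")
            except ValueError:
                obj = {corrupt_field: line}  # keep the bad line for inspection
            columns.update(obj)
            records.append(obj)
    cols = sorted(columns)
    # Pad every record to the full column set, like a nullable schema would
    rows = [{c: rec.get(c) for c in cols} for rec in records]
    return cols, rows
```

A line such as `{"a": 2, "b": "x"}` and a line such as `{"a": 1}` end up with the same columns, the second one with `b` set to `None`.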

Code:

```python
# `sc` is the SparkContext provided by the pyspark shell on EMR
rdd = sc.textFile("s3n://abc/us/web-logs/2018/08/29/00/0000.gz")
df = rdd.toDF()
df.printSchema()  # prints the schema itself and returns None
```
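
Separately from the exception, `rdd.toDF()` here receives an RDD of plain text lines; PySpark cannot infer a schema from bare strings, and, as the `_inferSchema` frame in the traceback shows, inference starts from the first record, which does not suit lines with different sets of fields. A sketch of the more usual route, assuming a `SparkSession` named `spark` (the pyspark shell provides one): let Spark's JSON data source parse the file. `read_json_logs` is a hypothetical wrapper; `mode` and `columnNameOfCorruptRecord` are documented options of that data source. This targets the varying-columns part of the question only and is not expected to cure the exception itself, which is raised before any line is parsed.

```python
def read_json_logs(spark, path):
    """Load gzipped JSON Lines into a DataFrame with Spark's JSON reader.

    `spark` is assumed to be an existing SparkSession. The .gz suffix lets
    Spark pick the gzip codec, and the JSON reader merges the fields of the
    sampled records (all of them with the default samplingRatio of 1.0) into
    one schema, so lines missing a field get null in that column.
    """
    return (
        spark.read
        # PERMISSIVE (the default) keeps malformed lines instead of failing
        .option("mode", "PERMISSIVE")
        # raw text of unparsable lines goes into this string column
        .option("columnNameOfCorruptRecord", "_corrupt_record")
        .json(path)
    )


# Usage in the pyspark shell, with the path from the question:
# df = read_json_logs(spark, "s3n://abc/us/web-logs/2018/08/29/00/0000.gz")
# df.printSchema()
```

Note that a single .gz file is not splittable, so Spark reads it in one task regardless of its size.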

Error:

```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/spark/python/pyspark/sql/session.py", line 58, in toDF
    return sparkSession.createDataFrame(self, schema, sampleRatio)
  File "/usr/lib/spark/python/pyspark/sql/session.py", line 687, in createDataFrame
    rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
  File "/usr/lib/spark/python/pyspark/sql/session.py", line 384, in _createFromRDD
    struct = self._inferSchema(rdd, samplingRatio, names=schema)
  File "/usr/lib/spark/python/pyspark/sql/session.py", line 355, in _inferSchema
    first = rdd.first()
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1376, in first
    rs = self.take(1)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1328, in take
    totalParts = self.getNumPartitions()
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2455, in getNumPartitions
    return self._prev_jrdd.partitions().size()
  File "/usr/lib/spark/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 324, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling o81.partitions. Trace:
com.amazon.ws.emr.hadoop.fs.shaded.com.google.inject.internal.util.$ComputationException: java.lang.ArrayIndexOutOfBoundsException: 16227
    at com.amazon.ws.emr.hadoop.fs.shaded.com.google.inject.internal.util.$MapMaker$StrategyImpl.compute(MapMaker.java:553)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.google.inject.internal.util.$MapMaker$StrategyImpl.compute(MapMaker.java:419)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.google.inject.internal.util.$CustomConcurrentHashMap$ComputingImpl.get(CustomConcurrentHashMap.java:2041)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.google.inject.internal.util.$StackTraceElements.forMember(StackTraceElements.java:53)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.google.inject.internal.Errors.formatSource(Errors.java:690)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.google.inject.internal.Errors.format(Errors.java:555)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.google.inject.ProvisionException.getMessage(ProvisionException.java:59)
    at java.lang.Throwable.getLocalizedMessage(Throwable.java:391)
    at java.lang.Throwable.toString(Throwable.java:480)
    at java.lang.Throwable.<init>(Throwable.java:311)
    at java.lang.Exception.<init>(Exception.java:102)
    at java.lang.RuntimeException.<init>(RuntimeException.java:96)
    at py4j.Py4JException.<init>(Py4JException.java:56)
    at py4j.Py4JJavaException.<init>(Py4JJavaException.java:59)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:251)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 16227
    at com.amazon.ws.emr.hadoop.fs.shaded.com.google.inject.internal.asm.$ClassReader.readClass(Unknown Source)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.google.inject.internal.asm.$ClassReader.accept(Unknown Source)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.google.inject.internal.asm.$ClassReader.accept(Unknown Source)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.google.inject.internal.util.$LineNumbers.<init>(LineNumbers.java:62)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.google.inject.internal.util.$StackTraceElements$1.apply(StackTraceElements.java:36)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.google.inject.internal.util.$StackTraceElements$1.apply(StackTraceElements.java:33)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.google.inject.internal.util.$MapMaker$StrategyImpl.compute(MapMaker.java:549)
    ... 20 more
```
...
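
In this traceback the exception comes from `getNumPartitions()` calling `partitions()`, i.e. while Spark works out the input splits, before any line of the file is parsed; the `ArrayIndexOutOfBoundsException` itself is thrown inside EMR's shaded Guice code while it formats a `ProvisionException` message, so it likely masks the underlying error. One hedged way to rule out a damaged archive is to copy the object locally (for example with `aws s3 cp`) and decompress it end to end. The sketch below does that with the standard library; `check_gzip` is a hypothetical helper, and a clean result only shows the gzip stream is intact, not why the S3 read fails.

```python
import gzip
import zlib


def check_gzip(path, chunk_size=1 << 20):
    """Decompress a .gz file end to end without holding it in memory.

    Returns (ok, n_lines, error). `ok` is False when the stream is truncated
    or corrupted; `error` then holds the exception text and `n_lines` counts
    the newlines decoded before the failure.
    """
    n_lines = 0
    try:
        with gzip.open(path, "rb") as fh:
            while True:
                chunk = fh.read(chunk_size)
                if not chunk:
                    break
                n_lines += chunk.count(b"\n")
    except (OSError, EOFError, zlib.error) as exc:
        # Truncated files raise EOFError; bad headers/CRCs raise OSError
        return False, n_lines, str(exc)
    return True, n_lines, None
```

For example, `check_gzip("0000.gz")` on a locally copied file returns `(True, <line count>, None)` when the archive decompresses cleanly.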