Firehose JSON -> S3 Parquet -> ETL Spark, ошибка: невозможно определить схему для Parquet - PullRequest
0 голосов
/ 26 июня 2018

Кажется, что это должно быть легко, как будто это основной вариант использования этого набора функций, но это было проблемой после проблемы.

Последняя попытка выполнить команды через конечную точку Glue Dev (как конечную точку PySpark, так и Scala).

Следуя инструкциям здесь: https://docs.aws.amazon.com/glue/latest/dg/dev-endpoint-tutorial-repl.html

import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
glueContext = GlueContext(SparkContext.getOrCreate())
df = glueContext.create_dynamic_frame.from_catalog(database="mydb", table_name="mytable")

генерирует эту ошибку:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/mnt/tmp/spark-0ba544c9-0b5a-42bb-8a54-9b3138205179/userFiles-95708c09-59f6-4b6d-9016-d0375d511b7a/PyGlue.zip/awsglue/dynamicframe.py", line 557, in from_catalog
  File "/mnt/tmp/spark-0ba544c9-0b5a-42bb-8a54-9b3138205179/userFiles-95708c09-59f6-4b6d-9016-d0375d511b7a/PyGlue.zip/awsglue/context.py", line 136, in create_dynamic_frame_from_catalog
  File "/mnt/tmp/spark-0ba544c9-0b5a-42bb-8a54-9b3138205179/userFiles-95708c09-59f6-4b6d-9016-d0375d511b7a/PyGlue.zip/awsglue/data_source.py", line 36, in getFrame
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u'Unable to infer schema for Parquet. It must be specified manually.;'

Он также генерирует это предупреждение в одной из строк настройки:

18/06/26 19:09:35 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.

Общая настройка довольно проста: у нас есть входящий поток данных Kinesis, процессор для этого потока, который создает поток данных JSON Kinesis, поток пожарных шлангов Kinesis, настроенный для записи этого потока JSON в файлы Parquet в S3, а затем для этого необходимы конфигурации каталога клея.

Афина может видеть данные очень хорошо, но ошибка скриптов Scala / PySpark.

Есть идеи / предложения?

1 Ответ

0 голосов
/ 27 июня 2018

Хорошо, до сих пор не ясно, почему это происходит, но нашел исправление!

По сути, вместо использования сгенерированного кода:

val datasource0 = glueContext.getCatalogSource(
        database = "db",
        tableName = "myTable",
        redshiftTmpDir = "",
        transformationContext = "datasource0"
    ).getDynamicFrame()

используйте этот код:

val crawledData = glueContext.getSourceWithFormat(
        connectionType = "s3",
        options = JsonOptions(s"""{"path": "s3://mybucket/mytable/*/*/*/*/"}"""),
        format = "parquet",
        transformationContext = "source"
    ).getDynamicFrame()

Ключевым битом здесь, похоже, был */*/*/*/ - если бы я просто указал корневую папку, я получил бы ошибку Parquet, и (по-видимому) обычный подстановочный знак /**/* не сработал бы.

...