Столбец разделов исчезает в результирующем наборе данных Spark - PullRequest
0 голосов
/ 30 мая 2019

Я попытался разделить фрейм данных Spark по столбцу отметки времени update_database_time и записать его в HDFS с определенной схемой Avro. Однако после вызова метода перераспределения я получаю следующее исключение:

Caused by: org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Catalyst type StructType(StructField(random_pk,DecimalType(38,0),true), StructField(random_string,StringType,true), StructField(code,StringType,true), StructField(random_bool,BooleanType,true), StructField(random_int,IntegerType,true), StructField(random_float,DoubleType,true), StructField(random_double,DoubleType,true), StructField(random_enum,StringType,true), StructField(random_date,DateType,true), StructField(random_decimal,DecimalType(4,2),true), StructField(update_database_time_tz,TimestampType,true), StructField(random_money,DecimalType(19,4),true)) to Avro type {"type":"record","name":"TestData","namespace":"DWH","fields":[{"name":"random_pk","type":["null",{"type":"bytes","logicalType":"decimal","precision":38,"scale":0}]},{"name":"random_string","type":["string","null"]},{"name":"code","type":["string","null"]},{"name":"random_bool","type":["boolean","null"]},{"name":"random_int","type":["int","null"]},{"name":"random_float","type":["double","null"]},{"name":"random_double","type":["double","null"]},{"name":"random_enum","type":["null",{"type":"enum","name":"enumType","symbols":["VAL_1","VAL_2","VAL_3"]}]},{"name":"random_date","type":["null",{"type":"int","logicalType":"date"}]},{"name":"random_decimal","type":["null",{"type":"bytes","logicalType":"decimal","precision":4,"scale":2}]},{"name":"update_database_time","type":["null",{"type":"long","logicalType":"timestamp-millis"}]},{"name":"update_database_time_tz","type":["null",{"type":"long","logicalType":"timestamp-millis"}]},{"name":"random_money","type":["null",{"type":"bytes","logicalType":"decimal","precision":19,"scale":4}]}]}.

Я предполагаю, что столбец для разделения исчезает в результате. Как я могу переопределить операцию, чтобы она не произошла?

Вот код, который я использую:

    dataDF.write
      .partitionBy("update_database_time")
      .format("avro")
      .option(
        "avroSchema",
        SchemaRegistry.getSchema(
          schemaRegistryConfig.url,
          schemaRegistryConfig.dataSchemaSubject,
          schemaRegistryConfig.dataSchemaVersion))
  .save(s"${hdfsURL}${pathToSave}")

1 Ответ

1 голос
/ 31 мая 2019

Исключением, которое вы предоставили, кажется, что ошибка возникает из-за несовместимых схем между извлеченной схемой AVRO и схемой Spark. Если взглянуть бегло, то, вероятно, наиболее тревожными являются следующие:

  1. (возможно, катализатор не знает, как преобразовать строку в enumType)

Схема искры:

StructField(random_enum,StringType,true)

Схема AVRO:

{
      "name": "random_enum",
      "type": [
        "null",
        {
          "type": "enum",
          "name": "enumType",
          "symbols": [
            "VAL_1",
            "VAL_2",
            "VAL_3"
          ]
        }
      ]
    }
  1. (update_databse_time_tz появляется только один раз в схеме кадра данных, но дважды в схеме AVRO)

Схема искры:

StructField(update_database_time_tz,TimestampType,true)

Схема AVRO:

{
      "name": "update_database_time",
      "type": [
        "null",
        {
          "type": "long",
          "logicalType": "timestamp-millis"
        }
      ]
    },
    {
      "name": "update_database_time_tz",
      "type": [
        "null",
        {
          "type": "long",
          "logicalType": "timestamp-millis"
        }
      ]
    }

Я бы предложил сначала объединить схемы и избавиться от этого исключения, прежде чем углубляться в другие возможные проблемы с разбиением.

РЕДАКТИРОВАТЬ: Что касается номера 2, я упустил лицо, что в схеме AVRO есть разные имена, что приводит к проблеме пропуска столбца update_database_time в кадре данных.

...