How to create and match a schema in Scala - PullRequest
1 vote
/ 08 May 2020

Hi, I have a schema that looks like this:

|-- eventObject: struct (nullable = true)
|    |-- baseDivisionCode: string (nullable = true)
|    |-- countryCode: string (nullable = true)
|    |-- dcNumber: long (nullable = true)
|    |-- financialReportingGroup: string (nullable = true)
|    |-- itemList: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- availabletosellQty: long (nullable = true)
|    |    |    |-- distroAvailableQty: long (nullable = true)
|    |    |    |-- itemNumber: long (nullable = true)
|    |    |    |-- itemUPC: string (nullable = true)
|    |    |    |-- ossIndicator: string (nullable = true)
|    |    |    |-- turnAvailableQty: long (nullable = true)
|    |    |    |-- unitOfMeasurement: string (nullable = true)
|    |    |    |-- weightFormatType: string (nullable = true)
|    |    |    |-- whpkRatio: long (nullable = true)

To match it, I created the following schema type:

|-- eventObject: struct (nullable = true)
|    |-- baseDivisionCode: string (nullable = true)
|    |-- countryCode: string (nullable = true)
|    |-- dcNumber: integer (nullable = true)
|    |-- financialReportingGroup: string (nullable = true)
|    |-- itemList: struct (nullable = true)
|    |    |-- availabletosellQty: long (nullable = true)
|    |    |-- distroAvailableQty: long (nullable = true)
|    |    |-- itemNumber: long (nullable = true)
|    |    |-- itemUPC: string (nullable = true)
|    |    |-- ossIndicator: string (nullable = true)
|    |    |-- turnAvailableQty: long (nullable = true)
|    |    |-- unitOfMeasurement: string (nullable = true)
|    |    |-- weightFormatType: string (nullable = true)
|    |    |-- whpkRatio: long (nullable = true)

I did this by writing something like the following:

import org.apache.spark.sql.types._

val testSchema = new StructType()
  .add("eventObject", new StructType()
    .add("baseDivisionCode", StringType)
    .add("countryCode",StringType)
    .add("dcNumber", IntegerType)
    .add("financialReportingGroup",StringType)

    .add("itemList",new StructType(
      Array(
        StructField("availabletosellQty",LongType),
        StructField("distroAvailableQty",LongType),
        StructField("itemNumber", LongType),
        StructField("itemUPC", StringType),
        StructField("ossIndicator",StringType),
        StructField("turnAvailableQty",LongType),
        StructField("unitOfMeasurement",StringType),
        StructField("weightFormatType",StringType),
        StructField("whpkRatio",LongType)))))
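One way to see where a hand-built schema diverges from what Spark reports is to print it in the same tree format that df.printSchema() uses and to inspect individual field types. The sketch below assumes only spark-sql (Spark 3.x) on the classpath; no SparkSession is needed. It rebuilds the question's schema with the item fields abbreviated to two, purely to keep it short.

```scala
import org.apache.spark.sql.types._

// The question's hand-built schema, abbreviated: only two item fields are kept.
val testSchema = new StructType()
  .add("eventObject", new StructType()
    .add("baseDivisionCode", StringType)
    .add("countryCode", StringType)
    .add("dcNumber", IntegerType)
    .add("financialReportingGroup", StringType)
    .add("itemList", new StructType()
      .add("itemNumber", LongType)
      .add("itemUPC", StringType)))

// Prints the schema in the same "|-- name: type (nullable = ...)" layout as printSchema().
testSchema.printTreeString()

// Look up the declared type of eventObject.itemList by field name.
val eventType = testSchema("eventObject").dataType.asInstanceOf[StructType]
val itemListType = eventType("itemList").dataType

// typeName is "struct" for a StructType and "array" for an ArrayType.
println(itemListType.typeName)
```

Comparing that output line by line with the printSchema() output of the incoming data makes both differences visible: itemList is declared as struct rather than array, and dcNumber as integer rather than long.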

But it doesn't match the schema I actually receive... what am I doing wrong here?

I get null values when I try to populate it with data...
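A minimal way to reproduce the null values, assuming (this is not stated in the question) that the Kafka value is a JSON string parsed with from_json. The sample record, the local SparkSession and the helper schemaWith are hypothetical. When the schema declares itemList as a struct but the JSON holds an array there, the field cannot be converted and Spark's default permissive parsing yields null; declaring it as an array of structs parses it.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

// Local session for experimentation only.
val spark = SparkSession.builder().master("local[1]").appName("schema-mismatch").getOrCreate()
import spark.implicits._

// Hypothetical sample record shaped like the incoming data: itemList is a JSON array.
val json = """{"eventObject":{"countryCode":"US","dcNumber":7,"itemList":[{"itemNumber":1,"itemUPC":"a"}]}}"""
val raw = Seq(json).toDF("value")

val item = new StructType().add("itemNumber", LongType).add("itemUPC", StringType)

// Hypothetical helper: the same outer schema with a configurable type for itemList.
def schemaWith(itemListType: DataType): StructType =
  new StructType().add("eventObject", new StructType()
    .add("countryCode", StringType)
    .add("dcNumber", LongType)
    .add("itemList", itemListType))

// itemList declared as a plain struct (as in the question) vs. as an array of structs.
val wrong = raw.select(from_json(col("value"), schemaWith(item)).as("parsed"))
val right = raw.select(from_json(col("value"), schemaWith(ArrayType(item))).as("parsed"))

wrong.select("parsed.eventObject.itemList").show(false) // itemList is null
right.select("parsed.eventObject.itemList").show(false) // itemList holds one element
```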

|-- eventObject: struct (nullable = true)
|    |-- baseDivisionCode: string (nullable = true)
|    |-- countryCode: string (nullable = true)
|    |-- dcNumber: long (nullable = true)
|    |-- financialReportingGroup: string (nullable = true)
|    |-- itemList: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- itemNumber: long (nullable = true)
|    |    |    |-- itemUPC: string (nullable = true)
|    |    |    |-- unitOfMeasurement: string (nullable = true)
|    |    |    |-- availabletosellQty: long (nullable = true)
|    |    |    |-- turnAvailableQty: long (nullable = true)
|    |    |    |-- distroAvailableQty: long (nullable = true)
|    |    |    |-- ossIndicator: string (nullable = true)
|    |    |    |-- weightFormatType: string (nullable = true)
|-- kafka_timestamp: timestamp (nullable = true)

|-- baseDivisionCode: string (nullable = true)
|-- countryCode: string (nullable = true)
|-- dcNumber: long (nullable = true)
|-- financialReportingGroup: string (nullable = true)
|-- itemList: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- itemNumber: long (nullable = true)
|    |    |-- itemUPC: string (nullable = true)
|    |    |-- unitOfMeasurement: string (nullable = true)
|    |    |-- availabletosellQty: long (nullable = true)
|    |    |-- turnAvailableQty: long (nullable = true)
|    |    |-- distroAvailableQty: long (nullable = true)
|    |    |-- ossIndicator: string (nullable = true)
|    |    |-- weightFormatType: string (nullable = true)
|-- kafka_timestamp: timestamp (nullable = true)

When I try to flatten it further, the array causes this error: Exception in thread "main" org.apache.spark.sql.AnalysisException: Can only star expand struct data types. Attribute: ArrayBuffer(itemList);
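The error comes from applying star expansion ("colName.*") to a column that is an array rather than a struct. A minimal reproduction under stated assumptions: a local SparkSession and a hypothetical one-row DataFrame built with SQL. The exact message text differs between Spark versions, so the sketch only relies on the exception type.

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.functions.col
import scala.util.Try

val spark = SparkSession.builder().master("local[1]").appName("star-expand").getOrCreate()

// Hypothetical one-row DataFrame whose itemList is an array of structs.
val df = spark.sql(
  "SELECT array(named_struct('itemNumber', 1L, 'itemUPC', 'a')) AS itemList")

// "itemList.*" only works on struct columns; select() analyzes eagerly, so it throws here.
val attempt = Try(df.select(col("itemList.*")))
attempt.failed.foreach(e => println(s"${e.getClass.getSimpleName}: ${e.getMessage}"))
```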

I'm trying to get it into this shape:

|-- facilityCountryCode: string (nullable = true)
|-- facilityNum: string (nullable = true)
|-- WMT_CorrelationId: string (nullable = true)
|-- WMT_IdempotencyKey: string (nullable = true)
|-- WMT_Timestamp: string (nullable = true)
|-- countryCode: string (nullable = true)
|-- dcNumber: integer (nullable = true)
|-- financialReportingGroup: string (nullable = true)
|-- baseDivisionCode: string (nullable = true)
|-- itemNumber: integer (nullable = true)
|-- itemUPC: string (nullable = true)
|-- unitOfMeasurement: string (nullable = true)
|-- availabletosellQty: integer (nullable = true)
|-- turnAvailableQty: integer (nullable = true)
|-- distroAvailableQty: integer (nullable = true)
|-- ossIndicator: string (nullable = true)
|-- weightFormatType: string (nullable = true)
|-- kafka_timestamp: timestamp (nullable = true)
|-- year-month-day: integer (nullable = true)
|-- month: integer (nullable = true)
|-- day: integer (nullable = true)
|-- hour: integer (nullable = true)
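The target schema differs from the parsed one in two ways that plain column expressions can handle: several long fields become integer, and the date parts are derived from kafka_timestamp. A sketch assuming a local SparkSession and a hypothetical flattened row; reading "year-month-day" as a yyyyMMdd integer is a guess, not something the question states. The facilityCountryCode, facilityNum and WMT_* columns are not covered because their source isn't shown.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, date_format, dayofmonth, hour, month}

val spark = SparkSession.builder().master("local[1]").appName("target-shape").getOrCreate()

// Hypothetical flattened row: long columns as delivered, plus a Kafka timestamp.
val flat = spark.sql(
  "SELECT 7L AS dcNumber, 1L AS itemNumber, TIMESTAMP '2020-05-08 13:45:00' AS kafka_timestamp")

val shaped = flat
  // Narrow the long columns to the integer types listed in the target schema.
  .withColumn("dcNumber", col("dcNumber").cast("int"))
  .withColumn("itemNumber", col("itemNumber").cast("int"))
  // Assumption: "year-month-day" means a yyyyMMdd integer such as 20200508.
  .withColumn("year-month-day", date_format(col("kafka_timestamp"), "yyyyMMdd").cast("int"))
  .withColumn("month", month(col("kafka_timestamp")))
  .withColumn("day", dayofmonth(col("kafka_timestamp")))
  .withColumn("hour", hour(col("kafka_timestamp")))

shaped.printSchema()
```

Narrowing long to int only makes sense if the values are known to fit in 32 bits; keeping them as long avoids that risk entirely.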

Here is what I did:

import spark.implicits._ // provides the $"..." column syntax

val testParsed = TestExploded.select($"exploded.*", $"kafka_timestamp")

val testFlattened = testParsed.select($"eventObject.*", $"kafka_timestamp")

val test_flattened_further = testFlattened.select($"countryCode",
  $"dcNumber", $"financialReportingGroup", $"baseDivisionCode", $"itemList.*", $"kafka_timestamp")
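A possible alternative for the last step, sketched under assumptions: Spark SQL's inline generator turns an array of structs into one row per element and one column per struct field, so it can stand in for the failing $"itemList.*" while keeping kafka_timestamp and the other sibling columns. This is a different technique from the explode-based answer below. The stand-in for testFlattened, its values, and the local SparkSession are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("inline-items").getOrCreate()

// Hypothetical stand-in for testFlattened: scalar columns, an array of structs, a timestamp.
val testFlattened = spark.sql("""
  SELECT 'US' AS countryCode, 7L AS dcNumber,
         array(named_struct('itemNumber', 1L, 'itemUPC', 'a'),
               named_struct('itemNumber', 2L, 'itemUPC', 'b')) AS itemList,
         TIMESTAMP '2020-05-08 13:45:00' AS kafka_timestamp""")

// inline() emits one row per array element and one column per struct field;
// the other selected columns are repeated on every generated row.
val testFlattenedFurther = testFlattened.selectExpr(
  "countryCode", "dcNumber", "inline(itemList)", "kafka_timestamp")

testFlattenedFurther.printSchema()
```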

1 Answer

0 votes
/ 08 May 2020

Use ArrayType to declare that itemList is an array of structs:

import org.apache.spark.sql.types._

val testSchema = new StructType()
  .add("eventObject", new StructType()
    .add("baseDivisionCode", StringType)
    .add("countryCode", StringType)
    .add("dcNumber", LongType)
    .add("financialReportingGroup", StringType)
    // itemList is an array whose elements are structs, so wrap the struct in ArrayType
    .add("itemList", new ArrayType(
      new StructType(
        Array(
          StructField("itemNumber", LongType),
          StructField("itemUPC", StringType),
          StructField("unitOfMeasurement", StringType),
          StructField("availabletosellQty", LongType),
          StructField("turnAvailableQty", LongType),
          StructField("distroAvailableQty", LongType),
          StructField("ossIndicator", StringType),
          StructField("weightFormatType", StringType))), containsNull = true)))
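A note on the design choice: the explicit containsNull = true matches the element line "containsNull = true" in the printed schema, but it is also the default of the ArrayType companion apply, so both spellings describe the same type. A small sketch with spark-sql on the classpath and the element struct abbreviated to two fields:

```scala
import org.apache.spark.sql.types._

// Element type of itemList, abbreviated to two fields for brevity.
val itemType = new StructType()
  .add("itemNumber", LongType)
  .add("itemUPC", StringType)

// Explicit constructor with containsNull spelled out.
val explicitForm = new ArrayType(itemType, containsNull = true)
// Companion apply: containsNull defaults to true.
val shortForm = ArrayType(itemType)

println(explicitForm == shortForm) // true
println(shortForm.simpleString)    // array<struct<itemNumber:bigint,itemUPC:string>>
```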

To fully flatten the DataFrame, you can explode the array of structs (one row per element) and then promote the struct's fields to top-level columns with the select("structColName.*") syntax, as follows:

import org.apache.spark.sql.functions.{col, explode}

df
  .select("eventObject.*")
  .select(
    col("baseDivisionCode"),
    col("countryCode"),
    col("dcNumber"),
    col("financialReportingGroup"),
    explode(col("itemList")).as("explodedItemList"))
  .select(
    col("baseDivisionCode"),
    col("countryCode"),
    col("dcNumber"),
    col("financialReportingGroup"),
    col("explodedItemList.*")
  )
  .printSchema()
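One behavior worth knowing here: explode emits no rows for an event whose itemList is null or empty, so such events disappear from the flattened output. explode_outer keeps them as a single row with a null item. A sketch assuming a local SparkSession and two hypothetical events, one with two items and one with a null itemList:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode, explode_outer}

val spark = SparkSession.builder().master("local[1]").appName("explode-outer").getOrCreate()

// Hypothetical events: id 0 has two items, id 1 has a null itemList (CASE without ELSE).
val events = spark.range(2).selectExpr(
  "CASE WHEN id = 0 THEN 'US' ELSE 'CA' END AS countryCode",
  "CASE WHEN id = 0 THEN array(named_struct('itemNumber', 1L), named_struct('itemNumber', 2L)) END AS itemList")

// explode drops the CA event; explode_outer keeps it with item = null.
val inner = events.select(col("countryCode"), explode(col("itemList")).as("item"))
val outer = events.select(col("countryCode"), explode_outer(col("itemList")).as("item"))

outer.select(col("countryCode"), col("item.*")).show()
```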

This prints:

root
 |-- baseDivisionCode: string (nullable = true)
 |-- countryCode: string (nullable = true)
 |-- dcNumber: long (nullable = true)
 |-- financialReportingGroup: string (nullable = true)
 |-- itemNumber: long (nullable = true)
 |-- itemUPC: string (nullable = true)
 |-- unitOfMeasurement: string (nullable = true)
 |-- availabletosellQty: long (nullable = true)
 |-- turnAvailableQty: long (nullable = true)
 |-- distroAvailableQty: long (nullable = true)
 |-- ossIndicator: string (nullable = true)
 |-- weightFormatType: string (nullable = true)
...