Привет, у меня есть схема, которая выглядит следующим образом
|-- eventObject: struct (nullable = true)
| |-- baseDivisionCode: string (nullable = true)
| |-- countryCode: string (nullable = true)
| |-- dcNumber: long (nullable = true)
| |-- financialReportingGroup: string (nullable = true)
| |-- itemList: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- availabletosellQty: long (nullable = true)
| | | |-- distroAvailableQty: long (nullable = true)
| | | |-- itemNumber: long (nullable = true)
| | | |-- itemUPC: string (nullable = true)
| | | |-- ossIndicator: string (nullable = true)
| | | |-- turnAvailableQty: long (nullable = true)
| | | |-- unitOfMeasurement: string (nullable = true)
| | | |-- weightFormatType: string (nullable = true)
| | | |-- whpkRatio: long (nullable = true)
, чтобы сопоставить это, я создал следующий тип схемы
|-- eventObject: struct (nullable = true)
| |-- baseDivisionCode: string (nullable = true)
| |-- countryCode: string (nullable = true)
| |-- dcNumber: integer (nullable = true)
| |-- financialReportingGroup: string (nullable = true)
| |-- itemList: struct (nullable = true)
| | |-- availabletosellQty: long (nullable = true)
| | |-- distroAvailableQty: long (nullable = true)
| | |-- itemNumber: long (nullable = true)
| | |-- itemUPC: string (nullable = true)
| | |-- ossIndicator: string (nullable = true)
| | |-- turnAvailableQty: long (nullable = true)
| | |-- unitOfMeasurement: string (nullable = true)
| | |-- weightFormatType: string (nullable = true)
| | |-- whpkRatio: long (nullable = true)
, написав что-то вроде этого
val testSchema = new StructType()
.add("eventObject", new StructType()
.add("baseDivisionCode", StringType)
.add("countryCode",StringType)
.add("dcNumber", IntegerType)
.add("financialReportingGroup",StringType)
.add("itemList",new StructType(
Array(
StructField("availabletosellQty",LongType),
StructField("distroAvailableQty",LongType),
StructField("itemNumber", LongType),
StructField("itemUPC", StringType),
StructField("ossIndicator",StringType),
StructField("turnAvailableQty",LongType),
StructField("unitOfMeasurement",StringType),
StructField("weightFormatType",StringType),
StructField("whpkRatio",LongType)))))
, но это не соответствует схеме, которую я получаю ... что я делаю в этом не так?
я получаю нулевые значения, когда пытаюсь заполнить некоторые данные ...
|-- eventObject: struct (nullable = true)
| |-- baseDivisionCode: string (nullable = true)
| |-- countryCode: string (nullable = true)
| |-- dcNumber: long (nullable = true)
| |-- financialReportingGroup: string (nullable = true)
| |-- itemList: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- itemNumber: long (nullable = true)
| | | |-- itemUPC: string (nullable = true)
| | | |-- unitOfMeasurement: string (nullable = true)
| | | |-- availabletosellQty: long (nullable = true)
| | | |-- turnAvailableQty: long (nullable = true)
| | | |-- distroAvailableQty: long (nullable = true)
| | | |-- ossIndicator: string (nullable = true)
| | | |-- weightFormatType: string (nullable = true)
|-- kafka_timestamp: timestamp (nullable = true)
|-- baseDivisionCode: string (nullable = true)
|-- countryCode: string (nullable = true)
|-- dcNumber: long (nullable = true)
|-- financialReportingGroup: string (nullable = true)
|-- itemList: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- itemNumber: long (nullable = true)
| | |-- itemUPC: string (nullable = true)
| | |-- unitOfMeasurement: string (nullable = true)
| | |-- availabletosellQty: long (nullable = true)
| | |-- turnAvailableQty: long (nullable = true)
| | |-- distroAvailableQty: long (nullable = true)
| | |-- ossIndicator: string (nullable = true)
| | |-- weightFormatType: string (nullable = true)
|-- kafka_timestamp: timestamp (nullable = true)
когда я еще пытаюсь сгладить его, его ошибка вызывает массив "Исключение в потоке" main "org. apache .spark. sql .AnalysisException: может только звездочка расширять типы данных структуры . Атрибут: ArrayBuffer(itemList)
; "
пытаюсь получить его на
|-- facilityCountryCode: string (nullable = true)
|-- facilityNum: string (nullable = true)
|-- WMT_CorrelationId: string (nullable = true)
|-- WMT_IdempotencyKey: string (nullable = true)
|-- WMT_Timestamp: string (nullable = true)
|-- countryCode: string (nullable = true)
|-- dcNumber: integer (nullable = true)
|-- financialReportingGroup: string (nullable = true)
|-- baseDivisionCode: string (nullable = true)
|-- itemNumber: integer (nullable = true)
|-- itemUPC: string (nullable = true)
|-- unitOfMeasurement: string (nullable = true)
|-- availabletosellQty: integer (nullable = true)
|-- turnAvailableQty: integer (nullable = true)
|-- distroAvailableQty: integer (nullable = true)
|-- ossIndicator: string (nullable = true)
|-- weightFormatType: string (nullable = true)
|-- kafka_timestamp: timestamp (nullable = true)
|-- year-month-day: integer (nullable = true)
|-- month: integer (nullable = true)
|-- day: integer (nullable = true)
|-- hour: integer (nullable = true)
вот что я сделал
val testParsed=TestExploded.select($"exploded.*",$"kafka_timestamp")
val testFlattened=testParsed.select($"eventObject.*",$"kafka_timestamp")
val test_flattened_further=testFlattened.select($"countryCode",
$"dcNumber",$"financialReportingGroup",$"baseDivisionCode",**$"itemList.*"**,$"kafka_timestamp")