Я читаю данные из Kafka с использованием Spark Structured Streaming и пытаюсь создать новый столбец на основе содержимого списка.
Я моделирую свои данные так:
case class Product(properties: List[Property])
case class Property(code: String, value: String)
И я читаю так:
spark
.readStream
.load()
.select($"value".cast("STRING").as("value"))
.select(from_json($"value", schema).as("product").as[Product])
.withColumn("articleType", when(array_contains($"properties.code", "ATY"), $"properties.value")
Этот подход создает новый столбец с именем articleType, содержащий все значения свойств, когда присутствует ATY, но я только хочу, чтобы значение ATY было в столбце.
В основном я хотел бы сделать что-то вроде этого
properties.filter(_.code == "ATY").map(_.value)
Я довольно новичок в Spark, поэтому, возможно, это неправильный подход, но любые указатели были бы полезны.