Извлечь новый столбец из потока - PullRequest
0 голосов
/ 07 ноября 2018

Я читаю данные из Kafka с использованием Spark Structured Streaming и пытаюсь создать новый столбец на основе содержимого списка.

Я моделирую свои данные так:

case class Product(properties: List[Property])
case class Property(code: String, value: String)

И я читаю так:

spark
  .readStream
  .load()
  .select($"value".cast("STRING").as("value"))
  .select(from_json($"value", schema).as("product").as[Product])
  .withColumn("articleType", when(array_contains($"properties.code", "ATY"), $"properties.value")

Этот подход создает новый столбец с именем articleType, содержащий все значения свойств, когда присутствует ATY, но я только хочу, чтобы значение ATY было в столбце.

В основном я хотел бы сделать что-то вроде этого

properties.filter(_.code == "ATY").map(_.value)

Я довольно новичок в Spark, поэтому, возможно, это неправильный подход, но любые указатели были бы полезны.

1 Ответ

0 голосов
/ 14 ноября 2018

удалось решить это с помощью udf.

val getArticleType = udf((properties: Seq[Row]) => {
  properties.size.toString
  properties.find(_.getString(2) == "ATY").map(_.getString(1))
}, StringType)

И используйте это так:

.withColumn("articleType", getArticleType(col("properties")))
...