Я хотел бы сохранить (parquet
) каждое значение потока в определенном c каталоге, путь которого задается ключом. Должно быть меньше пяти разных ключей, столько разных каталогов.
Поскольку я не нашел ни одного примера того, что хочу сделать, я попробовал следующий подход: filter()
поток по каждому ключу, найденному внутри ; со следующим кодом:
stream
.foreachRDD{ (rdd, time) =>
import spark.implicits._
if (rdd.take(1).length != 0) {
val directories= rdd.map(_._1).distinct().collect()
lDates.foreach { directory =>
rdd
.filter(_._1==directory).map(_._2)
.toDF()
.write.format("parquet").mode("append").save(directory)
}
}
}
Но barbari c collect()
серьезно сказывается на вычислениях и приводит к некоторым scheduling delay
...
Будет ли у кого-нибудь лучший способ реализовать это или повысить производительность?
РЕДАКТИРОВАТЬ: У меня нет доступа к структурированной потоковой передаче.