Я хотел бы разбить СДР на последовательности элементов, разделенных разделителем.
Скажи, у меня есть этот СДР:
val data = Array(1, 2, 0, 4, 5, 0, 4, 5, 6, 1, 2, 0, 4)
val distData = sc.parallelize(data)
Я хотел бы преобразовать его в следующий СДР:
(Seq(1, 2), Seq(4, 5), Seq(4, 5, 6, 1, 2), Seq(4))
И я хотел бы сохранить «лень» этого СДР - вот и все, я хочу иметь возможность взять (n) без оценки всех данных.
Мне удалось решить эту проблему с помощью аккумулятора и groupByKey, но, к сожалению, это не сохранит лени, потому что groupByKey необходимо оценить весь RDD.
val seqId = sc.longAccumulator("Sequance id")
distData
.flatMap { e =>
if (e == 0) {
seqId.add(1)
None
} else
Some(seqId.value, e)
}
.groupByKey()
.map { case(id, e) => e.toSeq }
Есть идеи, как решить эту проблему ленивым образом?