У меня есть RDD (RDD [(String, Iterable [Event])], ключ которого представляет месяц в году, а значения - это миллионы событий, произошедших за этот месяц.
Я хочу пройтись по каждому ключу и создать RDD для событий ключа. Затем я хочу создать RDD для каждого дня событий текущего месяца, чтобы я мог отправить их в соответствующее местоположение s3 (структура «directory»bucketName / year / month / day).
Проблема в том, что кажется, что вы не можете создавать RDD внутри foreach другого RDD. Поэтому я не уверен, как достичь того, что я хочу, без необходимости загружать весь основнойСДР в память (которая наверняка вытеснит память водителя и, в первую очередь, лишит смысла использовать Spark).
Может быть, есть способ достичь того, чего я хочу, используя Spark, я просто неЯ знал это и надеялся, что кто-то здесь может помочь.
Вот код, который у меня есть на данный момент:
private def store(
eventsByMonth: RDD[(String, Iterable[Event])]
)(
implicit sqlContext: SQLContext
): Try[Unit] =
Try(
eventsByMonth
.foreach {
case (_, events: Iterable[Event]) =>
writeToS3Files(sqlContext.sparkContext.parallelize(events.toSeq))
}
)
private def writeToS3Files(events: RDD[Event])(
implicit sqlContext: SQLContext
): Try[Unit] =
Try(
// outputFilePath will contain the day that these events are related to.
events.groupBy(_.outputFilePath).foreach {
case (filePath: String, eventsForFile: Iterable[Event]) =>
writeToS3File(filePath, sqlContext.sparkContext.parallelize(eventsForFile.toSeq))
}
)
private def writeToS3File(filePath: String, events: RDD[Event]): Try[Unit] = {
val fileNameWithPath = s"${filePath}${UUID.randomUUID().toString}.gz"
Try(events.saveAsTextFile(fileNameWithPath, classOf[GzipCodec]))
}