Создание СДР из основного СДР - PullRequest
0 голосов
/ 20 марта 2019

У меня есть RDD (RDD [(String, Iterable [Event])], ключ которого представляет месяц в году, а значения - это миллионы событий, произошедших за этот месяц.

Я хочу пройтись по каждому ключу и создать RDD для событий ключа. Затем я хочу создать RDD для каждого дня событий текущего месяца, чтобы я мог отправить их в соответствующее местоположение s3 (структура «directory»bucketName / year / month / day).

Проблема в том, что кажется, что вы не можете создавать RDD внутри foreach другого RDD. Поэтому я не уверен, как достичь того, что я хочу, без необходимости загружать весь основнойСДР в память (которая наверняка вытеснит память водителя и, в первую очередь, лишит смысла использовать Spark).

Может быть, есть способ достичь того, чего я хочу, используя Spark, я просто неЯ знал это и надеялся, что кто-то здесь может помочь.

Вот код, который у меня есть на данный момент:

 private def store(
    eventsByMonth: RDD[(String, Iterable[Event])]
  )(
    implicit sqlContext: SQLContext
  ): Try[Unit] =
    Try(
      eventsByMonth
        .foreach {
          case (_, events: Iterable[Event]) =>
            writeToS3Files(sqlContext.sparkContext.parallelize(events.toSeq))
        }
    )

  private def writeToS3Files(events: RDD[Event])(
    implicit sqlContext: SQLContext
  ): Try[Unit] =
    Try(
      // outputFilePath will contain the day that these events are related to.
      events.groupBy(_.outputFilePath).foreach {
        case (filePath: String, eventsForFile: Iterable[Event]) =>
          writeToS3File(filePath, sqlContext.sparkContext.parallelize(eventsForFile.toSeq))
      }
    )

  private def writeToS3File(filePath: String, events: RDD[Event]): Try[Unit] = {
    val fileNameWithPath = s"${filePath}${UUID.randomUUID().toString}.gz"

    Try(events.saveAsTextFile(fileNameWithPath, classOf[GzipCodec]))
  }

1 Ответ

1 голос
/ 21 марта 2019

Я предполагаю, что есть какой-то способ определить день в месяце, когда происходит событие (например, день (типа Int) является свойством события).

Вы можете преобразовать RDD [(String, Iterable [Event]] в PairRDD [(K, V)], в котором ключ (K) - это месяц и день месяца, когда происходят события, и значения (V) Все события происходят в этот день месяца. После этого вы можете легко выгружать данные в базы данных.

val eventsByMonthAndDate = eventsByMonth.flatMap { case (month, events) => events.map(e => ((month, e.day), e)) }
eventsByMonthAndDate.groupby(_._1).foreach(writeToDB)
...