Вариант использования хвостовой рекурсии в Scala - PullRequest
0 голосов
/ 02 апреля 2020

Я пытаюсь решить проблему с помощью хвостовой рекурсии. Вариант использования:

У меня есть список папок, и у каждой папки есть список файлов, и у каждого файла есть несколько записей. Я хочу выполнить некоторые преобразования записей и записать их в kinesis в пакетном режиме.

val listOfFolders = Folder1(File1(RF11, RF12, RF13), File2(RF21,RF22))

Я хочу написать, скажем, две записи одновременно в kinesis. До сих пор я пытался:

listOfFolders.map { folder =>
    val files = fetchAllFilesFromFolder(folder)
    if (files.nonEmpty) {
      sendBatch(files, Seq.empty[(ByteBuffer, String)], 2)
    } else {
      logger.info(s"No files are present in folder")
    }
  }

  @scala.annotation.tailrec
  def sendBatch(
    files: Seq[Files],
    buffer: Seq[(ByteBuffer, String)],
    numberOfRecordsToSend: Int
  ): Unit =
    files match {
      case Nil => {
        if (buffer.nonEmpty) {
          sendToKinesis(streamName, buffer) map { putDataResult =>
            val putDataList = putDataResult.getRecords.asScala.toList
            logger.info(
              s"Successfully Sent"
            )
          }
        } else {
          logger.info(s"Successfully sent")
        }
      }
      case head :: tail => {
        val fileData = readFileData()
        val byteData: Seq[(ByteBuffer, String)] = transformDataAndConvertToByteBuffer(fileData)

        val currentBatch = buffer ++ byteData
        if (currentBatch.size >= numberOfRecordsToSend) {
          sendToKinesis(streamName, buffer)  map { putRecordRes =>
            val putDataList = putRecordRes.getRecords.asScala.toList
            logger.info(
              s"Sent successfully" 
            )
          }
          sendBatch(tail, Seq.empty[(ByteBuffer, String)], 2)
        } else {
          sendBatch(tail, currentBatch, 2)
        }
      }
    }

sendToKinesis использует KCL putRecords.

Проблемы с кодом выше:

  • Читает все данные из одного файла. Поэтому, если в файле 5 записей, в kinesis будет отправлено 5 записей, но размер пакета равен 2.

    Невозможно вызвать хвостовой рекурсивный метод с карты.

    Также следует позаботиться, если - если file1 имеет 3 записи, он должен отправить 2 записи RF11, RF12 вместе, а затем RF13, RF21 вместе и, наконец, RF22.

Я не хочу использовать переменную в моем коде. Можно ли это решить с помощью хвоста re c?

1 Ответ

2 голосов
/ 02 апреля 2020

У вас есть две подзадачи

  1. Как отправлять пакеты фиксированного размера
@scala.annotation.tailrec
def sendBatch(file: Option[File], buffer: Seq[(ByteBuffer, String)], numbersOfRecrodsToSend: Int): Seq[(ByteBuffer, String)] = {
  if (buffer.length < numbersOfRecrodsToSend) {
    // case 1: too few records to be sent 
    file match {
      // case 1.1: file was not yet read
      case Some(f) => sendBatch(None, buffer ++ getByteData(f), numbersOfRecrodsToSend)
      // case 1.2: too few records, file was already read, return leftover records
      case None => buffer
    }
  } else {
    // case 2: we can send numbersOfRecrodsToSend to Kinesis
    val (toSend, newBuffer) = buffer.splitAt(numbersOfRecrodsToSend)
    sendToKinesis(streamName, toSend)
    sendBatch(file, newBuffer, numbersOfRecrodsToSend)
  }
}
Как перебрать список и отправить пакеты фиксированного размера
// start with empty list of files to send and for each folder
// add it's files to the buffer and send as many records as you can
// the leftover is going to be passed to next iteration for both files and directories
val partial = listOfFolders.foldLeft(Seq.empty[(ByteBuffer, String)]) { case (acc, folder) =>
  fetchAllFilesFromFolder(folder).foldLeft(acc) { case (acc2, file) => 
    sendBatch(Some(file), acc2, numbersOfRecrodsToSend)
  }
}

// if any records have left - send them too
if (partial.nonEmpty) {
  sendToKinesis(streamName, partial)
}

Надеюсь, у вас есть идея.

...