Я пытаюсь решить проблему с помощью хвостовой рекурсии. Вариант использования:
У меня есть список папок, и у каждой папки есть список файлов, и у каждого файла есть несколько записей. Я хочу выполнить некоторые преобразования записей и записать их в kinesis в пакетном режиме.
val listOfFolders = Folder1(File1(RF11, RF12, RF13), File2(RF21,RF22))
Я хочу написать, скажем, две записи одновременно в kinesis. До сих пор я пытался:
listOfFolders.map { folder =>
val files = fetchAllFilesFromFolder(folder)
if (files.nonEmpty) {
sendBatch(files, Seq.empty[(ByteBuffer, String)], 2)
} else {
logger.info(s"No files are present in folder")
}
}
@scala.annotation.tailrec
def sendBatch(
files: Seq[Files],
buffer: Seq[(ByteBuffer, String)],
numberOfRecordsToSend: Int
): Unit =
files match {
case Nil => {
if (buffer.nonEmpty) {
sendToKinesis(streamName, buffer) map { putDataResult =>
val putDataList = putDataResult.getRecords.asScala.toList
logger.info(
s"Successfully Sent"
)
}
} else {
logger.info(s"Successfully sent")
}
}
case head :: tail => {
val fileData = readFileData()
val byteData: Seq[(ByteBuffer, String)] = transformDataAndConvertToByteBuffer(fileData)
val currentBatch = buffer ++ byteData
if (currentBatch.size >= numberOfRecordsToSend) {
sendToKinesis(streamName, buffer) map { putRecordRes =>
val putDataList = putRecordRes.getRecords.asScala.toList
logger.info(
s"Sent successfully"
)
}
sendBatch(tail, Seq.empty[(ByteBuffer, String)], 2)
} else {
sendBatch(tail, currentBatch, 2)
}
}
}
sendToKinesis использует KCL putRecords.
Проблемы с кодом выше:
Читает все данные из одного файла. Поэтому, если в файле 5 записей, в kinesis будет отправлено 5 записей, но размер пакета равен 2.
Невозможно вызвать хвостовой рекурсивный метод с карты.
Также следует позаботиться, если - если file1 имеет 3 записи, он должен отправить 2 записи RF11, RF12 вместе, а затем RF13, RF21 вместе и, наконец, RF22.
Я не хочу использовать переменную в моем коде. Можно ли это решить с помощью хвоста re c?