У меня следующая проблема: есть большой CSV-файл, который мы читаем с помощью spark.Нам нужно преобразовать каждую строку файла и записать его обратно в другой текстовый файл.Во время этого преобразования нам нужно вызвать внешнюю службу через REST, и прежде чем мы напишем файл, нам нужно получить ответ от службы.Тем временем мы можем выполнять другие действия в Spark.
Наивная реализация будет выглядеть так:
val keys = spark.read.csv("/path/to/myfile.csv")
.map(row => {
val result = new Param(row(0), row(3), row(7))
result
})
.collect()
val enrichedData: Map[String,String] = keys.map(key =>
externalService.getValue(key)) // unpredictable response time
val finalResult = spark.read.csv("/path/to/myfile.csv") // read same file twice
.map(row => doSomeTransformation(row))
.map(row => doSomeMoreTransformation(row))
.map(row => andAnotherCostlyOperation(row))
.map(row => {
val key = row(0)
val myData = enrichedData(key) // only here we use the data from ext.serv.
enrichRow(row, myData)
})
.write.csv("/path/to/output.csv")
Проблема в том, что я читаю один и тот же входной файл дважды.Для начала мне нужно собрать все ключи из файла и отправить их во внешний сервис.Как только внешняя служба ответит, я снова смогу просмотреть тот же файл и использовать полученные данные для завершения обработки данных.
Как сделать это более эффективно?Мне не нужны данные из внешнего сервиса до самого последнего шага, поэтому было бы неплохо, если бы Spark мог выполнять другие преобразования параллельно, а затем, когда он достигнет последней функции map (), он будет ждать (потенциально0 сек) для данных из внешнего сервиса.