Я нашел относительно похожие вопросы, но все еще не могу найти решение, которое мне нужно.
Короче говоря, у меня есть поток данных, который отменяет некоторые преобразования по пути, и в конце я получаю объекты, похожие на это (String, Seq [Strings]), которые представляют имя раздела строки и список доступных кодов стран. для этого раздела. Второй вариант - иметь, например, последовательность Seq [(String, String)] с набором кортежей (имя раздела, код страны), но у меня все равно будет такая же проблема.
В конце я напишу имена разделов, связанных с этим кодом страны, в файл с именем «smth_COUNTRY_CODE.txt»
Проблема в том, что я не могу понять, как сделать это написание отдельно для каждого кода страны в последовательности строк.
Пока у меня есть что-то вроде:
case class PartitionWithCodes(name: String, codes: Seq[String])
val partitions = Await.result(queryApi.getPartitions(inputCatalogVersion, layer, AdditionalFields.All), Duration.Inf)
val flowRdfPartition = Flow[Partition].mapAsync(parallelism)(partition => readEngine.get(partition, data => RoadAttributesPartition.parseFrom(data)))
val flowPartitionName = Flow[RoadAttributesPartition]
.filter(CountryCodeUtils.filterPartitionByTheCode(_, countryCodes))
CountryCodeUtils.matchCountryCodesToObject(partition))
.map(partition => CountryCodeUtils.getPartitionsWithCodes(partition))
// at this point I will have PartitionWithCodes object
Теперь я постараюсь сделать следующее:
val fileFlow = Flow[PartitionWithCodes]
.map(partition =>
partition.codes.map {
code => {
val outFile = constructFilename(fileName, code, format)
Source.single(ByteString(partition.name + separator)).runWith(FileIO.toPath(Paths.get(outFile))
.mapMaterializedValue { f =>
f.onComplete {
case Success(r) if r.wasSuccessful => println(s"Read ${r.count} bytes from ${partition.name}")
case Success(r) => println(s"Something went wrong when reading ${partition.name}: ${r.getError}")
case Failure(NonFatal(e)) => println(s"Something went wrong when reading ${partition.name}: $e")
}
Keep.right
}
)
}
})
// Of course code below wouldn't work.
val source = partitions.via(flowRdfPartition).via(flowPartitionName).via(fileFlow)
val fileSink = Sink.foreach[IOResult]{println}
val result = source.via(fileFlow).to(fileSink)
Итак, вопрос в том, как сделать эту запись для каждого кода страны в последовательности? Я разместил часть кода fileFlow в качестве примера того, в каком направлении движутся мои мысли, но я могу ошибаться.
Я совсем новичок в Акке и не могу понять, как мне поступить.
Заранее спасибо. Описание довольно расплывчато, но при необходимости я объясню более подробно.