Как записать текущие данные в отдельные текстовые файлы с помощью Akka - PullRequest
0 голосов
/ 06 ноября 2018

Я нашел относительно похожие вопросы, но все еще не могу найти решение, которое мне нужно.

Короче говоря, у меня есть поток данных, который отменяет некоторые преобразования по пути, и в конце я получаю объекты, похожие на это (String, Seq [Strings]), которые представляют имя раздела строки и список доступных кодов стран. для этого раздела. Второй вариант - иметь, например, последовательность Seq [(String, String)] с набором кортежей (имя раздела, код страны), но у меня все равно будет такая же проблема.

В конце я напишу имена разделов, связанных с этим кодом страны, в файл с именем «smth_COUNTRY_CODE.txt»

Проблема в том, что я не могу понять, как сделать это написание отдельно для каждого кода страны в последовательности строк.

Пока у меня есть что-то вроде:

case class PartitionWithCodes(name: String, codes: Seq[String])

val partitions = Await.result(queryApi.getPartitions(inputCatalogVersion, layer, AdditionalFields.All), Duration.Inf)

    val flowRdfPartition = Flow[Partition].mapAsync(parallelism)(partition => readEngine.get(partition, data => RoadAttributesPartition.parseFrom(data)))
    val flowPartitionName = Flow[RoadAttributesPartition]
      .filter(CountryCodeUtils.filterPartitionByTheCode(_, countryCodes))
CountryCodeUtils.matchCountryCodesToObject(partition))
      .map(partition => CountryCodeUtils.getPartitionsWithCodes(partition))
// at this point I will have PartitionWithCodes object

Теперь я постараюсь сделать следующее:

val fileFlow = Flow[PartitionWithCodes]
    .map(partition =>
      partition.codes.map {
        code => {
          val outFile = constructFilename(fileName, code, format)
          Source.single(ByteString(partition.name + separator)).runWith(FileIO.toPath(Paths.get(outFile))
            .mapMaterializedValue { f =>
              f.onComplete {
                case Success(r) if r.wasSuccessful => println(s"Read ${r.count} bytes from ${partition.name}")
                case Success(r) => println(s"Something went wrong when reading ${partition.name}: ${r.getError}")
                case Failure(NonFatal(e)) => println(s"Something went wrong when reading ${partition.name}: $e")
              }
              Keep.right
            }
          )
        }
      })


// Of course code below wouldn't work.
val source = partitions.via(flowRdfPartition).via(flowPartitionName).via(fileFlow)


  val fileSink = Sink.foreach[IOResult]{println}

  val result = source.via(fileFlow).to(fileSink)

Итак, вопрос в том, как сделать эту запись для каждого кода страны в последовательности? Я разместил часть кода fileFlow в качестве примера того, в каком направлении движутся мои мысли, но я могу ошибаться. Я совсем новичок в Акке и не могу понять, как мне поступить.

Заранее спасибо. Описание довольно расплывчато, но при необходимости я объясню более подробно.

...