Обновление потока при изменении файла - PullRequest
0 голосов
/ 23 сентября 2019

Используя приведенный ниже код, я читаю и печатаю содержимое файла с использованием потоков Akka:

package playground

import java.nio.file.Paths
import akka.actor.ActorSystem
import akka.stream.scaladsl.{FileIO, Framing, Sink, Source}
import akka.util.ByteString
import akka.stream.ActorMaterializer

object Greeter extends App {

  implicit val system = ActorSystem("map-management-service")
  implicit val materializer = ActorMaterializer()

  FileIO.fromPath(Paths.get("a.csv"))
    .via(Framing.delimiter(ByteString("\n"), 256, true).map(_.utf8String)).runForeach(println)

}

Мое понимание использования потоков Akka состоит в том, что если файл изменяет / обновляет код обработки, в данном случае println запускается, поэтому при каждом обновлении файла весь файл перечитывается.Но этого не происходит - файл читается один раз.

Как это следует изменить, чтобы каждый раз, когда файл a.csv обновлялся, файл перечитывался и код println перезапускался

Ответы [ 2 ]

1 голос
/ 25 сентября 2019

Я хотел бы остановиться на Ответ Джеффри с полностью работоспособным Аммонитовым сценарием :

import $ivy.`com.lightbend.akka::akka-stream-alpakka-file:1.1.1`

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ FileIO, Framing }
import akka.stream.alpakka.file.DirectoryChange
import akka.stream.alpakka.file.scaladsl.DirectoryChangesSource
import akka.util.ByteString
import java.nio.file.Paths
import scala.concurrent.duration._

implicit val system = ActorSystem("map-management-service")
implicit val materializer = ActorMaterializer()

val myFile = Paths.get("a.csv")
val changes = DirectoryChangesSource(Paths.get("."), pollInterval = 3.seconds, maxBufferSize = 1000)

changes
  .filter {
    case (path, dirChange) =>
      path.endsWith(myFile) && (dirChange == DirectoryChange.Creation || dirChange == DirectoryChange.Modification)
  }
  .flatMapConcat {
    case (path, _) => FileIO.fromPath(path).via(Framing.delimiter(ByteString("\n"), 256, true))
  }
  .map(_.utf8String)
  .runForeach(println)

Пожалуйста, направьте возражения на его ответ для оригинальной идеи.

1 голос
/ 24 сентября 2019

Alpakka's DirectoryChangesSource может соответствовать вашему варианту использования.Например:

import akka.stream.alpakka.file.DirectoryChange
import akka.stream.alpakka.file.scaladsl.DirectoryChangesSource

implicit val system = ActorSystem("map-management-service")
implicit val materializer = ActorMaterializer()

val myFile = Paths.get("a.csv")
val changes = DirectoryChangesSource(Paths.get("."), pollInterval = 3.seconds, maxBufferSize = 1000)

changes
  .filter {
    case (path, dirChange) =>
      path.endsWith(myFile) && (dirChange == DirectoryChange.Creation || dirChange == DirectoryChange.Modification)
  }
  .flatMapConcat(_ => FileIO.fromPath(myFile).via(Framing.delimiter(ByteString("\n"), 256, true)))
  .map(_.utf8String)
  .runForeach(println)

Приведенный выше фрагмент кода печатает содержимое файла при его создании и при каждом изменении файла с опросом с интервалом в три секунды.

...