Как отправить несколько файлов производителю kafka, используя akka stream в Scala - PullRequest
0 голосов
/ 28 августа 2018

Я пытаюсь отправить несколько данных производителю kafka, используя поток akka, в то время как я написал сам производитель, но изо всех сил пытаюсь использовать akka-streamIO для получения нескольких файлов, которые будут данными, которые я хочу отправить на мой продюсер кафка это мой код:

 object App {

  def main(args: Array[String]): Unit = {
    val file = Paths.get("233339.8.1231731728115136.1722327129833578.log")
//    val file = Paths.get("example.csv")
//
//    val foreach: Future[IOResult] = FileIO.fromPath(file)
//      .to(Sink.ignore)
//      .run()

    println("Hello from producer")

    implicit val system:ActorSystem = ActorSystem("producer-example")
    implicit val materializer:Materializer = ActorMaterializer()

    val producerSettings = ProducerSettings(system,new StringSerializer,new StringSerializer)

    val done: Future[Done] =
      Source(1 to 955)
        .map(value => new ProducerRecord[String, String]("test-topic", s"$file : $value"))
        .runWith(Producer.plainSink(producerSettings))

    implicit val ec: ExecutionContextExecutor = system.dispatcher
    done onComplete  {
      case Success(_) => println("Done"); system.terminate()
      case Failure(err) => println(err.toString); system.terminate()
    }

  }

}

1 Ответ

0 голосов
/ 28 августа 2018

Учитывая несколько имен файлов:

val fileNames : Iterable[String] = ???

Можно создать Source, который будет выдавать содержимое файлов, соединенных вместе, используя flatMapConcat:

val chunkSize = 8192

val chunkSource : Source[ByteString, _] = 
  Source.apply(fileNames)
        .map(fileName => Paths get fileName)
        .flatMapConcat(path => FileIO.fromPath(path, chunkSize))

Это выдаст значения фиксированного размера ByteString, которые имеют длину chunkSize, за исключением, возможно, последнего значения, которое может быть меньше.

Если вы хотите разбить строки по некоторому разделителю, вы можете использовать Framing:

val delimiter : ByteString = ???

val maxFrameLength : Int = ???

val framingSource : Source[ByteString, _] =
  chunkSource.via(Framing.delimiter(delimiter, maxFrameLength))
...