Толкаем элементы наружу в реактивный поток в фс2 - PullRequest
0 голосов
/ 30 ноября 2018

У меня есть внешний (то есть я не могу его изменить) Java API, который выглядит следующим образом:

public interface Sender {
    void send(Event e);
}

Мне нужно реализовать Sender, который принимает каждое событие, преобразует его в JSONобъект, собирает некоторое количество из них в один пакет и отправляет через HTTP к некоторой конечной точке.Все это должно выполняться асинхронно, без send() блокирования вызывающего потока, с некоторым буфером фиксированного размера и удалением новых событий, если буфер заполнен.

С потоками akka это довольно просто: я создаюграфик этапов (который использует akka-http для отправки HTTP-запросов), материализуйте его и используйте материализованный ActorRef для отправки новых событий в поток:

lazy val eventPipeline = Source.actorRef[Event](Int.MaxValue, OverflowStrategy.fail)
  .via(CustomBuffer(bufferSize))  // buffer all events
  .groupedWithin(batchSize, flushDuration)  // group events into chunks
  .map(toBundle)  // convert each chunk into a JSON message
  .mapAsyncUnordered(1)(sendHttpRequest)  // send an HTTP request
  .toMat(Sink.foreach { response =>
    // print HTTP response for debugging
  })(Keep.both)

lazy val (eventsActor, completeFuture) = eventPipeline.run()

override def send(e: Event): Unit = {
  eventsActor ! e
}

Здесь CustomBuffer - это пользовательский GraphStage который очень похож на библиотеку Buffer, но с учетом наших конкретных потребностей;это, вероятно, не имеет значения для этого конкретного вопроса.

Как видите, взаимодействие с потоком из непотокового кода очень просто - метод ! для признака ActorRef является асинхронным и неНужны какие-то дополнительные машины, чтобы называтьсяКаждое событие, которое отправляется субъекту, затем обрабатывается по всему реактивному конвейеру.Более того, из-за того, как реализован akka-http, я даже получаю пул соединений бесплатно, поэтому к серверу открывается не более одного соединения.

Однако я не могу найти способ сделать то же самое сFS2 правильно.Даже отбрасывая вопрос о буферизации (мне, вероятно, потребуется написать собственную реализацию Pipe, которая делает дополнительные вещи, которые нам нужны) и пуле HTTP-соединений, я все еще застрял с более простой вещью - то есть, как протолкнутьданные в реактивный поток «извне».

Все учебники и документация, которые я могу найти, предполагают, что вся программа происходит внутри некоторого контекста эффекта, обычно IO.Это не мой случай - метод send() вызывается библиотекой Java в неопределенное время.Поэтому я просто не могу держать все внутри одного действия IO, я обязательно должен завершить действие "push" внутри метода send() и иметь реактивный поток как отдельную сущность, потому что я хочу агрегировать события и, надеюсь, объединитьHTTP-соединения (которые, я считаю, естественно связаны с реактивным потоком).

Я предполагаю, что мне нужна какая-то дополнительная структура данных, например Queue.У fs2 действительно есть какой-то fs2.concurrent.Queue, но, опять же, вся документация показывает, как использовать его внутри одного IO контекста, поэтому я предполагаю, что сделать что-то вроде

val queue: Queue[IO, Event] = Queue.unbounded[IO, Event].unsafeRunSync()

и затем использовать queue внутри определения потока, а затем отдельно внутри метода send() с дальнейшими unsafeRun вызовами:

val eventPipeline = queue.dequeue
  .through(customBuffer(bufferSize))
  .groupWithin(batchSize, flushDuration)
  .map(toBundle)
  .mapAsyncUnordered(1)(sendRequest)
  .evalTap(response => ...)
  .compile
  .drain

eventPipeline.unsafeRunAsync(...)  // or something

override def send(e: Event) {
  queue.enqueue(e).unsafeRunSync()
}

не является правильным способом и, скорее всего, даже не будет работать.

Так, мой вопрос, как правильно использовать fs2 для решения моей проблемы?

Ответы [ 2 ]

0 голосов
/ 03 февраля 2019

Рассмотрим следующий пример:

import cats.implicits._
import cats.effect._
import cats.effect.implicits._
import fs2._
import fs2.concurrent.Queue

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object Answer {
  type Event = String

  trait Sender {
    def send(event: Event): Unit
  }

  def main(args: Array[String]): Unit = {
    val sender: Sender = {
      val ec = ExecutionContext.global
      implicit val cs: ContextShift[IO] = IO.contextShift(ec)
      implicit val timer: Timer[IO] = IO.timer(ec)

      fs2Sender[IO](2)
    }

    val events = List("a", "b", "c", "d")
    events.foreach { evt => new Thread(() => sender.send(evt)).start() }
    Thread sleep 3000
  }

  def fs2Sender[F[_]: Timer : ContextShift](maxBufferedSize: Int)(implicit F: ConcurrentEffect[F]): Sender = {
    // dummy impl
    // this is where the actual logic for batching
    //   and shipping over the network would live
    val consume: Pipe[F, Event, Unit] = _.evalMap { event =>
      for {
        _ <- F.delay { println(s"consuming [$event]...") }
        _ <- Timer[F].sleep(1.seconds)
        _ <- F.delay { println(s"...[$event] consumed") }
      } yield ()
    }

    val suspended = for {
      q <- Queue.bounded[F, Event](maxBufferedSize)
      _ <- q.dequeue.through(consume).compile.drain.start
      sender <- F.delay[Sender] { evt =>
        val enqueue = for {
          wasEnqueued <- q.offer1(evt)
          _ <- F.delay { println(s"[$evt] enqueued? $wasEnqueued") }
        } yield ()
        enqueue.toIO.unsafeRunAsyncAndForget()
      }
    } yield sender

    suspended.toIO.unsafeRunSync()
  }
}

Основная идея - использовать параллельную очередь из fs2.Обратите внимание, что приведенный выше код демонстрирует, что ни интерфейс Sender, ни логика в main не могут быть изменены.Только реализация интерфейса Sender может быть заменена.

0 голосов
/ 30 ноября 2018

У меня нет особого опыта именно с этой библиотекой, но она должна выглядеть примерно так:

import cats.effect.{ExitCode, IO, IOApp}
import fs2.concurrent.Queue

case class Event(id: Int)

class JavaProducer{

  new Thread(new Runnable {
    override def run(): Unit = {
      var id = 0
      while(true){
        Thread.sleep(1000)
        id += 1
        send(Event(id))
      }
    }
  }).start()

  def send(event: Event): Unit ={
    println(s"Original producer prints $event")
  }
}

class HackedProducer(queue: Queue[IO, Event]) extends JavaProducer {
  override def send(event: Event): Unit = {
    println(s"Hacked producer pushes $event")
    queue.enqueue1(event).unsafeRunSync()
    println(s"Hacked producer pushes $event - Pushed")
  }

}

object Test extends IOApp{
  override def run(args: List[String]): IO[ExitCode] = {
    val x: IO[Unit] = for {
      queue <- Queue.unbounded[IO, Event]
      _ = new HackedProducer(queue)
      done <- queue.dequeue.map(ev => {
        println(s"Got $ev")
      }).compile.drain
    } yield done
    x.map(_ => ExitCode.Success)
  }

}
...