У меня есть внешний (то есть я не могу его изменить) Java API, который выглядит следующим образом:
public interface Sender {
void send(Event e);
}
Мне нужно реализовать Sender
, который принимает каждое событие, преобразует его в JSONобъект, собирает некоторое количество из них в один пакет и отправляет через HTTP к некоторой конечной точке.Все это должно выполняться асинхронно, без send()
блокирования вызывающего потока, с некоторым буфером фиксированного размера и удалением новых событий, если буфер заполнен.
С потоками akka это довольно просто: я создаюграфик этапов (который использует akka-http для отправки HTTP-запросов), материализуйте его и используйте материализованный ActorRef
для отправки новых событий в поток:
lazy val eventPipeline = Source.actorRef[Event](Int.MaxValue, OverflowStrategy.fail)
.via(CustomBuffer(bufferSize)) // buffer all events
.groupedWithin(batchSize, flushDuration) // group events into chunks
.map(toBundle) // convert each chunk into a JSON message
.mapAsyncUnordered(1)(sendHttpRequest) // send an HTTP request
.toMat(Sink.foreach { response =>
// print HTTP response for debugging
})(Keep.both)
lazy val (eventsActor, completeFuture) = eventPipeline.run()
override def send(e: Event): Unit = {
eventsActor ! e
}
Здесь CustomBuffer
- это пользовательский GraphStage
который очень похож на библиотеку Buffer
, но с учетом наших конкретных потребностей;это, вероятно, не имеет значения для этого конкретного вопроса.
Как видите, взаимодействие с потоком из непотокового кода очень просто - метод !
для признака ActorRef
является асинхронным и неНужны какие-то дополнительные машины, чтобы называтьсяКаждое событие, которое отправляется субъекту, затем обрабатывается по всему реактивному конвейеру.Более того, из-за того, как реализован akka-http, я даже получаю пул соединений бесплатно, поэтому к серверу открывается не более одного соединения.
Однако я не могу найти способ сделать то же самое сFS2 правильно.Даже отбрасывая вопрос о буферизации (мне, вероятно, потребуется написать собственную реализацию Pipe
, которая делает дополнительные вещи, которые нам нужны) и пуле HTTP-соединений, я все еще застрял с более простой вещью - то есть, как протолкнутьданные в реактивный поток «извне».
Все учебники и документация, которые я могу найти, предполагают, что вся программа происходит внутри некоторого контекста эффекта, обычно IO
.Это не мой случай - метод send()
вызывается библиотекой Java в неопределенное время.Поэтому я просто не могу держать все внутри одного действия IO
, я обязательно должен завершить действие "push" внутри метода send()
и иметь реактивный поток как отдельную сущность, потому что я хочу агрегировать события и, надеюсь, объединитьHTTP-соединения (которые, я считаю, естественно связаны с реактивным потоком).
Я предполагаю, что мне нужна какая-то дополнительная структура данных, например Queue
.У fs2 действительно есть какой-то fs2.concurrent.Queue
, но, опять же, вся документация показывает, как использовать его внутри одного IO
контекста, поэтому я предполагаю, что сделать что-то вроде
val queue: Queue[IO, Event] = Queue.unbounded[IO, Event].unsafeRunSync()
и затем использовать queue
внутри определения потока, а затем отдельно внутри метода send()
с дальнейшими unsafeRun
вызовами:
val eventPipeline = queue.dequeue
.through(customBuffer(bufferSize))
.groupWithin(batchSize, flushDuration)
.map(toBundle)
.mapAsyncUnordered(1)(sendRequest)
.evalTap(response => ...)
.compile
.drain
eventPipeline.unsafeRunAsync(...) // or something
override def send(e: Event) {
queue.enqueue(e).unsafeRunSync()
}
не является правильным способом и, скорее всего, даже не будет работать.
Так, мой вопрос, как правильно использовать fs2 для решения моей проблемы?