Как регулировать запрос в графе, используя поток akka? - PullRequest
0 голосов
/ 09 января 2019

Фон

У меня есть проект, в котором мы используем akka-streams с Java.

В этом проекте у меня есть поток строк и график, который выполняет с ними некоторые операции.

Цель

На моем графике я хочу транслировать этот поток двум работникам. Один из них заменит все символы 'a' на 'A' и отправит данные по мере их получения в режиме реального времени.

Другой получит данные, и каждые 3 строки он объединит эти 3 строки и отобразит их в числа.

Это будет выглядеть следующим образом:

akka-streams-buffer

Очевидно, Sink 2 не будет получать информацию так же быстро, как Sink 1. но это ожидаемое поведение. Интересная часть здесь, это рабочий 2.

Проблема

Делать работника 1 легко, а не сложно. Проблема здесь в том, что работающий 2. Я знаю, что у akka есть буферы, которые могут сохранять до X сообщений, но тогда мне кажется, что я вынужден выбрать одну из существующих стратегий переполнения , которые часто приводят к выбору того, какое сообщение Я хочу отбросить или я хочу сохранить поток в прямом эфире или нет.

Все, что я хочу, это когда мой буфер в worke2 достигает максимального размера буфера, чтобы выполнить операции concat и map для всех сообщений, которые у него есть, а затем отправить их вместе (сбросить буфер после).

Но даже после прочтения документации по скорости потока для akka я не смог найти способ сделать это, по крайней мере, с помощью Java.

Исследования

Я также проверил аналогичный вопрос SO, Избирательное регулирование запросов с использованием потока akka-http Однако прошло более года, и никто не ответил.

Вопросы

Используя график DSL, как мне создать путь из:

Источник -> bcast -> worker2 -> Sink 2

??

1 Ответ

0 голосов
/ 09 января 2019

После вашего bcast применяется оператор groupedWithin с неограниченной продолжительностью и набором элементов, равным 3. https://doc.akka.io/docs/akka/2.5/stream/operators/Source-or-Flow/groupedWithin.html

Вы также можете сделать это самостоятельно, добавив этап, который хранит элемент в List и генерирует список каждый раз, когда он достигает 3 элементов.

import akka.stream.Attributes;
import akka.stream.FlowShape;
import akka.stream.Inlet;
import akka.stream.Outlet;
import akka.stream.stage.AbstractInHandler;
import akka.stream.stage.GraphStage;
import akka.stream.stage.GraphStageLogic;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;

public class RecordGrouper<T> extends GraphStage<FlowShape<T, List<T>>> {

  private final Inlet<T> inlet = Inlet.create("in");
  private final Outlet<List<T>> outlet = Outlet.create("out");
  private final FlowShape<T, List<T>> shape = new FlowShape<>(inlet, outlet);

  @Override
  public GraphStageLogic createLogic(Attributes inheritedAttributes) {
    return new GraphStageLogic(shape) {
      List<T> batch = new ArrayList<>(3);

      {
        setHandler(
            inlet,
            new AbstractInHandler() {
              @Override
              public void onPush() {
                T record = grab(inlet);
                batch.add(record);
                if (batch.size() == 3) {
                  emit(outlet, ImmutableList.copyOf(batch));
                  batch.clear();
                }
                pull(inlet);
              }
            });
      }

      @Override
      public void preStart() {
        pull(inlet);
      }
    };
  }

  @Override
  public FlowShape<T, List<T>> shape() {
    return shape;
  }
}

Как побочный узел, я не думаю, что оператор buffer будет работать, поскольку он срабатывает только при наличии противодавления. Поэтому, если все тихо, элементы все равно будут излучаться один за другим, а не 3 на 3. https://doc.akka.io/docs/akka/2.5/stream/operators/Source-or-Flow/buffer.html

...