Java Создать пользовательский поток в конвейере - PullRequest
0 голосов
/ 24 мая 2019

Мне нужно использовать поток сообщений и генерировать уведомления. Скажем, при входе Stream<Message> мой процессор обрабатывает его, а затем генерирует Stream<Notification> в соответствии с некоторыми вычислениями. Это не простая операция отображения, процессор имеет состояние, ему нужно запомнить определенное количество предыдущих сообщений, вычислить скользящее среднее и распознать некоторые другие шаблоны для генерации потока уведомлений.

Я не могу найти подходящую операцию для использования из промежуточных операций Stream - filter(), map() и так далее. Одним из способов является использование forEach(). Но это терминальная операция, и я не могу сгенерировать полученный поток и передать его по конвейеру.

Я новичок в потоке Java и мне интересно, как я могу использовать модель потока Java для достижения вышеуказанной цели.

Поток:

Stream<Message> ---> (Notification processor) ---> Stream<Notification> ---> ...

Edit:

Я еще не начал реализацию, но могу представить, что код будет выглядеть так:

public class NotificationProcessor {

    @Autowired
    private Averager averager;

    @Autowired
    private TrendAnalyser trendAnalyser;

    private long prevNotificationTime;

    public void consume(Message message) {


        if (message.getRate() >  averager.getAverage() + THRESHOLD) {
            // Generate notification A here
        }

        // Adjust the moving average
        averager.put(message);

        trendAnalyser.analyze(message);
        if (trendAnalyser.isFalling()) {
            Date now  = new Date();
            // Throttle
            if (now.getTime() - prevNotificationTime > 60) {
                prevNotificationTime = now.getTime();
                // Generate notification B here
            }
        }
    }
}

Это просто традиционный класс Java, который потребляет сообщения. Я все еще изучаю модель Stream, поэтому я не уверен, как я могу подключить к ней класс.

Подробнее Редактировать:

Путь Хольгера довольно аккуратный и твердый, я думаю, что это очень хороший дизайн. Однако позже я узнал, что могу использовать класс для отслеживания состояния и вызова его методов в Stream.map()

NotificationProcessor processor;
stream.map(s -> processor.consume(s)).filter(s -> s != null)

1 Ответ

4 голосов
/ 24 мая 2019

Пользовательские операции, не вписывающиеся в функциональный API, могут быть реализованы через интерфейс Spliterator.

Чтобы привести простой пример, следующая операция объединит элемент String сего предыдущий элемент, если не- null:

public static Stream<String> concatWithPrevious(Stream<String> source) {
    boolean parallel = source.isParallel();
    Spliterator<String> sp = source.spliterator();
    return StreamSupport.stream(new Spliterators.AbstractSpliterator<String>(
        sp.estimateSize(),
        sp.characteristics()&~(Spliterator.DISTINCT|Spliterator.SORTED)) {

        private String previous;

        @Override
        public boolean tryAdvance(Consumer<? super String> action) {
            return sp.tryAdvance(s -> {
                String p = previous;
                previous = s;
                action.accept(p == null? s: s == null? p: p.concat(s));
            });
        }
    }, parallel).onClose(source::close);
}

Центральным элементом является метод tryAdvance, который должен вызвать метод Consumer accept со следующим элементом и вернуть true, если он есть, или просто вернуть false, если достигнут конец потока.

Есть также характеристики и предполагаемый размер (который будет точным размером, когда SIZED имеется характеристика), которую приведенный выше пример в основном берет из сплитератора исходного потока.Я оставляю это в качестве упражнения для читателя, почему здесь удаляются характеристики DISTINCT и SORTED, когда они присутствуют для исходного потока.

Параллельная обработка будет включена с помощью метода trySplit, который будетнаследоваться от AbstractSpliterator здесь.Этот метод будет выполнять буферизацию элементов в массив, что не очень эффективно, но для сплитератора, имеющего такую ​​зависимость от предыдущего элемента, это лучшее, что мы можем получить.


Когда мы запустим этоНапример, с

concatWithPrevious(
    IntStream.range('A', 'Z')
        .mapToObj(i -> String.valueOf((char)i))
        .peek(s -> System.out.println("source stream: "+s))
)
.filter(Predicate.isEqual("EF"))
.findFirst()
.ifPresent(s -> System.out.println("result: "+s));

мы получаем

source stream: A
source stream: B
source stream: C
source stream: D
source stream: E
source stream: F
result: EF

, демонстрирующий, что лень потоков все еще сохраняется.


Взяв в качестве примера эскиз вашей задачи,Я хотел бы рассмотреть возможность изменения кода, например

public class NotificationProcessor {
    @Autowired
    private Averager averager;

    @Autowired
    private TrendAnalyser trendAnalyser;

    private long prevNotificationTime;

    public void consume(Message message, Queue<Notification> queue) {


        if (message.getRate() >  averager.getAverage() + THRESHOLD) {
            // Generate notification A here
            queue.add(…);
        }

        // Adjust the moving average
        averager.put(message);

        trendAnalyser.analyze(message);
        if (trendAnalyser.isFalling()) {
            Date now  = new Date();
            // Throttle
            if (now.getTime() - prevNotificationTime > 60) {
                prevNotificationTime = now.getTime();
                // Generate notification B here
                queue.add(…);
            }
        }
    }
}

, и использовать его в операции потока, например

public static Stream<Notification> notificationProcessor(Stream<Message> source) {
    // replace with intended factory mechanism or make it a parameter
    NotificationProcessor proc = new NotificationProcessor();

    boolean parallel = source.isParallel();
    Spliterator<Message> sp = source.spliterator();
    return StreamSupport.stream(new Spliterators.AbstractSpliterator<Notification>(
        sp.estimateSize(),
        sp.characteristics() & Spliterator.ORDERED | Spliterator.NONNULL) {

        final Queue<Notification> queue = new ArrayDeque<>(2);

        @Override
        public boolean tryAdvance(Consumer<? super Notification> action) {
            while(queue.isEmpty()) {
                if(!sp.tryAdvance(msg -> proc.consume(msg, queue))) {
                    return false;
                }
            }
            action.accept(queue.remove());
            return true;
        }
    }, parallel).onClose(source::close);
}

Поскольку каждый элемент источника может генерировать от нуля до двух элементов, не можетбыть характеристикой SIZED, на самом деле я решил быть здесь консервативным и сохранить только характеристику ORDERED, которая, как вы сказали, относится к вашей операции, и добавила NONNULL, которая кажется подходящей для вашего кода.

Поскольку каждый вызов tryAdvance должен обеспечивать ровно один элемент или не иметь элемента только при достижении конца потока, существует очередь из двух элементовэто нужно¹.Если очередь пуста, источник будет запрашиваться до тех пор, пока не будет хотя бы один элемент или пока не будет достигнут конец источника.Затем следующий элемент будет передан потребителю, если в очереди есть элемент.


here Здесь мы можем работать с очередью размером один, сразу же потребляя первый ожидающий элемент без очереди, но это значительно усложнит код

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...