Реактор Sequential (), кажется, теряет события - PullRequest
1 голос
/ 01 мая 2019

В Flux, где поток распараллеливается, а затем секвенируется, кажется, что предполагаемая циклическая природа метода sequential () не применяется последовательно, и кажется, что события могут заканчиваться так далеко из последовательности что они эффективно потеряны. Это поведение не является последовательным при нескольких запусках, при этом некоторые запуски выполняются поочередно, а другие сильно различаются.

Я понимаю, что события, скорее всего, будут в некоторой степени выходить из строя, но этой степени, даже в простом примере, может быть достаточно, чтобы некоторые события откладывались сверх их полезного срока службы.

Для фиксированного набора данных это может быть вполне приемлемо, но для потока событий от Kafka это может привести к потере данных, которые трудно отладить.

В этом примере на нескольких прогонах вы можете увидеть каждое четное число из 2-1000, напечатанное по порядку, а затем на другом прогоне увидеть серию четных чисел, начинающихся примерно с 2 и доходящих до семнадцати сотен, с некоторыми двузначные числа, никогда не встречающиеся в последовательности.

Я изменил количество параллельных потоков, последовательную предварительную выборку, добавил шаги publishOn и subscribeOn, но, похоже, ничто не делает это более или менее предсказуемым.

    Flux.range(1, 5000)
        .parallel(64)
        .runOn(Schedulers.newParallel("test", 64))
        .filter(integer -> integer % 2 == 0)
        .sequential()
        .take(500)
        .doOnNext(System.out::println)
        .blockLast();
  }

Конечно, через достаточно длительный промежуток времени появятся все значения, но на практике некоторые события могут быть отложены слишком долго, чтобы они могли быть полезны.

Круговой список не идеален, но мне он не кажется круговым. Здесь я что-то не так делаю или это более глубокий вопрос?

1 Ответ

0 голосов
/ 07 мая 2019

Я попробовал запустить ваш пример и каждый раз получаю ровно 500 предметов.

Вы не можете ожидать, что он произведет предсказуемую последовательность, потому что обработка параллельна, и у вас, скорее всего, меньше ядер, чем количество потоков, которое вы здесь используете (64). Некоторые потоки не получат достаточно ресурсов ЦП для выполнения своих задач, и другие потоки выиграют из-за этого, и take(500) выбирает выигрышные номера.

Распределение parallel является циклическим, но обработка выполняется планировщиком потоков.

...