Отслеживание причинно-следственных связей в очереди сообщений - PullRequest
0 голосов
/ 17 мая 2018

Я работаю в системе, где мне приходится обрабатывать несколько сообщений, но сохранять частичный порядок этих сообщений.У меня есть очередь RabbitMQ с сообщениями двух типов: создание элементов и обновление элементов.

Предположим, у нас есть очередь с 6 сообщениями:

CREATE1 CREATE2 UPDATE1 UPDATE2 UPDATE1 UPDATE1
  • Если я обрабатываю их по одному, то это совершенно нормально, но очень медленно, потому что у меня естьмного сообщений

  • Если я прочитаю их в некоторый буфер, то смогу обрабатывать их параллельно, но могу обработать UPDATE1 для первого элемента, который еще не был создан.Хуже того, последнее обновление может быть обработано раньше предыдущего и, таким образом, стереть последнее состояние элемента

Я могу создать дополнительное сообщение в сообщении или поместить его в очередь с некоторым дополнительным заголовком,например, MESSAGE_ID:10, чтобы убедиться, что все сообщения для одного элемента имеют одинаковые MESSAGE_ID.Проблема в том, что я не знаю, что с ним делать.

Как я могу читать из очереди сразу несколько элементов, не нарушая причинно-следственную связь между сообщениями?


Псевдокод, которыйЯ думаю, что для этой задачи может быть:

const prefetchItemsCount = 20
let buffer = new Message[prefetchItemsCount]
let set = new Set()
foreach item in queue
    if !set.Contains(item.MessageId)
         set.Add(item.MessageId)
         buffer.Add(item)
         if set.Count == buffer.Count
             break
return buffer

Так что в нашем примере он вернет следующие последовательности элементов:

CREATE1 CREATE2
UPDATE1 UPDATE2
UPDATE1
UPDATE1

, что делает его почти вдвое быстрее

1 Ответ

0 голосов
/ 18 мая 2018

Как я могу прочитать из очереди несколько элементов сразу без прерывания причинность между сообщениями?

Действительно хороший случай.

Если действительно выполняется желаемым образом, особенность TimeDOMAIN « сразу » идет в основном против скрытой морфологии , что выражается как « причинность ».

Дается вместе с входной стороной QUEUE, которая по определению является чистой - [SERIAL] (ничего не может произойти сразу, только чистый один ход за другим, даже если " just "- [CONCURENT] планирование может быть выставлено внешним агентам, внутреннее управление QUEUE соответствует чисто последовательному упорядочению внутреннего потока и доставки сообщений (также ссылается на метки времени, постоянство и другие их артефакты)).

Причинность также означает некоторый cause -> effect порядок событий, как в смысле абстрактной причинности отношения, так и в потоке реального времени того, как вещи действительно происходят, так практически против паттерна " сразу ".

И последнее, но не менее важное: Причинность также должна обрабатывать дополнительную парадигму, задержка между стороной cause -> и -> effect сторона (часто F inite- S tate- A utomata, обычно с гораздо более богатым пространством состояний, чем просто { 0 -> CREATE -> UPDATE [ -> UPDATE [...] ] -> }) серия событий.

Результат

В то время как можно «читать», используя некоторую степень [CONCURRENT] -планирования процессов, условия FSA / Causality в основном избегают выхода из основного чистого- [SERIAL] постобработки события сообщения доставлены.

В этом случае появятся дополнительные правила, если среда обмена сообщениями не содержит посредников и не гарантирует гарантированную устойчивость к потерянным сообщениям / упорядочению сообщений / подлинности сообщений / содержанию сообщений.

Там дьяволы начинают танцевать против ваших попыток построить последовательную, распределенную обработку транзакций, надежную, распределенную FSA: o)

...