Как избежать дублирования дополнений? - PullRequest
2 голосов
/ 19 апреля 2019

Проблема, которую я пытаюсь решить

У меня есть рабочий процесс, который изменяет атрибут ресурса (скажем, MyResource), накапливая значения из некоторых сообщений JSON, которые получает рабочий.Я пытаюсь найти лучший способ избежать накопления дубликатов, даже если одно и то же сообщение JSON получено рабочим процессом дважды или более раз.

Вот что я пробовал

Решение 1

Каждое сообщение JSON имеет уникальную метку времени, которая зависит от того, когда было создано сообщение JSON, я сохранил эту метку времени в MyResource и отклонил сообщение JSON, если оно имеет значение метки времени меньше значения в MyResource.

Проблема

Поскольку вся архитектура асинхронна, сообщения могут приниматься в любом порядке и не обязательно в том же порядке, в котором они были созданы.

Решение 2

Я создал новый атрибут (скажем, add_ids) в MyResource.Каждое сообщение JSON имеет уникальный идентификатор, я добавил этот идентификатор в MyResource.added_ids.И каждый раз при накоплении использованных add_ids для уже обработанных сообщений JSON.

Проблема

Я использую Монго для хранения MyResource.Каждый документ MyResource начал взрываться с этим массивом идентификаторов, так как сообщений JSON для каждого MyResource много.Также поиск в массиве - дорогостоящая операция.

Я ищу

Я ищу ответ, который может обрабатывать асинхронный характер, а также не взрывать мои монго документы.Также я не ищу точное решение, есть ли алгоритм / шаблон, который используется для решения подобных проблем?Я пробовал поискать в Google, но я не знаю, как назвать эту проблему, чтобы получить релевантные результаты.

Ответы [ 3 ]

0 голосов
/ 19 апреля 2019

Если ваше сообщение JSON может быть применено в любом порядке, то подход с отметкой времени, похоже, не имеет особого смысла.Описание проблемы не очень ясно по этому поводу - только о необходимости убедиться, что вы избегаете повторной обработки одного и того же сообщения.

Я работал над системой с аналогичным ограничением, и мы использовали следующий подход:сосредоточиться на сообщении , а не на ресурсе.Подход состоял в том, чтобы вычислить контрольную сумму MD5 сообщения (или, по крайней мере, критических частей, так как это повлияет на экземпляр MyResource ... включая идентификатор ресурса).Вы сохраните сообщение в документе mongoDB, возможно, все сообщение как один атрибут и контрольную сумму MD5 в другом.Когда работник получает сообщение, он вычисляет контрольную сумму для сообщения, проверяет, было ли сообщение уже получено, и обрабатывает сообщение (сохраняет в mongoDB, выполняет действие на экземпляре MyResource), если не существует документа с этой контрольной суммой..

Одним из преимуществ этого подхода является то, что вы можете «воспроизводить» сообщения в будущем, если по какой-то причине выпадет фрагмент «выполнить действие на MyResource».Возможно, вы захотите поставить временную метку doc при получении , чтобы гарантировать порядок воспроизведения (поскольку производство асинхронно ... и вам может потребоваться поддержка нескольких производителей ... время получения должно быть королем).

0 голосов
/ 20 апреля 2019

позвольте мне поделиться еще одним сумасшедшим решением.если вы можете назначить уникальные простые числа для каждого сообщения Ресурса, то вы можете идентифицировать дубликаты.в подобной ситуации вам приходится выбирать между пространством и временем.

messages for MyResource 1 => message 2 | message 3 | message 5 | message 7  
messages for MyResource 2 => message 2 | message 3 | message 5 | message 7  

после каждого процесса вы сохраняете умножение простого номера текущего сообщения и предыдущего вычисления.

MyResource 1 | 2  (processed 2 only)  
MyResource 2 | 70 (processed 2*5*7)

всякий раз, когда вы получили сообщение, проверьте, может ли существующее значение делиться на идентификатор сообщения.

70 % 5 == 0 true (already processed)
70 % 3 == 0 false (not processed)

в вашем варианте номер два, вы беспокоитесь о пространстве (ограничение MongoDB 15 Мб, задержка вставки / поиска), для этого вам следует рассмотреть экономичные по объему структуры данных, такие как bloom filter.однако это вероятностная структура данных, что означает, что ложноположительные совпадения возможны, а ложноотрицательные - нет. Redis имеет хорошую реализацию вы можете попробовать.

 127.0.0.1:6379> BF.ADD resource1 msg1
 (integer) 1
 127.0.0.1:6379> BF.EXISTS resource1 msg1
 (integer) 1
 127.0.0.1:6379> BF.EXISTS resource1 msg2
 (integer) 0
0 голосов
/ 19 апреля 2019

Я думаю, что вы находитесь на правильном пути со вторым решением, однако производительность может быть выше, если вы будете хранить каждый добавленный_id как собственный ключ-значение вместо массива.

Логика довольно проста: каждый раз, когда вы получаете данные из очереди, ищите в своем кэше, есть ли запись для этого идентификатора сообщения. Если есть запись, не накапливайте эту информацию. В противном случае накапливайте ввод и сохраняйте ключ в кеше.

Как вы упомянули, у этого подхода есть проблемы с масштабируемостью, потому что кэш будет расти бесконечно. Чтобы обойти эту проблему, вы можете использовать кэш с функциями истечения срока действия и выселения. Самый простой способ сделать это - явно установить «expires at» каждого ключа, который вы пишете. Это поддерживается Mongo, Memcached и Redis.

Проблема в том, что даже если вы установите «expires at» в каждой точке, при достаточной нагрузке ваш кэш все равно будет нехватать памяти. Так что вам нужен запасной вариант - что-то делать, когда в кеше недостаточно памяти. Для этого вы можете использовать кеш с функцией «автоматического выселения», что означает, что он имеет алгоритм для удаления вещей, когда это необходимо.

Похоже, что Mongo не поддерживает ничего подобного (это база данных с функциями кэша, а не с правильным кэшем). Memcache использует алгоритм LRU (см. https://github.com/memcached/memcached/wiki/UserInternals#when-are-items-evicted). Redis имеет несколько алгоритмов, из которых вы можете выбрать (см. https://redis.io/topics/lru-cache).

Еще одна вещь, которую я хотел бы иметь в виду, это то, что выполнение всего этого процесса в распределенном или многопоточном приложении вводит условия гонки. Скажем, у вас есть 20 рабочих машин, которые по какой-то причине все получают одно и то же сообщение практически в одно и то же время. Каждый из них проверит кеш на предмет записи и ничего не найдет, поэтому ни один из них не помечен как дубликат.

Чтобы обойти эту проблему, вы можете использовать мьютексы / семафоры для нескольких потоков, работающих на одном компьютере (вертикальное масштабирование) или «распределенную блокировку», если у вас есть несколько машин полностью (горизонтальное масштабирование). См https://redis.io/topics/distlock

1020 * редактировать *

Я получил совет, что Монго может сделать авто-выселение с Capped Collections . Он поддерживает только выселение FIFO (сначала истекает самый старый срок действия данных), которое в любом случае может работать для ваших нужд.

...