Я использую тайтовый l oop просто делаю XRange и сохраняю позицию - KISS .. но если нет работы, он отступает, так что это довольно быстро, когда много происходит, его тайтовое l oop.
Если вам нужна более высокая производительность, например, чтение во время обработки, я бы предостерегал от этого в большинстве случаев.
- Это создает большую сложность, и это должно быть круто solid.
- Redis обычно достаточно быстр
- «Я не хочу дважды обработать одно и то же сообщение ". почти каждая система имеет хотя бы одну поставку, поэтому устранение сбоев в работе невероятно сложно / медленно. Вы можете частично удалить его, используя хэш-набор идентификаторов, но для потребителей довольно тривиально иметь дело с ним и сообщениями, предназначенными для идемпотентности. Вероятно, это root причина проблем с дизайном сообщений. Если вы разделите каждый считыватель (отдельный поток и 1 рабочий на поток), вы можете сохранить хэш-набор в памяти, избегая проблем с масштабированием / распределением. Обратите внимание, что поток Redis может сохранять порядок, используйте это для упрощения идемпотентных сообщений.
- Исключения, вы не хотите останавливать обработку потока, потому что у потребителя есть исключение logi c в 1 сообщении, например, при звонке ночью вся система остановилась, блокировки усугубляют ситуацию. Данные о событии не могут быть изменены, это произошло, поэтому постарайтесь сделать все возможное. Тем не менее, исключения в инфракрасном / редком режиме все же необходимо бросить и повторить. Управлять этим за пределами al oop очень болезненно.
- Простое противодавление. Если вы не можете обрабатывать работу достаточно быстро, l oop замедляется вместо того, чтобы создавать множество задач и взрывать всю вашу память.
Я больше не использую распределенные блокировки / семафоры.
Если вы имеете дело с командами, например, dosomething вместо xyz, они могут потерпеть неудачу. Опять же, потребитель должен иметь дело со случаем, когда это уже произошло, а не с частью чтения redis / stream.
Некоторые библиотеки с magi c обратными вызовами не решают эти проблемы, обратные вызовы будут иметь повторную попытку по истечении времени ожидания на любом узле и c. Сложность / проблемы все еще существуют, они просто перемещаются в другое место.
У вас может быть наблюдаемое сверху для потребителей, но это в основном cosmeti. вы увидите тот же l oop. Я бы не использовал это вместо того, чтобы заставить потребителя зарегистрировать действие.
например,
public interface IStreamSubscriber
{
void RegisterEventCallBack(Func<object, IReadOnlyDictionary<string, string>, Task> callback);
void RegisterBatchEventCallBack(Func<IEnumerable<(object msg, IReadOnlyDictionary<string, string> metaData)>, Task> batchCallback);
void Start();
}
В вашем случае обратный вызов может иметь наблюдаемое и не использовать l oop, но там - это низкий уровень l oop внизу, который также может выполнять преобразование сообщения в объект для потребителя.