Два агента с разными фильтрами на одной кафке топи c. Подтверждение в Faust Stream - PullRequest
0 голосов
/ 13 февраля 2020

Я хочу, чтобы два агента faust слушали одну и ту же kafka topi c, но каждый агент использует свой собственный фильтр перед обработкой событий, и их наборы событий не пересекаются.

В документации мы есть пример: https://faust.readthedocs.io/en/latest/userguide/streams.html#id4

Если два агента используют потоки, подписанные на одну и ту же топи c:

 topic = app.topic('orders')

 @app.agent(topic)
 async def processA(stream):
      async for value in stream:
          print(f'A: {value}')

 @app.agent(topic)
  async def processB(stream):
       async for value in stream:
           print(f'B: {value}')

Проводник будет пересылать каждый сообщение, полученное по «заказам» topi c обоим агентам, увеличивая счетчик ссылок при каждом поступлении в поток агентов.

счетчик ссылок уменьшается, когда событие подтверждается, и когда оно достигает нуля, потребитель сочтет это смещение «готовым» и может его зафиксировать.

и ниже для фильтров https://faust.readthedocs.io/en/latest/userguide/streams.html#id13:

@app.agent() async def process(stream):
    async for value in stream.filter(lambda: v > 1000).group_by(...):
        ...

Я использую какой-то сложный фильтр, но в результате делю поток на две части для двух агентов с совершенно разными логиками c. (Я не использую group_by)

Если два агента работают вместе, все в порядке. Но если я остановлю их и перезапущу, каждый будет обрабатывать поток с самого начала. Потому что каждое событие не было признано одним из агентов. Если я подтверждаю все события в каждом агенте, когда один из агентов не будет запущен, второй очистит topi c. (Если один из них раздавлен и перезапущен, проводник увидит трех подписчиков, ожидающих ответа от раздавленного агента в течение 20 минут.)

Я просто хочу разделить события на две части. Как я могу сделать соответствующую синхронизацию в этом случае?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...