Я хочу, чтобы два агента faust слушали одну и ту же kafka topi c, но каждый агент использует свой собственный фильтр перед обработкой событий, и их наборы событий не пересекаются.
В документации мы есть пример: https://faust.readthedocs.io/en/latest/userguide/streams.html#id4
Если два агента используют потоки, подписанные на одну и ту же топи c:
topic = app.topic('orders')
@app.agent(topic)
async def processA(stream):
async for value in stream:
print(f'A: {value}')
@app.agent(topic)
async def processB(stream):
async for value in stream:
print(f'B: {value}')
Проводник будет пересылать каждый сообщение, полученное по «заказам» topi c обоим агентам, увеличивая счетчик ссылок при каждом поступлении в поток агентов.
счетчик ссылок уменьшается, когда событие подтверждается, и когда оно достигает нуля, потребитель сочтет это смещение «готовым» и может его зафиксировать.
и ниже для фильтров https://faust.readthedocs.io/en/latest/userguide/streams.html#id13:
@app.agent() async def process(stream):
async for value in stream.filter(lambda: v > 1000).group_by(...):
...
Я использую какой-то сложный фильтр, но в результате делю поток на две части для двух агентов с совершенно разными логиками c. (Я не использую group_by)
Если два агента работают вместе, все в порядке. Но если я остановлю их и перезапущу, каждый будет обрабатывать поток с самого начала. Потому что каждое событие не было признано одним из агентов. Если я подтверждаю все события в каждом агенте, когда один из агентов не будет запущен, второй очистит topi c. (Если один из них раздавлен и перезапущен, проводник увидит трех подписчиков, ожидающих ответа от раздавленного агента в течение 20 минут.)
Я просто хочу разделить события на две части. Как я могу сделать соответствующую синхронизацию в этом случае?