Как использовать групповые и оконные потоки в потоковой передаче faust kafka? - PullRequest
0 голосов
/ 08 апреля 2019

Мне нужно создать приложение для робастных сборок, которое должно обрабатывать группировку с последующей обработкой окон.

Теперь я использую следующий для окон,

class Sample(faust.Record):
    master_mac: str 
    uuid: str
    slave_mac: str
    rawData: str
    rssi: int

app = faust.App('sample_app', broker='kafka://localhost:9092')
my_topic = app.topic('out', key_type=str, value_type=Sample)

@app.agent(my_topic)
async def process(samples):
    async for sample in samples.take(5000, within=5):
        print("sample :: ", sample)

if __name__ == '__main__':
    worker = Worker(app, loglevel="INFO")
    worker.execute_from_commandline()

Принимая во внимание, что мне нужно сделать концепцию группировки в дополнение к оконному управлению,

class Sample(faust.Record):
    master_mac: str 
    uuid: str
    slave_mac: str
    rawData: str
    rssi: int

app = faust.App('sample_app', broker='kafka://localhost:9092')
my_topic = app.topic('out', key_type=str, value_type=Sample)

@app.agent(my_topic)
async def process(samples):
    async for sample in samples.group_by(Sample.master_mac and Sample.slave_mac):
        print("sample :: ", sample)

if __name__ == '__main__':
    worker = Worker(app, loglevel="INFO")
    worker.execute_from_commandline()

Может ли кто-нибудь помочь мне в этом, чтобы добиться как объединения окон, так и группировки?

...