Как использовать параллелизм в faust? - PullRequest
0 голосов
/ 04 апреля 2019

Я работаю с faust и хотел бы использовать функцию параллелизма.Приведенный пример не совсем демонстрирует использование параллелизма.

Что я хотел бы сделать, так это прочитать от производителя kafka и unnest json.Затем посылки отправляются в процесс для расчета счетов и т. Д. Я должен отправить 10 посылок за раз в функцию, которая выполняет расчеты.Для этого я использую параллелизм, чтобы одновременно рассчитывать 10 отправлений.

import faust
import time
import json
from typing import List
import asyncio

class Items(faust.Record):
    name: str
    billing_unit: str
    billing_qty: int


class Shipments(faust.Record, serializer="json"):
    shipments: List[Items]
    ship_type: str
    shipping_service: str
    shipped_at: str


app = faust.App('ships_app', broker='kafka://localhost:9092', )
ship_topic = app.topic('test_shipments', value_type=Shipments)


@app.agent(value_type=str, concurrency=10)
async def mytask(records):
# task that does some other activity
    async for record in records:
        print(f'received....{record}')
        time.sleep(5)


@app.agent(ship_topic)
async def process_shipments(shipments):
    # async for ships in stream.take(100, within=10):
    async for ships in shipments:
        data = ships.items
        uid = faust.uuid()
        for item in data:
            item_uuid = faust.uuid()
            print(f'{uid}, {item_uuid}, {ships.ship_type}, {ships.shipping_service}, {ships.shipped_at}, {item.name}, {item.billing_unit}, {item.billing_qty}')
            await mytask.send(value=("{} -- {}".format(uid, item_uuid)))

            # time.sleep(2)
        # time.sleep(10)


if __name__ == '__main__':
    app.main()

...