Есть ли способ обернуть asyncio Writer в FileWriter для передачи асинхронного устройства записи в существующий API? - PullRequest
0 голосов
/ 09 июля 2019

В моем приложении я хотел бы иметь асинхронный выходной канал для записи данных, который фактически записывается существующим API (https://github.com/ynqa/pandavro). К сожалению, интерфейс устройства записи asyncio не соответствует интерфейсуклассический файлоподобный писатель.

Есть ли способ, как в Java, каким-то образом обернуть канал asyncio в классический интерфейс FileWriter? (без написания оболочки моим собственным ...)

Это код, который я хотел бы запустить:

import numpy as np
import pandas as pd
import pandavro as pdx
import asyncio


async def main():
    df = pd.DataFrame({
        "Boolean": [True, False, True, False],
        "Float64": np.random.randn(4),
        "Int64": np.random.randint(0, 10, 4),
        "String": ['foo', 'bar', 'foo', 'bar'],
        "DateTime64": [pd.Timestamp('20190101'), pd.Timestamp('20190102'),
                       pd.Timestamp('20190103'), pd.Timestamp('20190104')]})

    reader, writer = await asyncio.open_connection('127.0.0.1', 9090)

    pdx.to_avro(writer, df)

    writer.close()
    await writer.wait_closed()

if __name__ == '__main__':
    asyncio.run(main())

1 Ответ

0 голосов
/ 09 июля 2019

Единственное решение, которое я нашел сам, - написать простой класс-обертку:

class ChannelWriter:

    __delegate = None

    def __init__(self, delegate: asyncio.streams.StreamWriter):
        self.__delegate = delegate

    async def close(self):
        await self.__delegate.drain()
        return self.__delegate.close()

    @staticmethod
    def seekable():
        return False

    def write(self, value):
        self.__delegate.write(value)

    def flush(self):
        pass
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...