Нужен ли мне объект RxPy для обработки данных? - PullRequest
0 голосов
/ 18 июня 2019

Я новичок в использовании rx, и из того, что я прочитал, следует избегать использования Subject s.Но я не уверен, как избежать использования темы здесь.

У меня есть поток байтов, представляющих сообщения.Каждый пакет сообщения форматируется как префикс длиной 2 байта + сообщение или:

<2 byte length>< message with length from prefix >

Мое решение:

def get_first_message(obs):
    '''Get the first message from the stream'''
    return obs.take(2)\
        .buffer_with_count(2)\
        .map(lambda l_arr: int(''.join(l_arr)))\
        .first_or_default()\
        .filter(lambda x: x is not None)\
        .flat_map(lambda x: obs.take(x).buffer_with_count(x).first())


def get_messages(obs):
    '''Parse stream into messages'''
    window_openings = Subject()

    return obs.window(window_openings=window_openings)\
        .flat_map(lambda w: get_first_message(w))\
        .tap(on_next=lambda x: window_openings.on_next(x))
...