Я новичок в использовании rx, и из того, что я прочитал, следует избегать использования Subject
s.Но я не уверен, как избежать использования темы здесь.
У меня есть поток байтов, представляющих сообщения.Каждый пакет сообщения форматируется как префикс длиной 2 байта + сообщение или:
<2 byte length>< message with length from prefix >
Мое решение:
def get_first_message(obs):
'''Get the first message from the stream'''
return obs.take(2)\
.buffer_with_count(2)\
.map(lambda l_arr: int(''.join(l_arr)))\
.first_or_default()\
.filter(lambda x: x is not None)\
.flat_map(lambda x: obs.take(x).buffer_with_count(x).first())
def get_messages(obs):
'''Parse stream into messages'''
window_openings = Subject()
return obs.window(window_openings=window_openings)\
.flat_map(lambda w: get_first_message(w))\
.tap(on_next=lambda x: window_openings.on_next(x))