Я разрабатываю инструмент для мониторинга BS kafka. Программа будет прослушивать тему кафки и постоянно выводить новое сообщение из этой темы. Итак, каков наилучший способ постоянно отправлять эти сообщения на сторону браузера?
Программа использует колбу, поэтому в настоящее время я использую stream_with_context для отправки нового сообщения на сторону браузера. Пока это работает, но мне интересно, если это правильный сценарий для использования stream_with_context, так как большинство случаев использования для загрузки и потокового видео? или, может быть, я должен использовать websocket?
@read_controller.route('/v1/listenkafka/<string:kafkaId>', methods=['GET'])
def start_stream(kafkaId):
try:
mykafka_json = eval(my_storage.get(kafkaId))
mykafka = kafkaserver(ip=mykafka_json['ip'], id=kafkaId, port=mykafka_json['port'])
return Response(stream_with_context(mykafka.consume_topic(mykafka_json['topic'])))
except Exception as e:
print(f"{e}")
return jsonify(f"{e}"), 400
#The generator listen to kafka and feed to stream
def consume_topic(self, topic, groupid='test-consumer-group'):
consumer = KafkaConsumer(topic,
group_id=groupid,
bootstrap_servers=[f"{self.ip}:{self.port}"])
print(f"Topic: {topic}@{self.ip}:{self.port} starts steaming at {datetime.now()}")
try:
for messages in consumer:
mykafka_json = eval(my_storage.get(self.id))
print(mykafka_json)
if mykafka_json['flag']:
my_storage.delete(self.id)
return
else:
message = {'topic':messages.topic,
'partition':messages.partition,
'offset':messages.offset,
'key':messages.key,
'value':messages.value}
print (message['value'])
yield message['value']
except StopIteration as e:
#TODO:: handle return
print(e)
finally:
print(f"Topic-{topic} finish at {datetime.now()}")
Итак, я должен использовать stream_with_context в этом сценарии или я должен перейти на использование websockt?
Спасибо