Итак, у меня есть микросервис, настроенный на 2 веб-сервиса, которые общаются через очередь kafka.
Первый веб-сервис выполняет вставку строки и обновляет недавно обновленную строку. И как только обновление выполнено в строке SQL, первые микросервисы публикуют сообщение с некоторой обработанной информацией в очередь kafka, и второй процесс прослушивает эту очередь kafka.
Теперь, как только сообщение будет опубликовано, пока выполняется другая информация. То есть 1 запрос представляет пакет входных файлов для обработки. Если первый запрос получил 5 входных файлов, то делает 5 записей в дБ и выдает 5 сообщений, которые нужно отправить в очередь kafka.
И пакет обрабатывается в течение l oop.
Сейчас! Через несколько секунд (почти 1 секунда) второй веб-сервис опрашивает сообщение, которое первым процессом помещается в очередь. И второй веб-сервис проверяет, присутствует ли запись в SQL DB или нет для соответствующего сообщения (запрашивает некоторый uuid, присутствующий в сообщении, которое также присутствует в DB)
Теперь, когда 2-й сервис пытается прочитать строка, вставленная первым веб-сервисом, не может этого сделать. Это приводит к пустому набору результатов, но запрос существует в БД.
Это происходит только тогда, когда первый пакет обрабатывается большим пакетом и происходит при нагрузочном тестировании.
В первом и втором веб-сервисе я использовал один экземплярный SQL объект соединения, о котором я упоминал в setting.py о Django для первого веб-сервиса.
Второй веб-сервис - это простой скрипт опроса, который только один раз создавал SQL объект соединения и использовал его почти бесконечно, пока l oop опроса.
Я пытался добавить сон перед тем, как вытащить сообщения, но все еще не делал заклинание.
Может кто-нибудь помочь мне в этом?
Язык: обе услуги написаны на python. Где первый микро сервис - это Django веб сервис, а второй микро сервис - просто скрипт опроса.
Вот пример кода для первого микросервиса:
requested_batch_list = request_serializer.data
for batch_input_file in requested_batch_list:
do_some_entry_on_request_tracking_table()
do_some_processing()
update_request_tracking_table()
publish_message_to_kafka_topic()
пример псевдокода для микросервиса 2
while True:
message = self.pull_message_from_kafka(1)
request_batch_id = message['request_id']
perfrom_select_query(request_batch_id)
do_something()
done_processing()
Журналы времени на первом микросервисе: update_request_tracking_table () --- > 13-04-2020 18: 08: 59,15 publish_message_to_kafka_topi c () ---> 13-04-2020 18: 08: 59,208
Журналы времени для второго микросервиса: pull_message_from_kafka (1) - -> 13-04-2020 18: 09: 25,863 perfrom_select_query () ---> 13-04-2020 18: 09: 25,864
А также, насколько возможно db.commit () поможет? Я еще не использую транзакции.
что могло произойти с этой настройкой?