У меня есть фрагмент кода, который отправляет некоторые сообщения клиенту Kafka, но много раз код застревает при получении брокера.Иногда перезапуск программы работает или программа застревает, и это бесконечный цикл.
provider = KafkaProducer (bootstrap_servers = ['host1: port', 'host2: port', 'host3: port',' host4: port ',' host5: port '])
def KafkaProducerFromCSV(csv_file, topic):
csv_file_reader = csv.reader(open(csv_file, 'rU'), quotechar='"',delimiter=',')
data = {'schemaVersion': None, 'data': None, 'entityId': None, 'entityVersion': None, 'eventId': None,'eventTime': None, 'ingestedAt': None, 'parentId': None, 'parentVersion': None, 'updatedAt': None,'encodingType': None, 'payment_advice_created_at_date_key': None, 'payment_advice_id': None,'payment_advice_status_key': None}
count = 0
for i in range(1):
next(csv_file_reader, 'END OF FILE')
count = count + 1
print("Yaha aaya")
producer = KafkaProducer(bootstrap_servers=['host1:port','host2:port','host3:port','host4:port','host5:port'])#this is where the problem occur.!
print(producer)
try:
for split_msg in csv_file_reader:
print(split_msg)
if split_msg == ['END OF FILE']:
print("End of File encountered, ingestion completed for this slot.!")
producer.close()
csv_file_reader.close()
break
else:
data['entityId'] = split_msg[3]
data['entityVersion'] = split_msg[1]
data['eventId'] = split_msg[2]
data['eventTime'] = split_msg[4]
data['ingestedAt'] = split_msg[5]
data['parentId'] = split_msg[6]
data['parentVersion'] = split_msg[7]
data['updatedAt'] = split_msg[8]
data['encodingType'] = split_msg[9]
data['schemaVersion'] = split_msg[10]
data['payment_advice_created_at_date_key'] = split_msg[11]
data['payment_advice_id'] = split_msg[12]
data['payment_advice_status_key'] = split_msg[13]
data['data'] = split_msg[0].decode('string_escape')
stringified_data = str(data['data'])
json_data = json.loads(stringified_data[1:len(stringified_data) - 1])
data['data'] = json_data
producer.send(topic, json.dumps(data))
count = count + 1
if count % 1000 == 0:
producer.flush()
print "Successfully send " + str(count) + " messages"
continue
except:
pass
Что я могу сделать, есть ли какой-нибудь механизм повтора, который я могу включить для получения клиента брокера .?