Код Python застрял при получении клиента Kafka Broker - PullRequest
0 голосов
/ 14 декабря 2018

У меня есть фрагмент кода, который отправляет некоторые сообщения клиенту Kafka, но много раз код застревает при получении брокера.Иногда перезапуск программы работает или программа застревает, и это бесконечный цикл.

provider = KafkaProducer (bootstrap_servers = ['host1: port', 'host2: port', 'host3: port',' host4: port ',' host5: port '])

def KafkaProducerFromCSV(csv_file, topic):
    csv_file_reader = csv.reader(open(csv_file, 'rU'), quotechar='"',delimiter=',')
    data = {'schemaVersion': None, 'data': None, 'entityId': None, 'entityVersion': None, 'eventId': None,'eventTime': None, 'ingestedAt': None, 'parentId': None, 'parentVersion': None, 'updatedAt': None,'encodingType': None, 'payment_advice_created_at_date_key': None, 'payment_advice_id': None,'payment_advice_status_key': None}
    count = 0
    for i in range(1):
        next(csv_file_reader, 'END OF FILE')
        count = count + 1
    print("Yaha aaya")
    producer = KafkaProducer(bootstrap_servers=['host1:port','host2:port','host3:port','host4:port','host5:port'])#this is where the problem occur.!
    print(producer)
    try:
        for split_msg in csv_file_reader:
            print(split_msg)
            if split_msg == ['END OF FILE']:
                print("End of File encountered, ingestion completed for this slot.!")
                producer.close()
                csv_file_reader.close()
                break
            else:
                data['entityId'] = split_msg[3]
                data['entityVersion'] = split_msg[1]
                data['eventId'] = split_msg[2]
                data['eventTime'] = split_msg[4]
                data['ingestedAt'] = split_msg[5]
                data['parentId'] = split_msg[6]
                data['parentVersion'] = split_msg[7]
                data['updatedAt'] = split_msg[8]
                data['encodingType'] = split_msg[9]
                data['schemaVersion'] = split_msg[10]
                data['payment_advice_created_at_date_key'] = split_msg[11]
                data['payment_advice_id'] = split_msg[12]
                data['payment_advice_status_key'] = split_msg[13]
                data['data'] = split_msg[0].decode('string_escape')
                stringified_data = str(data['data'])
                json_data = json.loads(stringified_data[1:len(stringified_data) - 1])
                data['data'] = json_data
                producer.send(topic, json.dumps(data))
                count = count + 1
                if count % 1000 == 0:
                    producer.flush()
                    print "Successfully send " + str(count) + " messages"
                    continue
    except:
        pass

Что я могу сделать, есть ли какой-нибудь механизм повтора, который я могу включить для получения клиента брокера .?

...