Извлечь конкретную запись из Kafka в Python, используя KafkaConsumer, отфильтровав транзакции с помощью sessionID - PullRequest
0 голосов
/ 22 октября 2018

Контекст:

У нас есть требование, когда мне нужно захватить конкретную транзакцию с использованием библиотеки KafkaConsumer. Я использую уникальный сеанс в качестве ключа, который я получаю из другого инструмента (давайте вызовемв качестве инструмента Feeder ) для захвата транзакции.

Я запускаю свой код сразу после получения сеанса от Feeder.

Проблема

Я могу получить несколько записей из Kafka, но не вижу записи, которую пытаюсь отфильтровать с помощью сеанса.

Код

from kafka import KafkaConsumer
import json
SESSION = 'sessionID'
def consumeRecords(topic,group,bootstrapserver,auto_offset_reset,mySession,auto_commit_boolean):
    consumer = KafkaConsumer(topic,group_id=group,bootstrap_servers=bootstrapserver,auto_offset_reset=auto_offset_reset,enable_auto_commit=auto_commit_boolean)
    consumer.topics()
    consumer.seek_to_beginning()
    try:
        while True:
            print("CALLING POLL")
            records = consumer.poll(timeout_ms=1000)
            print("RETURNED FROM POLL")
            if records:
                for consumedRecord in records.values():
                    for msg in consumedRecord:
                        json_data = json.loads(msg.value.decode('utf-8'))
                        #print(json_data)
                        if 'alias' in json_data.keys() and json_data['alias']=='myServer':

                            current_session = json_data[SESSION]
                            print("SESSION is :" , current_session)
                            if mySession == current_session :
                                print('My record is ', json_data)
    except Exception as e:
        print("Unable to find any related sessions")
        print(e)

if __name__ == '__main__':
    KAFKA_TOPIC = 'e-commerce.request'
    KAFKA_GROUP = 'test'
    KAFKA_BROKERS = ['ABC.net:9092', 'DEF:9092']
    auto_commit = False
    consumeRecords(KAFKA_TOPIC,KAFKA_GROUP,KAFKA_BROKERS,'earliest','38l87jondkvnefNW886QMTWVcN6S4my5Y-No167ZzqF',auto_commit)

Я должен распечатать следующие данные json, полученные из Kafka, но мой код не извлекает эту запись и, следовательно, ничего не печатает и работаетна бесконечное время

{'Type': 'request', 'requestID': '2018100819564659-5', 'payload': {'timing': {'startTime': '20181008195624322', 'total': '0.063', 'totalActions': '0', 'jsp': '0.063'}, 'user': {'orgID': '', 'userID': '', 'newComer': 'FALSE', 'helpdeskUserID': '', 'helpdeskUserOrgID': '', 'travelerID': ''}, 'version': '1.0.0', 'client': {'referrer': '', 'ip': ''}, 'url': {'parameters': {'JSESSIONID': '38l87jondkvnefNW886QMTWVcN6S4my5Y-No167ZzqF!430553578!-1652153437'}, 'baseUrl': 'http://server_url', 'path': 'DUMMY', 'method': 'POST'}, 'actions': [{'cumulTiming': '0', 'name': 'OverrideServlet', 'isChained': 'FALSE', 'features': '[GlobalTimeSpent = 0][PRE_RULES = 0][POST_RULES = 0]', 'chainParent': ''}], 'context': {'sessionSize': '12|12', 'fatalError': 'FALSE', 'requestType': 'XML', 'error': [], 'requestedSessionID': '', 'templateName': ''}}, 'Hostname': 'DummyAgain', 'sessionID': '38l87jondkvnefNW886QMTWVcN6S4my5Y-No167ZzqF', 'Site': 'ABCDEFGH', 'ClientId': 1234551515439, 'Epoch': 1539028584353, 'IP': 'A.B.C.D', 'alias': 'myServer', 'SeqNb': 21845, 'Source': 'eCommerce'}
...