Контекст:
У нас есть требование, когда мне нужно захватить конкретную транзакцию с использованием библиотеки KafkaConsumer. Я использую уникальный сеанс в качестве ключа, который я получаю из другого инструмента (давайте вызовемв качестве инструмента Feeder ) для захвата транзакции.
Я запускаю свой код сразу после получения сеанса от Feeder.
Проблема
Я могу получить несколько записей из Kafka, но не вижу записи, которую пытаюсь отфильтровать с помощью сеанса.
Код
from kafka import KafkaConsumer
import json
SESSION = 'sessionID'
def consumeRecords(topic,group,bootstrapserver,auto_offset_reset,mySession,auto_commit_boolean):
consumer = KafkaConsumer(topic,group_id=group,bootstrap_servers=bootstrapserver,auto_offset_reset=auto_offset_reset,enable_auto_commit=auto_commit_boolean)
consumer.topics()
consumer.seek_to_beginning()
try:
while True:
print("CALLING POLL")
records = consumer.poll(timeout_ms=1000)
print("RETURNED FROM POLL")
if records:
for consumedRecord in records.values():
for msg in consumedRecord:
json_data = json.loads(msg.value.decode('utf-8'))
#print(json_data)
if 'alias' in json_data.keys() and json_data['alias']=='myServer':
current_session = json_data[SESSION]
print("SESSION is :" , current_session)
if mySession == current_session :
print('My record is ', json_data)
except Exception as e:
print("Unable to find any related sessions")
print(e)
if __name__ == '__main__':
KAFKA_TOPIC = 'e-commerce.request'
KAFKA_GROUP = 'test'
KAFKA_BROKERS = ['ABC.net:9092', 'DEF:9092']
auto_commit = False
consumeRecords(KAFKA_TOPIC,KAFKA_GROUP,KAFKA_BROKERS,'earliest','38l87jondkvnefNW886QMTWVcN6S4my5Y-No167ZzqF',auto_commit)
Я должен распечатать следующие данные json, полученные из Kafka, но мой код не извлекает эту запись и, следовательно, ничего не печатает и работаетна бесконечное время
{'Type': 'request', 'requestID': '2018100819564659-5', 'payload': {'timing': {'startTime': '20181008195624322', 'total': '0.063', 'totalActions': '0', 'jsp': '0.063'}, 'user': {'orgID': '', 'userID': '', 'newComer': 'FALSE', 'helpdeskUserID': '', 'helpdeskUserOrgID': '', 'travelerID': ''}, 'version': '1.0.0', 'client': {'referrer': '', 'ip': ''}, 'url': {'parameters': {'JSESSIONID': '38l87jondkvnefNW886QMTWVcN6S4my5Y-No167ZzqF!430553578!-1652153437'}, 'baseUrl': 'http://server_url', 'path': 'DUMMY', 'method': 'POST'}, 'actions': [{'cumulTiming': '0', 'name': 'OverrideServlet', 'isChained': 'FALSE', 'features': '[GlobalTimeSpent = 0][PRE_RULES = 0][POST_RULES = 0]', 'chainParent': ''}], 'context': {'sessionSize': '12|12', 'fatalError': 'FALSE', 'requestType': 'XML', 'error': [], 'requestedSessionID': '', 'templateName': ''}}, 'Hostname': 'DummyAgain', 'sessionID': '38l87jondkvnefNW886QMTWVcN6S4my5Y-No167ZzqF', 'Site': 'ABCDEFGH', 'ClientId': 1234551515439, 'Epoch': 1539028584353, 'IP': 'A.B.C.D', 'alias': 'myServer', 'SeqNb': 21845, 'Source': 'eCommerce'}