Моя тема называется mytopic.Мой потребитель может производить данные JSON из CSV.Мой CSV ниже
owner,Rank,Model,mileage,priceincrore,torque
maws,1,Audi,20,2,7000
drav,2,Benz,23,3,8000
ere,3,Ford,12,1,5400
Мой код Python ниже
from kafka import KafkaConsumer
import json
from elasticsearch import Elasticsearch
from datetime import datetime
import datetime
es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
consumer = KafkaConsumer(
'mytopic',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='latest',
enable_auto_commit=True)
for val in kafkaconsumer:
current_values = loads(val.value)
es.index(index='esindex', doc_type='my_type',id=1, body=current_values)
logger.warning ('Необъяснимый необработанный ответ от сервера:% s', ошибка) повысить HTTP_EXCEPTIONS.get (status_code, TransportError) (status_code, error_message, Additional_info) RequestError: RequestError (400, 'mapper_parsing_exception', 'не удалось проанализировать')