Вызов RequestError при отправке данных в Elasticsearch с использованием Kafka и Python - PullRequest
0 голосов
/ 12 декабря 2018

Моя тема называется mytopic.Мой потребитель может производить данные JSON из CSV.Мой CSV ниже

owner,Rank,Model,mileage,priceincrore,torque
maws,1,Audi,20,2,7000
drav,2,Benz,23,3,8000
ere,3,Ford,12,1,5400

Мой код Python ниже

from kafka import KafkaConsumer
import json
from elasticsearch import Elasticsearch
from datetime import datetime
import datetime

es = Elasticsearch([{'host': 'localhost', 'port': 9200}]) 
consumer = KafkaConsumer(
   'mytopic',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='latest',
    enable_auto_commit=True)


for val in kafkaconsumer:
    current_values = loads(val.value)
    es.index(index='esindex', doc_type='my_type',id=1, body=current_values)

logger.warning ('Необъяснимый необработанный ответ от сервера:% s', ошибка) повысить HTTP_EXCEPTIONS.get (status_code, TransportError) (status_code, error_message, Additional_info) RequestError: RequestError (400, 'mapper_parsing_exception', 'не удалось проанализировать')

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...