Я пытаюсь отправить сообщения от подписчика pub / sub на индекс Elasticsearch в очереди (например, вставка строки в SQL).
Мое сопоставление индекса Elasticsearch указано ниже:
"properties" : {
"field1" : {
"type" : "float"
},
"field2" : {
"type" : "keyword"
},
"field3" : {
"type" : "keyword"
},
"field4" : {
"type" : "keyword"
},
"field5" : {
"type" : "date"
}
Я использую приведенный ниже код для отправки сообщений из pub / sub в Elasticsearch:
def collect_data(data):
es = Elasticsearch( [host], port = port)
data = data.decode('utf-8')
data = data.replace("\'", "\"")
twraw = json.loads(data)
response = requests.post('http://{}:{}/my_index/_doc/'.format(host,port), data=twraw, headers={"Content-Type": "application/json"})
print(response.text)
def receive_data(project, subscription_name):
subscriber = pubsub.SubscriberClient()
subscription_path = subscriber.subscription_path(project, subscription_name)
def callback(message):
print('Received message: {}'.format(message))
collect_data(message.data)
message.ack()
subscription = subscriber.subscribe(subscription_path, callback=callback)
print('Listening for messages on {}'.format(subscription_path))
future = subscriber.subscribe(subscription_path, callback=callback)
try:
future.result(timeout=10)
except Exception as e:
print(
'Listening for messages on {} threw an Exception: {}'.format(
subscription_name, e))
raise
while True:
time.sleep(60)
Я получаю сообщение об ошибке ниже для каждого сообщения, которое не могу решить.
{
"error": {
"root_cause": [
{
"type": "mapper_parsing_exception",
"reason": "failed to parse"
}
],
"type": "mapper_parsing_exception",
"reason": "failed to parse",
"caused_by": {
"type": "not_x_content_exception",
"reason": "Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes"
}
},
"status": 400
}