Отправка сообщения из pub / sub в Elasticsearch - PullRequest
0 голосов
/ 27 марта 2020

Я пытаюсь отправить сообщения от подписчика pub / sub на индекс Elasticsearch в очереди (например, вставка строки в SQL).

Мое сопоставление индекса Elasticsearch указано ниже:

"properties" : {
    "field1" : {
      "type" : "float"
    },
    "field2" : {
      "type" : "keyword"
    },
    "field3" : {
      "type" : "keyword"
    },
    "field4" : {
      "type" : "keyword"
    },
    "field5" : {
      "type" : "date"
    }

Я использую приведенный ниже код для отправки сообщений из pub / sub в Elasticsearch:

def collect_data(data):
    es = Elasticsearch( [host], port = port)
    data = data.decode('utf-8')
    data = data.replace("\'", "\"")
    twraw = json.loads(data)
    response = requests.post('http://{}:{}/my_index/_doc/'.format(host,port), data=twraw, headers={"Content-Type": "application/json"})
    print(response.text)

def receive_data(project, subscription_name):
    subscriber = pubsub.SubscriberClient()
    subscription_path = subscriber.subscription_path(project, subscription_name)

    def callback(message):
       print('Received message: {}'.format(message))
       collect_data(message.data)
       message.ack()

    subscription = subscriber.subscribe(subscription_path, callback=callback)
    print('Listening for messages on {}'.format(subscription_path))

    future = subscriber.subscribe(subscription_path, callback=callback)
    try:
        future.result(timeout=10)
    except Exception as e:
       print(
        'Listening for messages on {} threw an Exception: {}'.format(
            subscription_name, e))
       raise


    while True:
       time.sleep(60) 

Я получаю сообщение об ошибке ниже для каждого сообщения, которое не могу решить.

{
  "error": {
    "root_cause": [
      {
        "type": "mapper_parsing_exception",
        "reason": "failed to parse"
      }
    ],
    "type": "mapper_parsing_exception",
    "reason": "failed to parse",
    "caused_by": {
      "type": "not_x_content_exception",
      "reason": "Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes"
    }
  },
  "status": 400
}

1 Ответ

2 голосов
/ 27 марта 2020

Это сработало для меня, чтобы послать сообщения наasticsearch:

def collect_data(data):
    es = Elasticsearch( [host], port = port)
    data = data.decode('utf-8')
    data = data.replace("\'", "\"")
    twraw = json.loads(data)
    es.index(index = 'my_index', body = twraw)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...