Установка клиентского объекта Elasticsearch в качестве переменной экземпляра в Apache Beam transform вызывает ошибку сериализации в Python - PullRequest
0 голосов
/ 18 июня 2020

В одном из моих преобразований Apache Beam я записываю данные в Elasticsearch, который выполняется локально в контейнере Docker. Это делается путем создания клиента Elasticsearch и передачи его преобразованию. В преобразовании у меня есть функция __init__, которая устанавливает клиент Elasticsearch в качестве переменной экземпляра: self.es_client = es_client, которая затем используется функцией process для записи данных в Elasticsearch.

Проблема в том, что Я не могу этого сделать. Всякий раз, когда я устанавливаю значение переменной экземпляра в этом преобразовании для клиентского объекта, я получаю сообщение об ошибке «TypeError: Cannot serialize socket object» .

My best Угадайте, что происходит: Apache Beam автоматически сериализует любые переменные экземпляра в преобразовании, и по какой-то причине он не может сериализовать этот клиентский объект Elasticsearch.

Ближайшее, что я нашел в Интернете, это это проблема. Я совершенно не понимаю, почему это происходит, но был бы признателен за любые идеи!

Файл, который создает клиент Elasticsearch и передает его в преобразование Beam:

es_client = Elasticsearch([
  {
    'host': "0.0.0.0", 'network.host': "0.0.0.0", 'network.publish_host': "0.0.0.0", 'http.port': 9200,
    'timeout': 30, 'retry_on_timeout': True, 'max_retries': 10
  }
])
....
# Line where I call the transform (as part of larger pipeline)
"Insert sessions into Elasticsearch" >> beam.ParDo(transforms.WriteDataToElasticsearch("sessions", es_client))

Файл с преобразованием

class WriteDataToElasticsearch(beam.DoFn):
  def __init__(self, index_name, es_client):
    # What index to write to
    self.index_name = index_name
    self.es_client = es_client

  def process(self, element):
    # Doesn't even get to this line - error seems to be thrown at conclusion of __init__ method
    index_exists = self.es_client.indices.exists(index=self.index_name)
    if not index_exists:
      print('Creating index {i}'.format(i=self.index_name))
      self.es_client.indices.create(index=self.index_name)

    print('Writing to {i} index'.format(i=self.index_name))
    res = self.es_client.index(index=self.index_name, body=element)
    print(res)
    yield
...