В одном из моих преобразований Apache Beam я записываю данные в Elasticsearch, который выполняется локально в контейнере Docker. Это делается путем создания клиента Elasticsearch и передачи его преобразованию. В преобразовании у меня есть функция __init__
, которая устанавливает клиент Elasticsearch в качестве переменной экземпляра: self.es_client = es_client
, которая затем используется функцией process
для записи данных в Elasticsearch.
Проблема в том, что Я не могу этого сделать. Всякий раз, когда я устанавливаю значение переменной экземпляра в этом преобразовании для клиентского объекта, я получаю сообщение об ошибке «TypeError: Cannot serialize socket object» .
My best Угадайте, что происходит: Apache Beam автоматически сериализует любые переменные экземпляра в преобразовании, и по какой-то причине он не может сериализовать этот клиентский объект Elasticsearch.
Ближайшее, что я нашел в Интернете, это это проблема. Я совершенно не понимаю, почему это происходит, но был бы признателен за любые идеи!
Файл, который создает клиент Elasticsearch и передает его в преобразование Beam:
es_client = Elasticsearch([
{
'host': "0.0.0.0", 'network.host': "0.0.0.0", 'network.publish_host': "0.0.0.0", 'http.port': 9200,
'timeout': 30, 'retry_on_timeout': True, 'max_retries': 10
}
])
....
# Line where I call the transform (as part of larger pipeline)
"Insert sessions into Elasticsearch" >> beam.ParDo(transforms.WriteDataToElasticsearch("sessions", es_client))
Файл с преобразованием
class WriteDataToElasticsearch(beam.DoFn):
def __init__(self, index_name, es_client):
# What index to write to
self.index_name = index_name
self.es_client = es_client
def process(self, element):
# Doesn't even get to this line - error seems to be thrown at conclusion of __init__ method
index_exists = self.es_client.indices.exists(index=self.index_name)
if not index_exists:
print('Creating index {i}'.format(i=self.index_name))
self.es_client.indices.create(index=self.index_name)
print('Writing to {i} index'.format(i=self.index_name))
res = self.es_client.index(index=self.index_name, body=element)
print(res)
yield