В Luigi, как я могу проверить, существует ли внешний ресурс (в моем примере индекс эластичного поиска, но это может быть что угодно), и если нет, создать его?Моя проблема в том, что задачи всегда должны генерировать выходной файл, иначе они будут считаться завершенными и не будут выполняться.
Это здесь https://github.com/spotify/luigi/issues/595 выглядит как хак, а у меня нет run () -метод в моем классе в любом случае (он реализуется родительским классом CopyToIndex):
class UpdateIndex(CopyToIndex):
source: str = Parameter(default='')
iteration = luigi.IntParameter()
def requires(self):
return FilterDataset(self.source)
def docs(self):
file_path: str = os.path.join('tmp', '{0}_filtered_output.ldj'.format(self.source))
file_contents = open(file_path, 'r').read()
return [json.loads(str(item)) for item in file_contents.strip().split('\n')]
# properties of parent class CopyToIndex:
host = 'localhost'
port = 9200
index = 'example'
doc_type = '_doc'
purge_existing_index = True
marker_index_hist_size = 1
mapping = {
'properties': {
'name': {'type': 'keyword'},
'name_suggest': {
'type': 'completion',
}
}
}
settings = {
'number_of_shards': 1,
'number_of_replicas': 0
}