Я пытаюсь написать свой первый ETL с luigi.
Я думаю, что это должен быть простой конвейер, но я борюсь с этим.
Что мне понадобится Рабочий процесс просто:
- Загрузите файл XML по стране (он содержит отели в стране).
- Прочтите его и проанализируйте.
- Вставьте каждая строка в базе данных postgres.
Что у меня сейчас есть:
class GetXML(luigi.Task):
usr = ''
psw = ''
pais = luigi.Parameter()
def output(self):
ruta="data/%s.xml"%(self.pais)
return luigi.LocalTarget(ruta, format=luigi.format.Nop)
def requires(self):
return None
def run(self):
response = requests.get(top_level_url_pais + str(self.pais), auth=HTTPBasicAuth(self.usr, self.psw))
xml = response.content
with self.output().open('wb') as f:
f.write(xml)
class ReadXML(luigi.Task):
pais = luigi.Parameter()
def output(self):
ruta = "data/%s.xml" % (self.pais)
return luigi.LocalTarget(ruta)
def requires(self):
return GetXML(self.pais)
def run(self):
root = ET.parse(self.input().path, 'r').getroot()
for prop in root.findall('property'):
giataid = prop.get('giataId')
lastUpdate = prop.get('lastUpdate')
with open(self.input().path, 'r') as f:
hello = f.read()
print(hello)
class InsertToRDS(luigi.contrib.postgres.CopyToTable):
pais = luigi.Parameter()
host = "localhost"
database = ""
user = ""
password = ""
table = "test"
columns = [("giataid", "TEXT"),
("lastUpdate", "TEXT")
]
def requires(self):
return ReadXML(self.pais)
К настоящему времени я могу получить файл XML, но все еще не могу его проанализировать и сохраните его в таблице.
Моя основная проблема заключается в том, что я не понимаю, каким образом я должен передавать данные между Задачами.
Мне нужно прочитать данные XML, передать его в класс InsertToRDS, а затем сохранить его.
Есть предложения?
Заранее спасибо!