Я делаю ETL с Луиджи и Spark Standalone. Я получаю CSV-файл, применяю преобразования, а затем создаю PySpark DataFrame. После я делаю разные фильтрации и агрегации. Я сомневаюсь в том, что с Луиджи можно сделать что-то подобное, или мне нужно перейти к другим решениям:
class step_1(SparkSubmitTask):
name = "Create DataFrame from CSV"
app = 'df.py'
def app_options(self):
return [self.output().path]
def output(self):
return PySpark DataFrame
class step_2(SparkSubmitTask):
name = "aggregate data from step_1 dataframe and generate csv"
app = 'step_2.py'
def app_options(self):
return [self.output().path]
def requires(self):
return step_1
def output(self):
return csv_1
class step_3(SparkSubmitTask):
name = "aggregate data from step_1 dataframe and generate csv"
app = 'step_3.py'
def app_options(self):
return [self.output().path]
def requires(self):
return step_1
def output(self):
return csv_2
Большое спасибо заранее.