Просто пытаюсь разобраться, Луиджи, мне нужно передать обработанные pandas фреймы данных вокруг рабочего процесса:
class readSQLtoPickle(luigi.Task):
sql = luigi.Parameter()
pickle = luigi.Parameter()
def output(self):
return luigi.LocalTarget(self.pickle, format=format.Nop)
def run(self):
data = pd.read_sql(self.sql, ariel)
data.to_pickle(self.output().path, compression=None)
class grabData(luigi.Task): # standard Luigi Task class
sql = luigi.Parameter(default="SELECT * FROM DIM_DRUG_PRODUCT")
pickle = luigi.Parameter(default="drug_product.pkl")
def requires(self):
# we need to read the log file before we can process it
return readSQLtoPickle(sql=self.sql, pickle=self.pickle)
def run(self):
df = pd.read_pickle(self.input())
print(df.head(20))
Эта наивная попытка не удалась, я думаю, поскольку я возвращаю двоичный объект, который может ' t затем можно прочитать через read_pickle ()?
File "C:\ProgramData\Anaconda3\envs\ariel\lib\site-packages\pandas\io\pickle.py", line 166, in read_pickle
filepath_or_buffer, compression=compression
File "C:\ProgramData\Anaconda3\envs\ariel\lib\site-packages\pandas\io\common.py", line 200, in get_filepath_or_buffer
raise ValueError(msg)
ValueError: Invalid file path or buffer object type: <class 'luigi.local_target.LocalTarget'>
Обновление: похоже, это работает, но не уверен, что вы можете использовать этот синтаксис файла для pandas to_pickle () и read_pickle ()?
class readSQLtoPickle(luigi.Task):
sql = luigi.Parameter()
pickle = luigi.Parameter()
def output(self):
return luigi.LocalTarget(self.pickle, format=format.Nop)
def run(self):
data = pd.read_sql(self.sql, ariel)
with self.output().open('w') as f:
pickle.dump(data, f)
class grabData(luigi.Task): # standard Luigi Task class
sql = luigi.Parameter(default="SELECT * FROM DIM_DRUG_PRODUCT")
pickle = luigi.Parameter(default="drug_product.pkl")
def requires(self):
# we need to read the log file before we can process it
return readSQLtoPickle(sql=self.sql, pickle=self.pickle)
def run(self):
with self.input().open('r') as f:
df = pickle.load(f)
print(type(df))