Как использовать pandas to_pickle и read_pickle с luigi - PullRequest
0 голосов
/ 07 августа 2020

Просто пытаюсь разобраться, Луиджи, мне нужно передать обработанные pandas фреймы данных вокруг рабочего процесса:

class readSQLtoPickle(luigi.Task):

    sql = luigi.Parameter()
    pickle = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget(self.pickle, format=format.Nop)


    def run(self):
        data = pd.read_sql(self.sql, ariel)
        data.to_pickle(self.output().path, compression=None)




class grabData(luigi.Task): # standard Luigi Task class

    sql = luigi.Parameter(default="SELECT * FROM DIM_DRUG_PRODUCT")
    pickle = luigi.Parameter(default="drug_product.pkl")

    def requires(self):
    # we need to read the log file before we can process it
        return readSQLtoPickle(sql=self.sql, pickle=self.pickle)

    def run(self):
        df = pd.read_pickle(self.input())
        print(df.head(20))

Эта наивная попытка не удалась, я думаю, поскольку я возвращаю двоичный объект, который может ' t затем можно прочитать через read_pickle ()?

File "C:\ProgramData\Anaconda3\envs\ariel\lib\site-packages\pandas\io\pickle.py", line 166, in read_pickle
    filepath_or_buffer, compression=compression
  File "C:\ProgramData\Anaconda3\envs\ariel\lib\site-packages\pandas\io\common.py", line 200, in get_filepath_or_buffer
    raise ValueError(msg)
ValueError: Invalid file path or buffer object type: <class 'luigi.local_target.LocalTarget'>

Обновление: похоже, это работает, но не уверен, что вы можете использовать этот синтаксис файла для pandas to_pickle () и read_pickle ()?

class readSQLtoPickle(luigi.Task):

    sql = luigi.Parameter()
    pickle = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget(self.pickle, format=format.Nop)


    def run(self):
        data = pd.read_sql(self.sql, ariel)
        with self.output().open('w') as f:
            pickle.dump(data, f)





class grabData(luigi.Task): # standard Luigi Task class

    sql = luigi.Parameter(default="SELECT * FROM DIM_DRUG_PRODUCT")
    pickle = luigi.Parameter(default="drug_product.pkl")

    def requires(self):
    # we need to read the log file before we can process it
        return readSQLtoPickle(sql=self.sql, pickle=self.pickle)

    def run(self):
        with self.input().open('r') as f:
            df = pickle.load(f)
            print(type(df))
...