Я пытаюсь делать резервные копии файла в памяти на диске каждый час, используя Luigi, т.е. сказать: «Каждый час я хочу выполнить задачу runrun_backup и проверить, возвращает ли функция func_backup () значение True или нет, если да». Затем я сохраняю файл в диск. (ниже код, который я попробовал):
df_dumm = pd.DataFrame({'col1':[1,2,3],'col2':[1,2,3]})
def func_backup():
df_dumm.to_csv('test.csv')
class run_backup(luigi.Task):
status = False
def run(self):
self.status = True if func_backup() else False
def complete(self):
return True #hard Code to test Overall Task
class BackupScheduler(luigi.Task):
def run(self):
RangeHourlyBase(of=run_backup,start=datetime.date.today(),param_name='test')
def output(self):
return luigi.LocalTarget(r'Backup_{}'.format(2) )
if __name__ =='main':
luigi.build([ChainedTasks(ids)], workers=1,local_scheduler=False,no_lock=True,detailed_summary=True)
Однако я получаю ошибку ниже:
Traceback (most recent call last):
File "\lib\site-packages\luigi\worker.py", line 184, in run
raise RuntimeError('Unfulfilled %s at run time: %s' % (deps, ', '.join(missing)))
RuntimeError: Unfulfilled dependency at run time: BackupScheduler__99914b932b
Как я могу запустить RangeHourlyBase () внутри Python вместо команды линия