Воздушный поток делает потоковую передачу задач неправильной - PullRequest
0 голосов
/ 17 декабря 2018

The streaming that is wrong

В соответствии с кодом, задача versionator и fs_task должны быть после задачи register.

Однако в этомслучай, он не имеет ни вверх по течению, ни вниз по течению.Как начать отладку?

Коды определения процесса: 1. Versionator:

def driver_de_versionator(**kwargs):
    log = get_logger(__name__, "INFO")
    log.info(kwargs)
    if 'pipeline_id' in kwargs and 'sheet_id' in kwargs and 'run_id' in kwargs and 'primary_key' in kwargs and 'mode' in kwargs:
        pipeline_id = kwargs['pipeline_id']
        sheet_id = kwargs['sheet_id']
        run_id = kwargs['run_id']
        package_primary_key = kwargs['primary_key']
        mode = kwargs['mode']
    else:
        raise ValueError('Invalid Parameters.')
    package_name = os.path.join(STORAGE_PREFIX, hashlib.md5(str(pipeline_id + sheet_id).encode('utf-8')).hexdigest())
    package_sheet_s3_key = os.path.join(STORAGE_PREFIX, unit_source_collection.find_one({"$and":[ {"pipeline_id": pipeline_id}, {"sheet_id": sheet_id}, {"run_id": run_id}]})["storage_path"])
    logging.info(package_sheet_s3_key)
    package_s3_version_key = package_name + "_versions"
    encoding = "utf-8"
    update_recent = True
    package_s3_diff_key = package_name + "_diff"
    a = Versionator(
        local_sink_path=LOCAL_SINK_PATH,
        aws_server_public_key=AWS_SERVER_PUBLIC_KEY,
        aws_server_secret_key=AWS_SERVER_SECRET_KEY,
        bucket_name=BUCKET_NAME,
        region_name=REGION_NAME,
        package_name=package_name,
        package_sheet_s3_key=package_sheet_s3_key, 
        package_s3_version_key=package_s3_version_key, 
        package_primary_key=package_primary_key, 
        package_s3_diff_key=package_s3_diff_key,
        s3=u's3',
        update_recent=True,
        encoding=encoding,
        mode=mode,
    )
    a.create_file_object_for_streaming()
    a.resolve_last_version()
    a.create_folder()
    a.download_last_version()
    a.index_patch()
    a.upload_diff_and_version()
    version_key = a.return_version_details()
    diff_key = a.return_diff_details()
    version_meta = {
        'version_s3_key': version_key,
        'diff_s3_key': diff_key
    }
    store_version_meta(version_meta=version_meta, pipeline_id=pipeline_id, run_id=run_id, sheet_id=sheet_id)

Кроме того, в моем пакете должен быть модуль datatable, но его нет.

Код строителя dag:

for sheet in pipeline.get('sheets', []):
        try:
            if sheet.get('push_config', {}).get('push'):
                sheet_task_id = sheet.get('task_id')
                sheet_id = sheet.get('sheet_id')
                fs_task_id = 'fs_{0}'.format(sheet_id)
                fs_task = PythonOperator(
                    task_id=fs_task_id,
                    python_callable=driver_de_feedershark,
                    op_kwargs={
                        'pipeline_id': pipeline.get('pipeline_id'),
                        'sheet_id': sheet.get('sheet_id')
                        },
                    dag=dag
                )
                task = sheet_bag[sheet_id]
                task_bag[fs_task_id] = fs_task
                task.set_downstream(fs_task)
                versionator_task_id = 'versionator_{0}'.format(sheet_id)
                versionator_task = PythonOperator(
                    task_id=versionator_task_id,
                    python_callable=driver_de_versionator,
                    op_kwargs={
                        'pipeline_id': pipeline.get('pipeline_id'),
                        'sheet_id': sheet.get('sheet_id'),
                        'primary_key': sheet['push_config'].get('primary_key', []),
                        'mode': pipeline.get('pipeline_type')
                    },
                    dag=dag
                )
                # print("jhim batuta")
                # print(sheet)
                task = sheet_bag[sheet_id]
                task_bag[versionator_task_id] = versionator_task
                task.set_downstream(versionator_task)

                datatable_task_id = 'datatable_{0}'.format(sheet_task_id)
                datatable_task = PythonOperator(
                    task_id=datatable_task_id,
                    python_callable=driver_de_datatable,
                    op_kwargs={
                        'pipeline_id': pipeline.get('pipeline_id'),
                        'sheet_id': sheet.get('sheet_id'),
                        'package_path': sheet['push_config'].get('package_path'),
                        'primary_key': sheet['push_config'].get('primary_key', []),
                        'mode': pipeline.get('pipeline_type')
                    },
                    dag=dag
                )
                task = task_bag[versionator_task_id]
                task_bag[datatable_task_id] = datatable_task
                task.set_downstream(datatable_task)
                task = task_bag[fs_task_id]
                task_bag[datatable_task_id] = datatable_task
                task.set_downstream(datatable_task)
        except AirflowException as e:
            raise ValueError (e)

1 Ответ

0 голосов
/ 18 декабря 2018

Да, я не слежу за твоим использованием task_bag и sheet_bag здесь.Вы не показали, что создает задачи register, которые вы хотите использовать в качестве исходных для задач versionator и fs.Также укажите это, чтобы помочь людям прочитать его.

for sheet in pipeline.get('sheets', []):
        try:
            if sheet.get('push_config', {}).get('push'):
                fs_task = PythonOperator(
                    task_id='fs_{0}'.format(sheet.get('sheet_id')),
                    python_callable=driver_de_feedershark,
                    op_kwargs={
                        'pipeline_id': pipeline.get('pipeline_id'),
                        'sheet_id': sheet.get('sheet_id')
                        },
                    dag=dag
                )

                # task = ?
                task.set_downstream(fs_task)

                versionator_task = PythonOperator(
                    task_id='versionator_{0}'.format(sheet.get('sheet_id')),
                    python_callable=driver_de_versionator,
                    op_kwargs={
                        'pipeline_id': pipeline.get('pipeline_id'),
                        'sheet_id': sheet.get('sheet_id'),
                        'primary_key': sheet['push_config'].get('primary_key', []),
                        'mode': pipeline.get('pipeline_type')
                    },
                    dag=dag
                )

                # task = ?
                task.set_downstream(versionator_task)

                datatable_task = PythonOperator(
                    task_id='datatable_{0}'.format(sheet.get('task_id')),
                    python_callable=driver_de_datatable,
                    op_kwargs={
                        'pipeline_id': pipeline.get('pipeline_id'),
                        'sheet_id': sheet.get('sheet_id'),
                        'package_path': sheet['push_config'].get('package_path'),
                        'primary_key': sheet['push_config'].get('primary_key', []),
                        'mode': pipeline.get('pipeline_type')
                    },
                    dag=dag
                )
                # Not necessary: task = task_bag[versionator_task_id]
                # This is done twice: task_bag[datatable_task_id] = datatable_task
                versionator_task.set_downstream(datatable_task)
                # Its confusing: task = task_bag[fs_task_id]
                # And I don't see it used: task_bag[datatable_task_id] = datatable_task
                fs_task.set_downstream(datatable_task)
        except AirflowException as e:
            raise ValueError (e)

Часть кода, которую вы не видите, также является частью кода, который вы нам не показывали.

...