Не удалось создать для каждого действия в фабрике данных azure, используя python - PullRequest
0 голосов
/ 21 февраля 2020

Я пытаюсь создать azure конвейеры и ресурсы фабрики данных, используя python. Я успешно выполнил некоторые действия ADF, такие как поиск, копирование ..., но проблема, с которой я здесь сталкиваюсь, заключается в том, что я пытаюсь скопировать несколько таблиц из SQL в BLOB-объект, используя операцию «Каждое действие», и выдает следующую ошибку

Как бы вы создали деятельность внутри для каждого вида деятельности? Любые вклады с благодарностью. Спасибо!

Ссылка: https://docs.microsoft.com/en-us/python/api/azure-mgmt-datafactory/azure.mgmt.datafactory.models.foreachactivity?view=azure-python

Сообщение об ошибке

Ошибка типа: объект «CopyActivity» не является повторяемый

Кодовый блок

## Lookup Activity
ls_sql_name = 'ls_'+project_name+'_'+src_svr_type+'_dev'
linked_service_name =LinkedServiceReference(reference_name=ls_sql_name)
lkp_act_name ='Get Table Names'
sql_reader_query = "SELECT top 3 name from sys.tables where name like '%dim'"
source = SqlSource(sql_reader_query= sql_reader_query)
dataset= {"referenceName": "ds_sql_Dim_input","type": "DatasetReference"}
LookupActivity_ = LookupActivity(name =lkp_act_name, linked_service_name= linked_service_name, source = source, dataset = dataset
                                ,first_row_only =False)


#create copy activity
ds_name = 'ds_sql_dim_input' #these datasets already created
dsOut_name ='ds_blob_dim_output' #these datasets already created

copy_act_name = 'Copy SQL to Blob(parquet)'
sql_reader_query =  {"value": "@item().name","type": "Expression"}
sql_source = SqlSource(sql_reader_query=sql_reader_query)
blob_sink = ParquetSink()
dsin_ref = DatasetReference(reference_name=ds_name)
dsOut_ref = DatasetReference(reference_name=dsOut_name)
copy_activity = CopyActivity(name=copy_act_name,inputs=[dsin_ref], outputs=[dsOut_ref], source=sql_source, sink=blob_sink)

## For Each Activity
pl_name ='pl_Test'
items= {"value": "@activity('Get Table Names').output.value","type": "Expression"}
dependsOn = [{"activity": "Get Table Names","dependencyConditions": ["Succeeded"]}]
ForEachActivity_= ForEachActivity(name = 'Copy tables in loop',items=items,depends_on=dependsOn ,activities =copy_activity)

params_for_pipeline = {}
p_obj = PipelineResource(activities=[LookupActivity_,ForEachActivity_], parameters=params_for_pipeline)
p = adf_client.pipelines.create_or_update(rg_name, df_name, pl_name, p_obj)

1 Ответ

1 голос
/ 21 февраля 2020

Действия должны быть списком действий, и вы передаете один. Попробуйте создать список и добавить в него действие копирования, а затем передать этот список в параметре действий.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...