Я пытаюсь создать azure конвейеры и ресурсы фабрики данных, используя python. Я успешно выполнил некоторые действия ADF, такие как поиск, копирование ..., но проблема, с которой я здесь сталкиваюсь, заключается в том, что я пытаюсь скопировать несколько таблиц из SQL в BLOB-объект, используя операцию «Каждое действие», и выдает следующую ошибку
Как бы вы создали деятельность внутри для каждого вида деятельности? Любые вклады с благодарностью. Спасибо!
Ссылка: https://docs.microsoft.com/en-us/python/api/azure-mgmt-datafactory/azure.mgmt.datafactory.models.foreachactivity?view=azure-python
Сообщение об ошибке
Ошибка типа: объект «CopyActivity» не является повторяемый
Кодовый блок
## Lookup Activity
ls_sql_name = 'ls_'+project_name+'_'+src_svr_type+'_dev'
linked_service_name =LinkedServiceReference(reference_name=ls_sql_name)
lkp_act_name ='Get Table Names'
sql_reader_query = "SELECT top 3 name from sys.tables where name like '%dim'"
source = SqlSource(sql_reader_query= sql_reader_query)
dataset= {"referenceName": "ds_sql_Dim_input","type": "DatasetReference"}
LookupActivity_ = LookupActivity(name =lkp_act_name, linked_service_name= linked_service_name, source = source, dataset = dataset
,first_row_only =False)
#create copy activity
ds_name = 'ds_sql_dim_input' #these datasets already created
dsOut_name ='ds_blob_dim_output' #these datasets already created
copy_act_name = 'Copy SQL to Blob(parquet)'
sql_reader_query = {"value": "@item().name","type": "Expression"}
sql_source = SqlSource(sql_reader_query=sql_reader_query)
blob_sink = ParquetSink()
dsin_ref = DatasetReference(reference_name=ds_name)
dsOut_ref = DatasetReference(reference_name=dsOut_name)
copy_activity = CopyActivity(name=copy_act_name,inputs=[dsin_ref], outputs=[dsOut_ref], source=sql_source, sink=blob_sink)
## For Each Activity
pl_name ='pl_Test'
items= {"value": "@activity('Get Table Names').output.value","type": "Expression"}
dependsOn = [{"activity": "Get Table Names","dependencyConditions": ["Succeeded"]}]
ForEachActivity_= ForEachActivity(name = 'Copy tables in loop',items=items,depends_on=dependsOn ,activities =copy_activity)
params_for_pipeline = {}
p_obj = PipelineResource(activities=[LookupActivity_,ForEachActivity_], parameters=params_for_pipeline)
p = adf_client.pipelines.create_or_update(rg_name, df_name, pl_name, p_obj)