Работа через это https://docs.microsoft.com/en-us/azure/data-factory/tutorial-incremental-copy-portal
У меня есть одно действие Lookup под названием GetCurrentWatermarkValue с sqlReaderQuery:
Select WaterMarkValue as CurrentWatermarkValue\nfrom WatermarkTable
У меня есть другое действие под названием GetNewWatermarkValue с sqlReaderQuery:
select max(createdon) as NewWatermarkValue from shipment
Затем я пытаюсь использовать их обоих в источнике для операции копирования данных, используя
select *
from Shipment
where CreatedOn > '@{activity('GetCurrentWatermarkValue').output.firstRow.CurrentWatermarkValue}'
and CreatedOn <= '@{activity('GetNewWatermarkValue').output.firstRow.NewWatermarkValue}'
Кнопка «Предварительный просмотр данных» неактивна (но активна, когда я удаляю условие where), поэтому явно что-то не так
После того, как я установил приемник, я пытаюсь настроить отображение. Щелчок по схеме импорта на вкладке «Сопоставление» дает:
A database operation failed with the following error: 'Incorrect syntax near 'GetCurrentWatermarkValue'.'. Activity ID:98794aa9-c866-48d6-b9ff-9cb277bac6ed
Я подумал, может быть, мне следует использовать опцию добавления динамического содержимого, но это просто дает
Query is required
Я где-то читал, что когда в поле «Уточняющий запрос» установлен параметр «Только первая строка», текст после firstRow. должно быть [TableName], но это не так.
Поиск:
{
"name": "GetCurrentWatermarkValue",
"type": "Lookup",
"policy": ...,
"typeProperties": {
"source": {
"type": "AzureSqlSource",
"sqlReaderQuery": "Select WaterMarkValue as CurrentWatermarkValue\nfrom WatermarkTable"
},
"dataset": {
"referenceName": "WatermarkTable",
"type": "DatasetReference"
}
}
}
Поиск:
{
"name": "GetNewWatermarkValue",
"type": "Lookup",
"policy": ...,
"typeProperties": {
"source": {
"type": "AzureSqlSource",
"sqlReaderQuery": "select max(createdon) as NewWatermarkValue from shipment"
},
"dataset": {
"referenceName": "ShipmentsTable",
"type": "DatasetReference"
}
}
}
Копирование данных:
{
"name": "ArchiveShipments",
"type": "Copy",
"dependsOn": [
{
"activity": "GetCurrentWatermarkValue",
"dependencyConditions": [
"Succeeded"
]
},
{
"activity": "GetNewWatermarkValue",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": ...,
"typeProperties": {
"source": {
"type": "AzureSqlSource",
"sqlReaderQuery": {
"value": "select *\nfrom Shipment\nwhere CreatedOn > '@{activity('GetCurrentWatermarkValue').output.firstRow.CurrentWatermarkValue}' \nand CreatedOn <= '@{activity('GetNewWatermarkValue').output.firstRow.NewWatermarkValue}'",
"type": "Expression"
}
},
"sink": {
"type": "AzureSqlSink"
},
"enableStaging": false
},
"inputs": [
{
"referenceName": "ShipmentsTable",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "ShipmentArchiveTable",
"type": "DatasetReference"
}
]
}