Я пытаюсь прослушать изменения в базе данных Aurora с помощью Amazon DMS и отправить изменения в поток Kinesis, где будет обрабатываться функция Lambda, прослушивающая поток.
Я имел в виду следующеедокументация для написания моих правил.
https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Target.Kinesis.html https://aws.amazon.com/blogs/database/use-the-aws-database-migration-service-to-stream-change-data-to-amazon-kinesis-data-streams/
Вот мое сопоставление правил для задачи постоянной репликации DMS (CDC).
{
"rules": [
{
"rule-type": "selection",
"rule-id": "1",
"rule-name": "1",
"object-locator": {
"schema-name": "my_db",
"table-name": "my_table"
},
"rule-action": "include"
},
{
"rule-type": "object-mapping",
"rule-id": "2",
"rule-name": "2",
"rule-action": "map-record-to-record",
"object-locator": {
"schema-name": "my_db",
"table-name": "my_table"
},
"mapping-parameters": {
"partition-key": {
"attribute-name": "my_id",
"value": "${my_id}"
}
}
}
]
}
Однако, когда я делаю изменения в исходной таблице, задача DMS завершается с ошибкой (-ами) ниже.
2019-02-05T10:36:55 [TARGET_APPLY ]E: Error allocating memory for Json document [1020100] (field_mapping_utils.c:382)
2019-02-05T10:36:55 [TARGET_APPLY ]E: Failed while looking for object mapping for table my_table [1020100] (kinesis_utils.c:258)
2019-02-05T10:36:55 [TARGET_APPLY ]E: Error executing data handler [1020100] (streamcomponent.c:1778)
2019-02-05T10:36:55 [TASK_MANAGER ]E: Stream component failed at subtask 0, component st_0_some_random_id [1020100] (subtask.c:1366)
2019-02-05T10:36:55 [TASK_MANAGER ]E: Task error notification received from subtask 0, thread 1 [1020100] (replicationtask.c:2661)
2019-02-05T10:36:55 [TASK_MANAGER ]W: Task 'some_random_task_id' encountered a fatal error (repository.c:4704)
Когда я пытаюсь без правила object-mapping
, Kinesis получит записьс "partitionKey": "my_db.my_table"
с правильными значениями, что является поведением по умолчанию для приемника от таблицы к столу, но нам нужен приемник от таблицы к кинесису.
Почему меня так волнует partition-key
?Потому что мне нужно использовать все осколки в моем потоке Kinesis.
Может ли кто-нибудь мне помочь?
ОБНОВЛЕНИЕ:
Когда я добавляю "partition-key-type": "schema-table"
до "mapping-parameters"
, оно не будет выполнено, задача не будет выполнена, но игнорируется атрибут "partition-key"
и будет иметь "partitionKey": "my_db.my_table"
, как и раньше.
Неопределенные точки:
- При переходе из таблицы в таблицу он использует
"partition-key-type":
"schema-table"
, но никогда не упоминает, каково значение таблицы в кинезис. - Примеры и пояснения в документах очень ограничены и даже ошибочны(т.е. некоторые правила JSON недействительны)