Заполняется слот репликации, созданный соединителем kafka-соединителя.
У меня есть postgres RDS база данных на AWS. Я поместил следующую опцию группы параметров (только отображая разницу из значения по умолчанию)
rds.logical_replication: 1
У меня есть kafka connect, работающий с разъемом debezium postgres. Это конфиг (с определенными значениями, отредактированными, конечно)
"database.dbname" = "mydb"
"database.hostname" = "myhostname"
"database.password" = "mypass"
"database.port" = "myport"
"database.server.name" = "postgres"
"database.user" = "myuser"
"database.whitelist" = "my_database"
"include.schema.changes" = "false"
"plugin.name" = "wal2json_streaming"
"slot.name" = "my_slotname"
"snapshot.mode" = "never"
"table.whitelist" = "public.mytable"
"tombstones.on.delete" = "false"
"transforms" = "key"
"transforms.key.field" = "id"
"transforms.key.type" = "org.apache.kafka.connect.transforms.ExtractField$Key"
Если я получу статус этого соединителя, это будет нормально.
curl -s http://my.kafkaconnect.url:kc_port/connectors/my-connector/status | jq
{
"name": "my-connector",
"connector": {
"state": "RUNNING",
"worker_id": "some_ip"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "some_ip"
}
],
"type": "source"
}
Однако репликация слот в postgres продолжает увеличиваться и увеличиваться:
SELECT slot_name,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as replicationSlotLag,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) as confirmedLag,
active
FROM pg_replication_slots;
slot_name | replicationslotlag | confirmedlag | active
-------------------------------+--------------------+--------------+--------
my_slotname | 20 GB | 20 GB | t
Почему репликация продолжает расти? Как я понимаю, запущенная задача коннектора kafka connect должна считывать данные из этого слота репликации, публиковать его в топи c postgres. public.mytable
, а затем размер слота репликации должен уменьшаться. Я что-то упустил в этой цепочке действий?