слот репликации уже существует - PullRequest
0 голосов
/ 07 июня 2018

Каждый раз, когда я перезапускаю контейнер debezium kafka-connect или развертываю другой экземпляр, я получаю следующую ошибку:

    io.debezium.jdbc.JdbcConnectionException: ERROR: replication slot "debezium" already exists
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.initReplicationSlot(PostgresReplicationConnection.java:136)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.<init>(PostgresReplicationConnection.java:79)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.<init>(PostgresReplicationConnection.java:38)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$ReplicationConnectionBuilder.build(PostgresReplicationConnection.java:349)
    at io.debezium.connector.postgresql.PostgresTaskContext.createReplicationConnection(PostgresTaskContext.java:80)
    at io.debezium.connector.postgresql.RecordsStreamProducer.<init>(RecordsStreamProducer.java:75)
    at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:112)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:157)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.postgresql.util.PSQLException: ERROR: replication slot "debezium" already exists
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2412)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2125)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:297)
    at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:428)
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:354)
    at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:301)
    at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:287)
    at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:264)
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:260)
    at org.postgresql.replication.fluent.logical.LogicalCreateSlotBuilder.make(LogicalCreateSlotBuilder.java:48)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.initReplicationSlot(PostgresReplicationConnection.java:102)
    ... 14 more

Я использую этот образ: https://github.com/debezium/docker-images/tree/master/connect/0.8

Иу меня есть для него конфигурация:

    {  
   "name":"record-loader-connector",
   "config":{  
      "connector.class":"io.debezium.connector.postgresql.PostgresConnector",
      "database.dbname":"record_loader?ssl",
      "database.user":"postgres",
      "database.hostname":"redacted",
      "database.history.kafka.bootstrap.servers":"redacted",
      "database.history.kafka.topic":"dbhistory.recordloader",
      "database.password":"redacted",
      "name":"record-loader-connector",
      "database.server.name":"recordLoaderDb",
      "database.port":"20023",
      "table.whitelist":".*sync"
   },
   "tasks":[  
      {  
         "connector":"record-loader-connector",
         "task":0
      }
   ],
   "type":"source"
}

Я заметил эти две опции конфигурации (slot.name и slot.drop_on_stop), но мне не ясно, если / как я должен изменить их:

http://debezium.io/docs/connectors/postgresql/#connector-properties

1 Ответ

0 голосов
/ 12 июня 2018

При развертывании нескольких экземпляров коннектора Debezium Postgres необходимо обязательно использовать разные имена слотов репликации.При настройке соединителя вы можете указать имя:

{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname" : "postgres",
        "database.server.name": "dbserver1",
        "database.whitelist": "inventory",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory",
        "slot.name" : "my-slot-name"
    }
}

Я не могу воспроизвести проблему, описанную вами при перезапуске данного экземпляра соединителя.Он должен обнаружить, что слот уже существует, и использовать его повторно (одной из возможных причин может быть то, что вы также изменили плагин логического декодирования («decoderbufs» вместо «wal2json»)?). Если у вас есть средство воспроизведения для этогоНе могли бы вы открыть запись в нашем баг-трекере ?

Чтобы продолжить, вы можете вручную удалить слот в Postgres:

select pg_drop_replication_slot('debezium');
...