Как транслировать изменения из Postgres с помощью debezium - PullRequest
1 голос
/ 02 мая 2019

Изменения потока от Postgres с использованием дебезия

настройка, которая сделана:

  1. Настройка Docker.
  2. Запустил Postgres, zookeeper, kafka, а затем и разъем debezium.
  3. Удаленная настройка базы данных с помощью decoderbufs, wal2json (postgres).
  4. подключение к дебезию с завитком.
  5. создал наблюдателя.

Проблема: когда я запускаю watcher, он читает все изменения, которые произошли ранее, но когда любая сделанная вставка, kafka выдает исключение для debezium, говоря "An exception occurred in the change event producer. This connector will be stopped.", а в watcher ничего не показывает.

Поскольку я очень новичок в этих понятиях, не могу понять, что я упустил при настройке среды, и это мой первый вопрос по переполнению стека, пожалуйста, игнорируйте мои ошибки.

Основная проблема в том, что он работает нормально, моя локальная база данных.
Может ли кто-нибудь помочь в этом?
Заранее спасибо

019-05-02 14:09:47,242 WARN   Postgres|kafkaserver|records-stream-producer  Closing replication stream due to db connection IO exception...   [io.debezium.connector.postgresql.RecordsStreamProducer]
2019-05-02 14:09:47,365 INFO   ||  WorkerSourceTask{id=kafka-public-connector-0} Committing offsets   [org.apache.kafka.connect.runtime.WorkerSourceTask]
2019-05-02 14:09:47,366 INFO   ||  WorkerSourceTask{id=kafka-public-connector-0} flushing 0 outstanding messages for offset commit   [org.apache.kafka.connect.runtime.WorkerSourceTask]
2019-05-02 14:09:47,375 ERROR  ||  WorkerSourceTask{id=kafka-public-connector-0} Task threw an uncaught and unrecoverable exception   [org.apache.kafka.connect.runtime.WorkerTask]
org.apache.kafka.connect.errors.ConnectException: An exception ocurred in the change event producer. This connector will be stopped.
    at io.debezium.connector.base.ChangeEventQueue.throwProducerFailureIfPresent(ChangeEventQueue.java:170)
    at io.debezium.connector.base.ChangeEventQueue.poll(ChangeEventQueue.java:151)
    at io.debezium.connector.postgresql.PostgresConnectorTask.poll(PostgresConnectorTask.java:161)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:245)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:221)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.postgresql.util.PSQLException: Database connection failed when reading from copy
    at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1037)
    at org.postgresql.core.v3.CopyDualImpl.readFromCopy(CopyDualImpl.java:41)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.receiveNextData(V3PGReplicationStream.java:155)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.readInternal(V3PGReplicationStream.java:124)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.read(V3PGReplicationStream.java:70)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.read(PostgresReplicationConnection.java:251)
    at io.debezium.connector.postgresql.RecordsStreamProducer.streamChanges(RecordsStreamProducer.java:134)
    at io.debezium.connector.postgresql.RecordsStreamProducer.lambda$start$0(RecordsStreamProducer.java:120)
    ... 5 more
Caused by: java.io.EOFException
    at org.postgresql.core.PGStream.receiveChar(PGStream.java:308)
    at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1079)
    at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1035)
    ... 12 more
2019-05-02 14:09:47,387 ERROR  ||  WorkerSourceTask{id=kafka-public-connector-0} Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]
  • Есть ли полное руководство по решению этой проблемы?
  • Основная цель - у меня есть БД с огромными данными, где один приложение (производитель) получает данные с другого сервера и сохраняет все данные в нашей собственной БД и другое приложение (потребитель), чтобы получить хранить и применять бизнес-логику и для внешнего интерфейса. Вот где я хотите заменить попадания из другого приложения (потребителя) в базу данных это дебезиум и кафка часть.
  • или есть ли способ сделать это.

1 Ответ

0 голосов
/ 21 мая 2019

Спасибо всем.Выше проблема решена.Актуальная проблема с настройкой в ​​удаленной БД.Есть несколько других зависимостей, которые не установлены должным образом, такие как postgis, protobuf-c, decoderbufs после того, как правильно установленная проблема решена.

...