Изменения потока от Postgres с использованием дебезия
настройка, которая сделана:
- Настройка Docker.
- Запустил Postgres, zookeeper, kafka, а затем и разъем debezium.
- Удаленная настройка базы данных с помощью decoderbufs, wal2json (postgres).
- подключение к дебезию с завитком.
- создал наблюдателя.
Проблема: когда я запускаю watcher, он читает все изменения, которые произошли ранее, но когда любая сделанная вставка, kafka выдает исключение для debezium, говоря "An exception occurred in the change event producer. This connector will be stopped.
", а в watcher ничего не показывает.
Поскольку я очень новичок в этих понятиях, не могу понять, что я упустил при настройке среды, и это мой первый вопрос по переполнению стека, пожалуйста, игнорируйте мои ошибки.
Основная проблема в том, что он работает нормально, моя локальная база данных.
Может ли кто-нибудь помочь в этом?
Заранее спасибо
019-05-02 14:09:47,242 WARN Postgres|kafkaserver|records-stream-producer Closing replication stream due to db connection IO exception... [io.debezium.connector.postgresql.RecordsStreamProducer]
2019-05-02 14:09:47,365 INFO || WorkerSourceTask{id=kafka-public-connector-0} Committing offsets [org.apache.kafka.connect.runtime.WorkerSourceTask]
2019-05-02 14:09:47,366 INFO || WorkerSourceTask{id=kafka-public-connector-0} flushing 0 outstanding messages for offset commit [org.apache.kafka.connect.runtime.WorkerSourceTask]
2019-05-02 14:09:47,375 ERROR || WorkerSourceTask{id=kafka-public-connector-0} Task threw an uncaught and unrecoverable exception [org.apache.kafka.connect.runtime.WorkerTask]
org.apache.kafka.connect.errors.ConnectException: An exception ocurred in the change event producer. This connector will be stopped.
at io.debezium.connector.base.ChangeEventQueue.throwProducerFailureIfPresent(ChangeEventQueue.java:170)
at io.debezium.connector.base.ChangeEventQueue.poll(ChangeEventQueue.java:151)
at io.debezium.connector.postgresql.PostgresConnectorTask.poll(PostgresConnectorTask.java:161)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:245)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:221)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.postgresql.util.PSQLException: Database connection failed when reading from copy
at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1037)
at org.postgresql.core.v3.CopyDualImpl.readFromCopy(CopyDualImpl.java:41)
at org.postgresql.core.v3.replication.V3PGReplicationStream.receiveNextData(V3PGReplicationStream.java:155)
at org.postgresql.core.v3.replication.V3PGReplicationStream.readInternal(V3PGReplicationStream.java:124)
at org.postgresql.core.v3.replication.V3PGReplicationStream.read(V3PGReplicationStream.java:70)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.read(PostgresReplicationConnection.java:251)
at io.debezium.connector.postgresql.RecordsStreamProducer.streamChanges(RecordsStreamProducer.java:134)
at io.debezium.connector.postgresql.RecordsStreamProducer.lambda$start$0(RecordsStreamProducer.java:120)
... 5 more
Caused by: java.io.EOFException
at org.postgresql.core.PGStream.receiveChar(PGStream.java:308)
at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1079)
at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1035)
... 12 more
2019-05-02 14:09:47,387 ERROR || WorkerSourceTask{id=kafka-public-connector-0} Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
- Есть ли полное руководство по решению этой проблемы?
- Основная цель - у меня есть БД с огромными данными, где один
приложение (производитель) получает данные с другого сервера и сохраняет
все данные в нашей собственной БД и другое приложение (потребитель), чтобы получить
хранить и применять бизнес-логику и для внешнего интерфейса. Вот где я
хотите заменить попадания из другого приложения (потребителя) в базу данных
это дебезиум и кафка часть.
- или есть ли способ сделать это.