Использование вашего Docker Compose Я вижу эту ошибку в рабочем журнале Kafka Connect при создании соединителя:
Caused by: org.postgresql.util.PSQLException: ERROR: could not access file "pgoutput": No such file or directory
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2505)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2241)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:310)
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:447)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:368)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:309)
at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:295)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:272)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:267)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createReplicationSlot(PostgresReplicationConnection.java:288)
at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:126)
... 9 more
Это также отражается в статусе задачи, если вы используете Kafka Connect REST API для запроса:
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | jq '."test-connector".status'
{
"name": "test-connector",
"connector": {
"state": "RUNNING",
"worker_id": "192.168.16.5:8083"
},
"tasks": [
{
"id": 0,
"state": "FAILED",
"worker_id": "192.168.16.5:8083",
"trace": "org.apache.kafka.connect.errors.ConnectException: org.postgresql.util.PSQLException: ERROR: could not access file \"pgoutput\": No such file or directory\n\tat io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:129)\n\tat io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:49)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:208)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: org.postgresql.util.PSQLException: ERROR: could not access file \"pgoutput\": No such file or directory\n\tat org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2505)\n\tat org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2241)\n\tat org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:310)\n\tat org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:447)\n\tat org.postgresql.jdbc.PgStatement.execute(PgStatement.java:368)\n\tat org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:309)\n\tat org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:295)\n\tat org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:272)\n\tat org.postgresql.jdbc.PgStatement.execute(PgStatement.java:267)\n\tat io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createReplicationSlot(PostgresReplicationConnection.java:288)\n\tat io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:126)\n\t... 9 more\n"
}
],
"type": "source"
Используемая версия Postgres:
postgres=# SHOW server_version;
server_version
----------------
9.6.16
pgoutput
доступен только> = версия 10.
Я изменил ваш Docker Compose для использования версии 10:
image: debezium/postgres:10
После отскока стека для чистого запуска и следуя вашим инструкциям, я получаю соединитель, который работает:
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
column -s : -t| sed 's/\"//g'| sort
source | test-connector | RUNNING | RUNNING | io.debezium.connector.postgresql.PostgresConnector
и данные в топике Кафки c:
$ docker exec kafkacat kafkacat -b kafka:9092 -t postgres.public.mytable -C
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"postgres.public.mytable.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"postgres.public.mytable.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"postgres.public.mytable.Envelope"},"payload":{"before":null,"after":{"id":1,"name":"Tom Cat"},"source":{"version":"1.0.0.Final","connector":"postgresql","name":"postgres","ts_ms":1579172192292,"snapshot":"false","db":"test","schema":"public","table":"mytable","txId":561,"lsn":24485520,"xmin":null},"op":"c","ts_ms":1579172192347}}% Reached end of topic postgres.public.mytable [0] at offset 1
Я добавил kafkacat в ваш Docker Составить с:
kafkacat:
image: edenhill/kafkacat:1.5.0
container_name: kafkacat
entrypoint:
- /bin/sh
- -c
- |
while [ 1 -eq 1 ];do sleep 60;done
Редактировать : сохранение предыдущего ответа, поскольку оно все еще полезно и актуально:
Дебезиум запишет сообщение в topi c на основе имени таблицы . В вашем примере это будет postgres.test.mytable
.
Вот почему kafkacat
полезен, потому что вы можете запустить
kafkacat -b broker:9092 -L
, чтобы увидеть список всех ваших тем и разделов. Как только у вас есть topi c
kafkacat -b broker:9092 -t postgres.test.mytable -C
для чтения из него.
Проверьте подробности по kafkacat , включая инструкции по запуску с Docker
Также есть демонстрационная версия все в действии с Docker Составьте здесь