Я пытаюсь использовать несколько брокеров kafka, когда один распределенный режим меня не устраивает. Но у меня возникает другая проблема при регистрации разъемов Кафки Postgresql. Он всегда отправлял мне ошибку 500 при регистрации. Вот моя распределенная конфигурация kafka connect:
bootstrap.servers=localhost:9093,localhost:9094,localhost:9095
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
#offset.storage.partitions=25
config.storage.topic=connect-configs
config.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3
#status.storage.partitions=5
offset.flush.interval.ms=10000
#rest.host.name= localhost
rest.port=8083
#rest.advertised.host.name= localhost
#rest.advertised.port=8082
plugin.path=/home/admin/kafka/connectors
, и вот как я определяю один из разъемов:
{
"name":"pg_connect_admin_analytics_menu",
"config":{
"connector.class":"io.debezium.connector.postgresql.PostgresConnector",
"plugin.name":"wal2json",
"database.hostname":"<>",
"database.port":"<>",
"database.user":"<>",
"database.password":"<>",
"database.dbname":"<>",
"database.server.name":"<>",
"database.history.kafka.bootstrap.servers":"localhost:9093,localhost:9094,localhost:9095",
"database.history.kafka.topic":"history_admin_analytics_menu",
"table.whitelist":"public.<topics>",
"transforms":"unwrap",
"transforms.unwrap.type":"io.debezium.transforms.UnwrapFromEnvelope",
"decimal.handling.mode":"double",
"time.precision.mode":"connect",
"transforms":"dropPrefix",
"transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropPrefix.regex":"postgres.public.(.*)",
"transforms.dropPrefix.replacement":"<>.dbo.$1"
}
}
Есть ли что-то, чего я пропустил?
ОБНОВЛЕНИЕ:
Вот что я получил от kafka-connect:
[2020-03-21 12:25:29,390] ERROR Uncaught exception in REST call to /connectors (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper:61)
org.apache.kafka.connect.errors.ConnectException: Could not create PG connection
at io.debezium.connector.postgresql.TypeRegistry.prime(TypeRegistry.java:398)
at io.debezium.connector.postgresql.TypeRegistry.<init>(TypeRegistry.java:122)
at io.debezium.connector.postgresql.connection.PostgresConnection.<init>(PostgresConnection.java:72)
at io.debezium.connector.postgresql.PostgresConnector.validate(PostgresConnector.java:102)
at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:313)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:745)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:742)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:342)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:282)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.postgresql.util.PSQLException: Connection to localhost:15432 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections.
at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:280)
at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49)
at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:211)
at org.postgresql.Driver.makeConnection(Driver.java:458)
at org.postgresql.Driver.connect(Driver.java:260)
at io.debezium.jdbc.JdbcConnection.lambda$patternBasedFactory$1(JdbcConnection.java:191)
at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:789)
at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:784)
at io.debezium.connector.postgresql.TypeRegistry.prime(TypeRegistry.java:310)
... 13 more
Caused by: java.net.ConnectException: Connection refused (Connection refused)
at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:399)
at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:242)
at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:224)
at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:403)
at java.base/java.net.Socket.connect(Socket.java:609)
at org.postgresql.core.PGStream.<init>(PGStream.java:75)
at org.postgresql.core.v3.ConnectionFactoryImpl.tryConnect(ConnectionFactoryImpl.java:91)
at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:192)
... 21 more
и от разъемов curl:
{"error_code":500,"message":"Could not create PG connection"}