Как использовать пользовательский столбец полезной нагрузки с исходящим преобразованием на автономном Debezium? - PullRequest
1 голос
/ 22 мая 2019

Я пытаюсь запустить автономный Debezium с исходящим SMT, используя настраиваемый столбец полезной нагрузки (after) и дополнительный столбец jsonb (before), но задача выдает ошибку:

debezium           | 2019-05-21 23:07:50,267 ERROR  ||  WorkerSourceTask{id=campaigns-outbox-connector-0} Task threw an uncaught and unrecoverable exception   [org.apache.kafka.connect.runtime.WorkerTask]
debezium           | org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
debezium           |    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
debezium           |    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
debezium           |    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
debezium           |    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:293)
debezium           |    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:229)
debezium           |    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
debezium           |    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
debezium           |    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
debezium           |    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
debezium           |    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
debezium           |    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
debezium           |    at java.lang.Thread.run(Thread.java:748)
debezium           | Caused by: org.apache.kafka.connect.errors.DataException: payload is not a valid field name
debezium           |    at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
debezium           |    at org.apache.kafka.connect.data.Struct.get(Struct.java:74)
debezium           |    at io.debezium.transforms.outbox.EventRouter.apply(EventRouter.java:98)
debezium           |    at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
debezium           |    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
debezium           |    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
debezium           |    ... 11 more

Мне кажется, что debezium пытается получить столбец payload из моей таблицы campaigns.outbox, хотя я явно переопределяю столбец полезной нагрузки transforms в моих рабочих параметрах (./config/connect-campaigns-outbox.properties):

name=campaigns-outbox-connector
connector.class=io.debezium.connector.postgresql.PostgresConnector
database.hostname=postgres
database.port=5432
database.user=postgres
database.password=postgres
database.dbname=campaigns
database.server.name=campaigns_api
table.whitelist=campaigns.outbox
transforms=outbox

transforms.outbox.type=io.debezium.transforms.outbox.EventRouter

transforms.outbox.table.field.event.key=aggregate_id
transforms.outbox.table.field.event.type=type
transforms.outbox.table.field.event.payload.id=aggregate_id
transforms.outbox.table.field.event.payload=after
transforms.outbox.table.fields.additional.placement=before:envelope

transforms.outbox.route.topic.replacement=media-platform.campaigns-api.${routedByValue}
transforms.outbox.route.by.field=type

Мой ./docker-compose.yaml:

services:
  debezium:
    container_name: debezium
    image: debezium/connect:0.9
    ports:
    - 8082:8082
    volumes:
    - ./config:/kafka/config # can't use $KAFKA_HOME here
    - ./offsets:/offsets
    command:
    - sh
    - -c
    - $$KAFKA_HOME/bin/connect-standalone.sh $$KAFKA_HOME/config/connect-standalone.properties $$KAFKA_HOME/config/connect-campaigns-outbox.properties
  postgres:
    container_name: postgres
    image: 'debezium/postgres:10-alpine'
    ports:
      - 5432:5432
    environment:
      - POSTGRES_DB=campaigns
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 5s
      timeout: 3s
      retries: 7
  zookeeper:
    container_name: zookeeper
    hostname: zookeeper
    image: 'confluentinc/cp-zookeeper:3.1.1'
    environment:
      - ZOOKEEPER_CLIENT_PORT=2181
  kafka:
    container_name: kafka
    hostname: kafka
    image: 'confluentinc/cp-kafka:3.1.1'
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
  schema-registry:
    container_name: schema-registry
    hostname: schema-registry
    image: 'confluentinc/cp-schema-registry:3.1.1'
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
      - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
    depends_on:
      - zookeeper
    ports:
      - "8081:8081"

Мой ./config/connect-standalone.properties:

bootstrap.servers=kafka:9092
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
value.converter.schema.registry.url=http://schema-registry:8081
offset.storage.file.filename=/offsets/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/kafka/connect

Минимальная схема PG, необходимая для воспроизведения ошибки:

CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE TYPE campaigns.outbox_event_type AS ENUM (
    'campaign',
    'creative'
);
CREATE TABLE campaigns.outbox (
    id UUID PRIMARY KEY,
    type campaigns.outbox_event_type NOT NULL,
    aggregate_id TEXT NOT NULL,
    before JSONB,
    after JSONB
);

Воспроизвести ошибку:

insert into campaigns.outbox (id, type, aggregate_id, before, after) values (uuid_generate_v4(), 'campaign', '1', NULL, '{"id":1,"title":"teste","description":"teste description"}');

Я делаю что-то не так или я должен сообщить об этом как о проблеме команде Jboss?

1 Ответ

0 голосов
/ 23 мая 2019

Имя параметра для столбца полезной нагрузки должно быть transforms.outbox.table.field.payload вместо transforms.outbox.table.field.event.payload (см. Определение параметров в EventRouterConfigDefinition .

Я вижу, что это указано по-разному в документах, и я думаю, что код должен быть соответствующим образом скорректирован. Я позабочусь об этом.

...