Сохраненные сообщения Kafka Тема не сохраняется правильно через Kafka Connector - PullRequest
0 голосов
/ 05 марта 2019

Итак, у меня установлен разъем Confluent Kafka JDBC.Сначала я запускаю реестр схемы, например

./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties

Это файл schema-registery.properties

listeners=http://0.0.0.0:8081
kafkastore.connection.url=zookeeperhost:2181
kafkastore.bootstrap.servers=PLAINTEXT://kafkahost:9092
kafkastore.topic=_schemas
debug=false

Затем я запускаю автономный соединитель, подобный этому

./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./jdbc-source.properties

connect-avro-standalone.properties:

bootstrap.servers=kafkahost:9092

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
plugin.path=share/java

jdbc-source.properties:

name=jdbc_source_oracle
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=host)(PORT=port))(CONNECT_DATA=(SERVER=dedicated)(SID=server)))
connection.user=xxx
connection.password=xxx
table.whitelist=table1, table2
mode=bulk
topic.prefix=my_topic
query=select * from table1 t1 join table1 t2 on t2.id = t1.id where t2.entereddate >='19-FEB-2019' and t2.entereddate <= '23-FEB-2019'

Я использую запрос только для целей тестирования, настоящий запросЯ хочу использовать, будет реализовывать инкрементный режим и не будет содержать предложение where.

Теперь это позволяет опубликовать данные в теме, но с некоторыми странными вещами.Сначала идентификаторы сохраняются в нечитаемом формате.Просто пустой квадрат.Во-вторых, некоторые поля, которые заполняются в базе данных, сохраняются как нулевые в теме.И в-третьих, всякий раз, когда я пытаюсь изменить дату в запросе в исходном файле JDBC, ничего не происходит.Он по-прежнему содержит те же сообщения, которые я опубликовал при первом запуске, так как ничего в теме kafka не обновляется, сколько раз я меняю запрос.

Кто-нибудь может мне помочь?

РЕДАКТИРОВАТЬ

То, что я хочу сделать, это использовать данные через код pyspark.Вот код того, как я это делаю

from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("data streaming app")\
    .getOrCreate()


data_raw = spark.readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "kafkahost:9092")\
    .option("subscribe", "my_topic")\
    .load()

query = data_raw.writeStream\
    .outputMode("append")\
    .format("console")\
    .option("truncate", "false")\
    .trigger(processingTime="5 seconds")\
    .start()\
    .awaitTermination()

Я также использую данные с помощью kafka-avro-console-consumer с помощью этой команды

./bin/kafka-avro-console-consumer \
--bootstrap-server kafkahost:9092 \
--property print.key=true \
--from-beginning \
--topic my_topic

Оба эти параметра даютменя странные результаты

Вот что дает мне код pyspark enter image description here

и вот что дает мне использование консоли avro

enter image description here

Блокировка некоторых полей и текста, поскольку они могут содержать конфиденциальную информацию компании.

1 Ответ

0 голосов
/ 05 марта 2019

Если вы потребляете Avro из Spark, вам нужно использовать правильный десериализатор .

Вы видите байты в ваших данных Avro с консоли, затем дело до обработки десятичных / числовых значений, , как подробно описано здесь .

Подробнее об альтернативах Kafka Connect и сериализации для Avro (включая JSON) можно узнать здесь .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...