Итак, у меня установлен разъем Confluent Kafka JDBC.Сначала я запускаю реестр схемы, например
./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
Это файл schema-registery.properties
listeners=http://0.0.0.0:8081
kafkastore.connection.url=zookeeperhost:2181
kafkastore.bootstrap.servers=PLAINTEXT://kafkahost:9092
kafkastore.topic=_schemas
debug=false
Затем я запускаю автономный соединитель, подобный этому
./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./jdbc-source.properties
connect-avro-standalone.properties:
bootstrap.servers=kafkahost:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
plugin.path=share/java
jdbc-source.properties:
name=jdbc_source_oracle
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=host)(PORT=port))(CONNECT_DATA=(SERVER=dedicated)(SID=server)))
connection.user=xxx
connection.password=xxx
table.whitelist=table1, table2
mode=bulk
topic.prefix=my_topic
query=select * from table1 t1 join table1 t2 on t2.id = t1.id where t2.entereddate >='19-FEB-2019' and t2.entereddate <= '23-FEB-2019'
Я использую запрос только для целей тестирования, настоящий запросЯ хочу использовать, будет реализовывать инкрементный режим и не будет содержать предложение where.
Теперь это позволяет опубликовать данные в теме, но с некоторыми странными вещами.Сначала идентификаторы сохраняются в нечитаемом формате.Просто пустой квадрат.Во-вторых, некоторые поля, которые заполняются в базе данных, сохраняются как нулевые в теме.И в-третьих, всякий раз, когда я пытаюсь изменить дату в запросе в исходном файле JDBC, ничего не происходит.Он по-прежнему содержит те же сообщения, которые я опубликовал при первом запуске, так как ничего в теме kafka не обновляется, сколько раз я меняю запрос.
Кто-нибудь может мне помочь?
РЕДАКТИРОВАТЬ
То, что я хочу сделать, это использовать данные через код pyspark.Вот код того, как я это делаю
from pyspark.sql import SparkSession
spark = SparkSession\
.builder\
.appName("data streaming app")\
.getOrCreate()
data_raw = spark.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", "kafkahost:9092")\
.option("subscribe", "my_topic")\
.load()
query = data_raw.writeStream\
.outputMode("append")\
.format("console")\
.option("truncate", "false")\
.trigger(processingTime="5 seconds")\
.start()\
.awaitTermination()
Я также использую данные с помощью kafka-avro-console-consumer с помощью этой команды
./bin/kafka-avro-console-consumer \
--bootstrap-server kafkahost:9092 \
--property print.key=true \
--from-beginning \
--topic my_topic
Оба эти параметра даютменя странные результаты
Вот что дает мне код pyspark
и вот что дает мне использование консоли avro
Блокировка некоторых полей и текста, поскольку они могут содержать конфиденциальную информацию компании.