Обработка потоков данных в кафке - PullRequest
0 голосов
/ 08 апреля 2019

Я пытаюсь настроить потребителя kafka для обработки данных из потоков Kafka.Мне удалось установить соединение с потоком, и данные видны, но это смесь специальных символов и ASCII.

Я использую встроенную консоль kafka, но также попробовал версию слияния на Python.Кафка.Единственными параметрами, которые необходимо соблюдать, является использование протокола безопасности SASL_PLAINTEXT с SCRAM-SHA-256.Я открыт для использования других методов, чтобы также анализировать вывод (не Java, если это возможно).

Kafka Console

bin/kafka-console-consumer.sh --bootstrap-server server:9092 \
--topic TOPIC --from-beginning --consumer.config=consumer.properties

Confluent Kafka Python

topics = "TOPIC"
conf = {
        "bootstrap.servers": "server:9092",
        "group.id": "group",
        "security.protocol": "SASL_PLAINTEXT",
        "sasl.mechanisms" : "SCRAM-SHA-256",
}
c = Consumer(conf)
c.subscribe([topics])
running = True
while running:
        message = c.poll()
        print(message.value())
c.close()

Выход

PLE9K1PKH3S0MAY38ChangeRequest : llZYMEgVmq2CHG:Infra RequestKSUSMAINCHANGEKC-10200-FL01DATA_MISSINGCHGUSD
DATA_MISSINGDATA_MISSINGUSD
CANCEL

▒▒12SLM:Measurement"Schedule(1 = 0)USDUSD▒▒▒
                                                              l▒l▒V?▒▒▒
                                                                       llZYMEgVmq
company_team team_nameTEAM###SGP000000140381PPL000002020234
Latha M▒>▒>▒ChangeRequest
hello:1234543534 cloud abcdef▒▒▒
                                                         ▒Ի▒
                                                            ▒▒▒
                                                               John Smithjs12345SGP000000140381▒NPPL000002020234
▒Ի▒

Я пытаюсь проанализировать данные на стандартном выходеПервоначально, но ожидание в конце состоит в том, чтобы получить проанализированные данные в базе данных.Любой совет будет оценен.

Ответы [ 2 ]

0 голосов
/ 09 апреля 2019

Как уже упоминал jaivalis, между вашими производителями и потребителями, которых вы используете для загрузки данных, существует несоответствие. Kafka Streams предоставляет два свойства для управления сериализацией и десериализацией данных, проходящих через топологию; default.value.serde, default.key.serde. Я рекомендую просмотреть конфигурацию вашего потокового приложения, чтобы найти подходящий десериализатор для потребителя.

https://kafka.apache.org/documentation/#streamsconfigs

Учтите, однако, что эти serdes могут быть перезаписаны реализацией вашего потокового приложения. Обязательно ознакомьтесь с вашей реализацией, чтобы убедиться, что вы нашли правильный формат сериализации.

https://kafka.apache.org/21/documentation/streams/developer-guide/datatypes.html#overriding-default-serdes

0 голосов
/ 08 апреля 2019

Похоже, что ваши сообщения закодированы в двоичном формате.Чтобы распечатать их, вам нужно настроить двоичный декодер и пропустить их через него.Если вы создали их с использованием определенной схемы, вам также может потребоваться десериализация объектов с использованием реестра схем, который содержит схему для данной темы.Вы смотрите на что-то вроде:

message_bytes = io.BytesIO(message.value())
decoder = BinaryDecoder(message_bytes)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...