Извините, если это глупый вопрос.Я новичок во всем этом материале конвейера:)
Я использовал клиент kafka-python для создания производителя, который отправляет csv (одна строка csv = одно сообщение kafka).Обратите внимание, что я сериализовал его в строку через JSON и закодировал в байтах как UTF-8.Затем я создал потребителя, который декодирует сообщения (одна строка CSV теперь является строкой) и печатает их в терминал.Теперь мне нужно сохранить эти данные в формате hdf с помощью куста.Я хочу вставить каждое сообщение в таблицу улья, а затем хочу сделать огромный выбор, чтобы получить все данные в одном файле.
Каков наилучший способ сделать это с помощью Python?
Вот что я сделал:
Сначала я запускаю сервер zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
Затем запускаю сервер kafka:
bin/kafka-server-start.sh config/server.properties
Затем я начинаю моего потребителя и производителя:
Мой производитель:
from kafka import KafkaClient, KafkaProducer, KafkaConsumer
import csv
import json
client = KafkaClient("localhost:9092")
producer = KafkaProducer(bootstrap_servers='localhost:9092')
with open("train.csv") as file:
reader = csv.reader(file)
for row in reader:
producer.send('the_topic', json.dumps(row).encode('utf-8'))
Мой потребитель:
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer('the_topic')
for msg in consumer:
decoded_msg = msg.value.decode("utf-8")
print(decoded_msg)