Как использовать данные в HDFS, используя Hive с клиентом Kafka-Python? - PullRequest
0 голосов
/ 22 мая 2018

Извините, если это глупый вопрос.Я новичок во всем этом материале конвейера:)

Я использовал клиент kafka-python для создания производителя, который отправляет csv (одна строка csv = одно сообщение kafka).Обратите внимание, что я сериализовал его в строку через JSON и закодировал в байтах как UTF-8.Затем я создал потребителя, который декодирует сообщения (одна строка CSV теперь является строкой) и печатает их в терминал.Теперь мне нужно сохранить эти данные в формате hdf с помощью куста.Я хочу вставить каждое сообщение в таблицу улья, а затем хочу сделать огромный выбор, чтобы получить все данные в одном файле.

Каков наилучший способ сделать это с помощью Python?

Вот что я сделал:

Сначала я запускаю сервер zookeeper:

bin/zookeeper-server-start.sh config/zookeeper.properties

Затем запускаю сервер kafka:

bin/kafka-server-start.sh config/server.properties

Затем я начинаю моего потребителя и производителя:

Мой производитель:

from kafka import KafkaClient, KafkaProducer, KafkaConsumer
import csv
import json

client = KafkaClient("localhost:9092")

producer = KafkaProducer(bootstrap_servers='localhost:9092')

with open("train.csv") as file:
    reader = csv.reader(file)
    for row in reader:
        producer.send('the_topic', json.dumps(row).encode('utf-8'))

Мой потребитель:

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer('the_topic')

for msg in consumer:
    decoded_msg = msg.value.decode("utf-8")
    print(decoded_msg)
...