Я отправляю данные CSV в Kafka topi c, используя Kafka-Python
. Данные отправляются и принимаются Потребителем успешно. Теперь я пытаюсь непрерывно передавать файл csv, любая новая запись, добавляемая в файл, должна автоматически отправляться в Kafka topi c. Любое предложение будет полезно при непрерывной потоковой передаче файла CSV
Ниже приведен мой существующий код
from kafka import KafkaProducer
import logging
from json import dumps, loads
import csv
logging.basicConfig(level=logging.INFO)
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', value_serializer=lambda
K:dumps(K).encode('utf-8'))
with open('C:/Hadoop/Data/Job.csv', 'r') as file:
reader = csv.reader(file, delimiter = '\t')
for messages in reader:
producer.send('Jim_Topic', messages)
producer.flush()