Потоковая передача данных CSV в Kafka- Python - PullRequest
0 голосов
/ 17 июня 2020

Я отправляю данные CSV в Kafka topi c, используя Kafka-Python. Данные отправляются и принимаются Потребителем успешно. Теперь я пытаюсь непрерывно передавать файл csv, любая новая запись, добавляемая в файл, должна автоматически отправляться в Kafka topi c. Любое предложение будет полезно при непрерывной потоковой передаче файла CSV

Ниже приведен мой существующий код

   from kafka import KafkaProducer
   import logging
   from json import dumps, loads
   import csv
   logging.basicConfig(level=logging.INFO)


   producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', value_serializer=lambda 
   K:dumps(K).encode('utf-8'))

   with open('C:/Hadoop/Data/Job.csv', 'r') as file:
   reader = csv.reader(file, delimiter = '\t')
       for messages in reader:
       producer.send('Jim_Topic', messages)
       producer.flush()

1 Ответ

0 голосов
/ 17 июня 2020

Kafka Connect (часть Apache Kafka) - это хороший способ выполнять загрузку и передачу данных между Kafka и другими системами, включая плоские файлы.

Вы можете использовать Kafka Connect SpoolDir коннектор для потоковой передачи файлов CSV в Kafka. Установите его из Confluent Hub , а затем предоставьте ему конфигурацию для исходного файла:

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/source-csv-spooldir-00/config \
    -d '{
        "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
        "topic": "orders_spooldir_00",
        "input.path": "/data/unprocessed",
        "finished.path": "/data/processed",
        "error.path": "/data/error",
        "input.file.pattern": ".*\\.csv",
        "schema.generation.enabled":"true",
        "csv.first.row.as.header":"true"
        }'

См. этот блог для получения дополнительных примеров и деталей.

...