Напишите csv файл в тему kafka - PullRequest
1 голос
/ 27 мая 2020

У меня большой csv, и я хочу записать в kafka topi c.

def producer():
    producer = KafkaProducer(bootstrap_servers='mykafka-broker')
    with open('/home/antonis/repos/testfile.csv') as file:
        reader = csv.DictReader(file, delimiter=";")
        for row in reader:
            producer.send(topic='stable_topic', value=row)
            producer.flush()

if __name__ == '__main__':
    producer()

Этот код выдает ошибку:

AssertionError: value must be bytes

Файл выглядит так:

"timestamp","name","age"
2020-03-01 00:00:01,John,36
2020-03-01 00:00:01,Peter,22

Кто-нибудь может мне с этим помочь?

Ответы [ 2 ]

1 голос
/ 27 мая 2020

Вам необходимо правильно сериализовать ваши значения.


Следующее должно помочь:

import json  

producer = KafkaProducer(
    bootstrap_servers='mykafka-broker',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
1 голос
/ 27 мая 2020

Вместо того, чтобы заново изобретать колесо, используйте очень хорошее, которое уже существует :) Это Kafka Connect , который является частью Apache Kafka.

Есть несколько разъемов который может читать из CSV, включая Kafka Connect spooldir (см. пример ) и Filepulse .

Подробнее о Kafka Connect в здесь говорить .

...