Передать объект json как сообщение с ключом в apache kafka - PullRequest
0 голосов
/ 25 мая 2020

У меня есть объект json с разными состояниями заказа. Я хотел пу sh это apache кафку. Последовательность имеет решающее значение, поскольку у меня есть потребительские настройки, которые потребляют данные. Я планировал использовать идентификатор заказа в качестве ключа к сообщению, так как оно будет последовательно храниться в одном разделе.

data = [{"id": 1, "orderId": "A123","status":"PLACED"},{"id": 2, "orderId": "B123","status":"PLACED"}
{"id": 3, "orderId": "A123","status":"DISPATCHED"}]

from kafka import KafkaProducer
import json
producer = KafkaProducer(bootstrap_servers=os.environ['KAFKA_BROKER_URLS'],security_protocol="SSL")
for item in data:
    d = json.dumps(item)
    future = producer.send(topic,key=item['orderId'], value=d)

Но я получаю assertionError в производителе. Я даже попробовал

d = json.dumps(item).encode('utf-8')
future = producer.send(topic,key=item['orderId'], value=d)

, также определив сериализатор значений в коде инициализации, например

producer = KafkaProducer(bootstrap_servers=os.environ['KAFKA_BROKER_URLS'],security_protocol="SSL",value_serializer=lambda v: json.dumps(v).encode('utf-8'))

Все они дают одну и ту же ошибку. Если я удалю ключ и передам данные вроде

d = json.dumps(item).encode('utf-8')    
future = producer.send(topic,d)

, это будет работать, и сообщение будет отправлено в kafka и отобразится на стороне потребителя. Но я хочу передать ключ для поддержания последовательности того же идентификатора заказа. Как решить?

Обновление 1:

Вот ошибка:

{
  "errorType": "AssertionError",
  "stackTrace": [
    "  File \"/var/task/producer.py\", line 56, future = producer.send(topic,key=item['orderId'], value=d)\n",
    "  File \"/opt/python/lib/python3.7/site-packages/kafka/producer/kafka.py\", line 572, in send\n    assert type(key_bytes) in (bytes, bytearray, memoryview, type(None))\n"
  ]
}
...