Просто передайте полезную нагрузку (двоичную) и длину сообщения Янссону, например:
/* For each consumed message .. */
rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(consumer, 1000);
if (!rkmessage) {
/* No message available */
continue;
} else if (rkmessage->err) {
/* Handle consumer event/error (typically not fatal) */
handle_consumer_error(rkmessage->err, "%s", rd_kafka_message_errstr(rkmessage));
} else if (rkmessage->len > 0) {
/* Parse JSON value */
json_error_t err;
json_t *json = json_loadb(rkm->payload, rkm->len, JSON_ALLOW_NUL, &err);
if (!json) {
handle_consumer_error(RD_KAFKA_RESP_ERR__BAD_MSG,
"Failed to parse JSON for message at %s [%"PRId32"] offset %"PRIu64: line %d: %s\n",
rd_kafka_topic_name(rkmessage->rkt), rkmessage->partition),
rkmessage->offset, err.line, err.text);
} else {
/* Process the message */
process_message(json);
json_decref(json);
}
}
rd_kafka_message_destroy(rkmessage);
Для получения дополнительной информации см. Документы Янссона: https://jansson.readthedocs.io/en/latest/tutorial.html