Я получил сообщение Кафки, используя librdkafka. Кафка сообщение в кодированном авро формате. Я использую libavro, чтобы потреблять авро сообщение. rkmessage получил от librdkafka есть ключ и key_len. Я пытаюсь получить значение "gln" из ключа. Значение "gln" приходит как ноль. Размер gln приходит как 1 (для пустой строки из-за нулевого терминатора, добавленного avro). Примеры ключей, которые я нашел под kafka topi c с помощью инструмента kafka (в шестнадцатеричном виде) 00 00 00 27 72 1A 30 30 37 38 37 34 32 30 30 38 30 31 31, 00 00 00 27 72 1A 30 30 37 38 37 34 32 30 35 36 30 34 33. Я знаю схему производителя / писателя, поэтому я использую схему считывателя так же, как писатель без какого-либо преобразователя схемы. Я пытаюсь получить значение "gln" из ключа (ключ должен быть в определенной схеме avro над). Значение "GLN" приходит как ноль. Помогите мне исправить мою проблему. не уверен, что это проблема с моим кодом или передаваемым ключом или ошибка преобразования libavro.
#define check_i(call) \
do { \
if ((call) != 0) { \
printf("avro error exiting the program with RC[%s]", avro_strerror()); \
exit(EXIT_FAILURE); \
} \
} while (0)
/*----------------------Schema--------------------------*/
#define INLGLN_SCHEMA \
"{" \
" \"namespace\": \"com.xyz.sim.events\"," \
" \"type\": \"record\"," \
" \"name\": \"InlGln\"," \
" \"fields\": [" \
" {" \
" \"name\": \"gln\"," \
" \"type\": \"string\"" \
" }" \
" ]" \
"}"
int parsefunc1(rd_kafka_message_t *rkmessage)
{
/*
typedef struct rd_kafka_message_s {
void *key;
size_t key_len;
} rd_kafka_message_t;
*/
avro_schema_t inlGLNSchema = NULL;
avro_schema_error_t error;
avro_value_iface_t *reader_class;
avro_value_t val;
avro_value_t field;
avro_type_t type;
avro_reader_t nested_reader;
const char *GLN = NULL;
size_t size = 0;
size_t field_count = 0;
GLN = (const char *)malloc(rkmessage->key_len +1);
if( GLN == NULL )
{
printf("memory allocation failed");
}
if (avro_schema_from_json(INLGLN_SCHEMA, sizeof(INLGLN_SCHEMA), &inlGLNSchema, &error))
{
printf("unable to parse inlGLNSchema\n");
exit(EXIT_FAILURE);
}
reader_class = avro_generic_class_from_schema(inlGLNSchema);
check_i(avro_generic_value_new(reader_class, &val));
/*Send the memory in the buffer into the reader*/
nested_reader = avro_reader_memory((rkmessage->key), rkmessage->key_len );
if(nested_reader == NULL)
{
printf("NULL reader. exiting\n");
exit(1);
}
check_i(avro_value_read( nested_reader, &val ));
type = avro_value_get_type(&val);
if ( type == AVRO_RECORD )
{
printf("type is AVRO_RECORD\n");
}
check_i(avro_value_get_size(&val, &field_count));
printf("val record field count is:%d\n", field_count);
/*check_i(avro_value_get_by_index(&val, 0, &field, NULL));*/
check_i(avro_value_get_by_name(&val, "gln", &field, NULL));
type = avro_value_get_type(&field);
if ( type == AVRO_STRING )
printf("type is AVRO_STRING\n");
}
check_i(avro_value_get_string(&field, &GLN, &size));
printf("string size is:%d\n", size);
printf("GLN is:%s\n", GLN);
avro_value_decref( &val );
avro_value_iface_decref( reader_class );
avro_reader_free( nested_reader );
}