librdkafka: rd_kafka_assignment возвращает смещение -1001 для назначенных разделов - PullRequest
1 голос
/ 23 января 2020

Когда я запрашиваю у моего потребителя назначенный Topi c Список разделов, все разделы в результате имеют смещение -1001. Если я распечатываю смещение полученного сообщения, смещение устанавливается на правильное значение.

Это код, который я использовал для приема сообщений:

static void print_partition_list(FILE* fp,
    const rd_kafka_topic_partition_list_t
    * partitions) {
    int i;
    for (i = 0; i < partitions->cnt; i++) {
        fprintf(fp, "%s %s [%d] offset %lld",
            i > 0 ? "," : "",
            partitions->elems[i].topic,
            partitions->elems[i].partition,
            partitions->elems[i].offset);
    }
    fprintf(fp, "\n");

}

static void rebalance_cb(rd_kafka_t* rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t* partitions, void* opaque) {
    fprintf(stderr, "%% Consumer group rebalanced: ");
    switch (err) {
    case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
        fprintf(stderr, "assigned:\n");
        print_partition_list(stderr, partitions);
        rd_kafka_assign(rk, partitions);
        break;
    case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
        fprintf(stderr, "revoked:\n");
        print_partition_list(stderr, partitions);
        rd_kafka_assign(rk, NULL);
        break;
    default:
        fprintf(stderr, "failed: %s\n", rd_kafka_err2str(err));
        rd_kafka_assign(rk, NULL);
        break;
    }
}

int main()
{

    rd_kafka_t* rk;
    rd_kafka_conf_t* conf;
    rd_kafka_resp_err_t err;

    char errstr[512];
    const char* brokers{ "localhost:9092" };
    const char* groupid{ "OffsetTest" };
    const char* topics[] = { "OffsetTesting" };

    rd_kafka_topic_partition_list_t* subscription;

    conf = rd_kafka_conf_new();

    if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,
        errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }

    if (rd_kafka_conf_set(conf, "group.id", groupid,
        errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }

    if (rd_kafka_conf_set(conf, "enable.auto.commit", "false",
        errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }

    if (rd_kafka_conf_set(conf, "auto.offset.reset", "earliest",
        errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }

    if (rd_kafka_conf_set(conf, "offset.store.method", "broker",
        errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }

    rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);

    rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
    if (!rk) {
        fprintf(stderr, "%% Failed to create new consumer: %s\n", errstr);
        return 1;
    }


    conf = NULL;

    rd_kafka_poll_set_consumer(rk);

    subscription = rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(subscription, topics[0], RD_KAFKA_PARTITION_UA);

    err = rd_kafka_subscribe(rk, subscription);
    if (err) {
        fprintf(stderr,
            "%% Failed to subscribe to %d topics: %s\n",
            subscription->cnt, rd_kafka_err2str(err));
        rd_kafka_topic_partition_list_destroy(subscription);
        rd_kafka_destroy(rk);
        return 1;
    }

    fprintf(stderr,
        "%% Subscribed to %d topic(s), "
        "waiting for rebalance and messages...\n",
        subscription->cnt);

    rd_kafka_topic_partition_list_destroy(subscription);

    int runningCounter = 0;

    while (runningCounter != 10) {
        rd_kafka_message_t* rkm;

        rkm = rd_kafka_consumer_poll(rk, 100);
        if (!rkm) {
            Sleep(2000);
            runningCounter++;
            continue;
        }
        if (rkm->err) {
            fprintf(stderr,
                "%% Consumer error: %s\n",
                rd_kafka_message_errstr(rkm));
            rd_kafka_message_destroy(rkm);
            continue;
        }

        rd_kafka_topic_partition_list_t* list;
        err = rd_kafka_assignment(rk, &list);

        if (err) {
            fprintf(stderr,
                "%% Failed to subscribe to %d topics: %s\n",
                subscription->cnt, rd_kafka_err2str(err));
            rd_kafka_topic_partition_list_destroy(subscription);
            return 1;
        }

        print_partition_list(stderr, list);

        rd_kafka_topic_partition_list_destroy(list);

        printf("Message on %s [%d] at offset %lld:\n",
            rd_kafka_topic_name(rkm->rkt), rkm->partition,
            rkm->offset);

        if (rkm->key)
            printf(" Key: %.*s\n",
            (int)rkm->key_len, (const char*)rkm->key);
        else if (rkm->key)
            printf(" Key: (%d bytes)\n", (int)rkm->key_len);

        if (rkm->payload)
            printf(" Value: %.*s\n",
            (int)rkm->len, (const char*)rkm->payload);
        else if (rkm->key)
            printf(" Value: (%d bytes)\n", (int)rkm->len);

        rd_kafka_commit_message(rk, rkm, 0);

        rd_kafka_message_destroy(rkm);

        runningCounter++;
    }

    fprintf(stderr, "%% Closing consumer\n");
    rd_kafka_consumer_close(rk);

    rd_kafka_destroy(rk);

    return 0;

}

Я знаю, что есть ответ на аналогичный вопрос здесь LibRdKafka: commited_offset всегда в -1001 , но это не помогает. Я назначаю список разделов Topi c для получателя в rebalance_cb.

Обновление:

Это вывод для примера 2 Сообщения:

> %4|1580198390.566|CONFWARN|rdkafka#consumer-1| [thrd:app]: Configuration property offset.store.method is deprecated: Offset commit store method: 'file' - DEPRECATED: local file store (offset.store.path, et.al), 'broker' - broker commit store (requires Apache Kafka 0.8.2 or later on the broker).

> % Subscribed to 1 topic(s), waiting for rebalance and messages...

> % Consumer group rebalanced: assigned:
> NewTestingTopic [0] offset -1001, NewTestingTopic [1] offset -1001, 
> NewTestingTopic [2] offset -1001, NewTestingTopic [3] offset -1001
> NewTestingTopic [0] offset -1001, NewTestingTopic [1] offset -1001, 
> NewTestingTopic [2] offset -1001, NewTestingTopic [3] offset -1001
> 
> Message on NewTestingTopic [0] at offset 25:
> Key: 0
> Value: ExampleMessage 0
> 
> NewTestingTopic [0] offset -1001, NewTestingTopic [1] offset -1001, 
> NewTestingTopic [2] offset -1001, NewTestingTopic [3] offset -1001
> 
> Message on NewTestingTopic [3] at offset 41:
> Key: 1
> Value: ExampleMessage 1

1 Ответ

0 голосов
/ 01 февраля 2020

Я полагаю, что это может быть умышленно.

Метод rd_kafka_assignment() возвращает назначение, предоставленное с помощью rd_kafka_assign(). Когда Потребителю назначаются разделы внутри группы, назначением является только список разделов, смещения нет.

Аналогично в библиотеке Java, assignment() возвращает Set<TopicPartition>, здесь также нет смещений. В librdkafka rd_kafka_assignment() дает rd_kafka_topic_partition_list_t, что аналогично Set<TopicPartition>. Основное отличие заключается в том, что он использует тип rd_kafka_topic_partition_t, который имеет несколько дополнительных полей, таких как offset.

. Тип rd_kafka_topic_partition_t используется во многих местах и ​​во всех его полях. не имеет смысла во всех контекстах. Это относится к контексту присваивания, поэтому для некоторых полей заданы «пустые» значения, а для смещения это -1001.

Если вы хотите получить текущие смещения для присваивания, вам необходимо использовать rd_kafka_position(). Аналогично, в Java вы будете использовать position().

...