Создание раздела для topi c в кафке-узле - PullRequest
2 голосов
/ 07 января 2020

У меня есть созданный HighLevelProducer для публикации sh сообщений в поток topi c, который будет использоваться ConsumerGroupStream с использованием kafka-узла. Когда я создаю несколько потребителей из одной ConsumerGroup для потребления из одной и той же топики c, создается только один раздел, и потребляет только один потребитель. Я также попытался определить количество разделов для этой топи c, хотя я не уверен, требуется ли ее определение при создании топи c и, если да, то сколько разделов мне понадобится заранее. Кроме того, возможно ли поместить sh объект в поток Transform, а не строку (в настоящее время я использовал JSON .stringify, потому что в противном случае я получил [Object object] в приемнике.

const myProducerStream = ({ kafkaHost, highWaterMark, topic }) => {
    const kafkaClient = new KafkaClient({ kafkaHost });
    const producer = new HighLevelProducer(kafkaClient);
    const options = {
        highWaterMark,
        kafkaClient,
        producer
    }; 

    kafkaClient.refreshMetadata([topic], err => {
        if (err) throw err; 
    }); 

    return new ProducerStream(options);
};

const transfrom = topic => new Transform({
    objectMode: true,
    decodeStrings: true,
    transform(obj, encoding, cb) {
        console.log(`pushing message ${JSON.stringify(obj)} to topic "${topic}"`);

        cb(null, {
            topic,
            messages: JSON.stringify(obj)
        });
    }
});

const publisher = (topic, kafkaHost, highWaterMark) => {
    const myTransfrom = transfrom(topic);
    const producer = myProducerStream({ kafkaHost, highWaterMark, topic });

    myTransfrom.pipe(producer);

    return myTransform;
};

Потребитель:


const createConsumerStream = (sourceTopic, kafkaHost, groupId) => {
    const consumerOptions = {
        kafkaHost,
        groupId,
        protocol: ['roundrobin'],
        encoding: 'utf8',
        id: uuidv4(),
        fromOffset: 'latest',
        outOfRangeOffset: 'earliest',
    };

    const consumerGroupStream = new ConsumerGroupStream(consumerOptions, sourceTopic);

    consumerGroupStream.on('connect', () => {
        console.log(`Consumer id: "${consumerOptions.id}" is connected!`);
    });

    consumerGroupStream.on('error', (err) => {
        console.error(`Consumer id: "${consumerOptions.id}" encountered an error: ${err}`);
    });

    return consumerGroupStream; 
};

const publisher = (func, destTopic, consumerGroupStream, kafkaHost, highWaterMark) => { 
    const messageTransform = new AsyncMessageTransform(func, destTopic);

    const resultProducerStream = myProducerStream({ kafkaHost, highWaterMark, topic: destTopic })

    consumerGroupStream.pipe(messageTransform).pipe(resultProducerStream);
}; 

1 Ответ

2 голосов
/ 07 января 2020

По первому вопросу: максимальное количество работающих потребителей в группе равно количеству разделов.

Таким образом, если у вас есть TopicA с 1 разделом и в вашей группе потребителей 5 потребителей, 4 из них будут простаивать.

Если у вас TopicA с 5 разделами и у вас 5 потребителей в ваша группа потребителей, все они будут активными и потребляющими сообщения от вашего topi c.

Чтобы указать количество разделов, вы должны создать topi c из CLI, а не ожидать, что Kafka создаст его когда вы впервые публикуете sh сообщений.

Чтобы создать топи c с указанным c числом разделов:

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic test

Чтобы изменить количество разделов в уже Существующие топи c:

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic test 
       --partitions 40 

Обратите внимание, что вы можете только увеличивать количество разделов, но не можете их уменьшать.

Пожалуйста, обратитесь к Документам Кафки https://kafka.apache.org/documentation.html

Также, если вы хотите узнать больше о Кафке, пожалуйста, проверьте бесплатную книгу https://www.confluent.io/resources/kafka-the-definitive-guide/

...