Использование RDKAFKA Назначение нескольких идентификаторов групп нескольким потребителям - PullRequest
0 голосов
/ 10 сентября 2018

Я использовал RDKAFKA в php для параллельной работы потребителя.

Но первый потребитель принял все сообщения из темы, поэтому второй потребитель не получил сообщения темы.

Таким образом, я использовал разные идентификаторы группы для разных потребителей, но проблема остается такой же.

Пожалуйста, помогите мне. $ conf-> set ('group.id', 'myConsumerGroup'); $ conf-> set ('metadata.broker.list', '127.0.0.2'); $ topicConf = new RdKafka \ TopicConf (); $ topicConf-> set ('auto.offset.reset', 'smallle'); $ conf-> setDefaultTopicConf ($ topicConf); $ consumer1 = new RdKafka \ KafkaConsumer ($ conf); $ consumer1-> подписаться ([ 'DBTEST']); while (true) { $ j = 0; $ сообщение = $ потребитель1-> потреблять (120 * 1000);

switch ($ message-> err) { case RD_KAFKA_RESP_ERR_NO_ERROR: $ dataEx = json_decode ($ message-> payload, true); var_dump ($ данных); $ sql = "INSERT INTO emp (name, email) VALUES ('". $ dataEx [' name ']. "', '". $ dataEx [' email ']. "')";

    `$`servername = "localhost";
    `$`username = "A";
    `$`password = "ASD";
    `$`dbname = "test";
    `$`conn = new mysqli(`$`servername, `$`username, `$`password, `$`dbname);
    if (`$`conn->connect_error) {
        die("Connection failed: " . `$`conn->connect_error);
    }

    if (`$`conn->query(`$`sql) === TRUE) {
        echo "New record created successfully to datbase ".`$`dbname."/n";
    } else {
        echo "Error: " . `$`sql . "<br>" . `$`conn->error;
    }
    `$`conn->close();
    echo "produced `$`j----------------------<br> ";
    break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
    echo "No more messages; will wait for more\n";
    break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
    echo "Timed out\n";
    break;
default:
    throw new \Exception(`$`message->errstr(), `$`message->err);
    break;
}
$j++;

} эхо "здесь"; $ conf-> set ('group.id', 'myConsumerGroup1'); $ conf-> set ('metadata.broker.list', '127.0.0.2'); $ topicConf = new RdKafka \ TopicConf (); $ topicConf-> set ('auto.offset.reset', 'smallle'); $ conf-> setDefaultTopicConf ($ topicConf);

$ consumer2 = new RdKafka \ KafkaConsumer ($ conf); эхо "сонали"; $ consumer2-> подписаться ([ 'DBTEST']); while (true) { $ message2 = $ потребитель2-> потреблять (120 * 1000); switch ($ message2-> err) { case RD_KAFKA_RESP_ERR_NO_ERROR: $ dataEx = json_decode ($ message2-> payload, true); `` $ sql = "INSERT INTO emp (name, email) VALUES ('". $ dataEx [' name ']. "', '". $ dataEx [' email ']. "')"; $ servername1 = "localhost"; $ username1 = "A"; $ password1 = "ASD"; $ dbname1 = "test1"; $ conn1 = new mysqli ($ имя_сервера1, $ имя_пользователя1, $ пароль1, $ имя_базы1); if ($ conn1-> connect_error) { die («Ошибка подключения:». $ conn1-> connect_error); }

    if (`$`conn1->query(`$`sql) === TRUE) {
        echo "New record created successfully ".`$`dbname1;
    } else {
        echo "Error: " . `$`sql . "<br>" . `$`conn1->error;
    }
    `$`conn1->close();
    echo "produced `$`j----------------------<br> ";
    break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
    echo "No more messages; will wait for more\n";
    break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
    echo "Timed out\n";
    break;
default:
    throw new \Exception(`$`message->errstr(), `$`message->err);
    break;
}
`$`j++;

}

...