Настройка FlinkKafkaConsumer group.id не работает должным образом при использовании - PullRequest
0 голосов
/ 28 февраля 2020

Я некоторое время пользуюсь flink1.10.0 и нахожу странную проблему.

Я отправляю одну и ту же работу дважды.

$ flink list -r
28.02.2020 18:04:24 : f9ad14cb86a14c388ed6a146c80988fd : ReadKafkaJob (RUNNING)
28.02.2020 18:07:23 : e05bf26ee986573ffc01af8b1f5d1d59 : ReadKafkaJob (RUNNING)

Два задания имеют один и тот же group.id, но каждое из них может читать данные. Ниже в журнале показано, что одно и то же событие потребляется дважды.

2020-02-28 18:08:29,600 INFO  com.stc.sls.stream_process.examples.ReadKafkaJob              - eventDo={"id":"odd奇数","value":0.1,"eventTs":1582884509239,"seqNo":0}
2020-02-28 18:08:29,601 INFO  com.stc.sls.stream_process.examples.ReadKafkaJob              - eventDo={"id":"odd奇数","value":0.1,"eventTs":1582884509239,"seqNo":0}
2020-02-28 18:08:34,442 INFO  com.stc.sls.stream_process.examples.ReadKafkaJob              - eventDo={"id":"odd奇数","value":0.5,"eventTs":1582884514437,"seqNo":1}
2020-02-28 18:08:34,442 INFO  com.stc.sls.stream_process.examples.ReadKafkaJob              - eventDo={"id":"odd奇数","value":0.5,"eventTs":1582884514437,"seqNo":1}
2020-02-28 18:08:39,448 INFO  com.stc.sls.stream_process.examples.ReadKafkaJob              - eventDo={"id":"odd奇数","value":0.2,"eventTs":1582884519443,"seqNo":2}
2020-02-28 18:08:39,448 INFO  com.stc.sls.stream_process.examples.ReadKafkaJob              - eventDo={"id":"odd奇数","value":0.2,"eventTs":1582884519443,"seqNo":2}

Я установил 'group.id' в коде.

String kafkaTopic = params.get("kafka-topic", "flink-test");
String brokers = params.get("brokers", "192.168.0.100:9092");
String groupId = "simple.read.kafka.job";

Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", brokers);
kafkaProps.setProperty("group.id", groupId);

FlinkKafkaConsumer<EventDo> kafka = new FlinkKafkaConsumer<>(kafkaTopic, new EventDeSerializer(), kafkaProps);

Так почему два клиента в одной группе потребляют от kafka дважды?

Доза FlinkKafkaConsumer имеет какую-то особенную реализацию?

ОБНОВЛЕНИЕ:

Я провел несколько тестов, запускает двух пользователей консоли и один мгновенный потребитель.

Если я использую kafka-console-consumer для потребления, как показано ниже, client.id = 123

kafka-console-consumer --bootstrap-server 192.168.0.100:9092 --topic flink-test --consumer-property group.id=simple.read.kafka.job --consumer-property client.id=123

и другой потребитель с client.id = 456

kafka-console-consumer --bootstrap-server 192.168.0.100:9092 --topic flink-test --consumer-property group.id=simple.read.kafka.job --consumer-property client.id=456

Затем я запускаю задание flink в IDEA, чтобы использовать topi c flink.test с group.id = "simple.read.kafka.job"

20:38:17,107 INFO  org.apache.kafka.clients.consumer.KafkaConsumer               - [Consumer clientId=cid0931c3, groupId=simple.read.kafka.job] Subscribed to partition(s): flink-test-0

Я могу проверить соединения и найти два потребителя.

➜  bin descKafkaConsumerGroup simple.read.kafka.job --members          
Java HotSpot(TM) Server VM warning: G1 GC is disabled in this release.

GROUP                 CONSUMER-ID                              HOST            CLIENT-ID       #PARTITIONS     ASSIGNMENT  
simple.read.kafka.job 123-5925201d-a767-4216-acdc-b46f058db0df /192.168.0.100  123             1               flink-test(0)
simple.read.kafka.job 456-01190de7-5d4e-43c1-9cb6-b599c9c69b41 /192.168.0.101  456             0               -

но где находится потребитель заданий Flink?

Два потребителя консолей ведут себя в одной группе, потребитель заданий Flink ведет себя в другой группе.

ОБНОВЛЕНИЕ2

У меня включена проверка. И весь код, как показано ниже.

import com.stc.sls.stream_process.examples.model.StringDeSerializer;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;
import java.util.UUID;


@Slf4j
public class SimpleReadKafkaJob {

    final static String clientId = "cid" + StringUtils.remove(UUID.randomUUID().toString(), '-').substring(0, 6);

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);

        String kafkaTopic = params.get("kafka-topic", "flink-test");
        String brokers = params.get("brokers", "192.168.0.100:9092");
        String groupId = "simple.read.kafka.job";

        System.out.printf("Reading kafka topic %s @ %s\n", kafkaTopic, brokers);
        System.out.println();

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", brokers);
        kafkaProps.setProperty("group.id", groupId);
        kafkaProps.setProperty("client.id", clientId);

        FlinkKafkaConsumer<String> kafka = new FlinkKafkaConsumer<>(kafkaTopic, new StringDeSerializer(), kafkaProps);
        kafka.setStartFromGroupOffsets();
        kafka.setCommitOffsetsOnCheckpoints(true);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(20000);
        env.setStateBackend((StateBackend) new FsStateBackend("file:///Users/ym/tmp/checkpoint"));
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);


        DataStream<String> dataStream = env.addSource(kafka);
        dataStream.map((MapFunction<String, String>) s -> {
            log.info("message={}", s);
            return s;
        }).addSink(new DiscardingSink<>());

        env.execute(SimpleReadKafkaJob.class.getSimpleName());
    }


}

и войдите как показано ниже

23:00:47,643 INFO  org.apache.kafka.clients.consumer.KafkaConsumer               - [Consumer clientId=cidb30879, groupId=simple.read.kafka.job] Subscribed to partition(s): flink-test-0
23:00:47,673 INFO  org.apache.kafka.clients.Metadata                             - Cluster ID: LsItbMw1T_SHYQvJMt_6Fw
23:00:47,677 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - [Consumer clientId=cidb30879, groupId=simple.read.kafka.job] Discovered group coordinator 192.168.0.100:9092 (id: 2147483647 rack: null)
23:01:02,160 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 1 @ 1583420462134 for job 4d97a614ef4f928b93c8159f30e9c22e.
23:01:02,572 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 1 for job 4d97a614ef4f928b93c8159f30e9c22e (1236 bytes in 437 ms).
23:01:22,136 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 2 @ 1583420482135 for job 4d97a614ef4f928b93c8159f30e9c22e.
23:01:22,147 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 2 for job 4d97a614ef4f928b93c8159f30e9c22e (1236 bytes in 10 ms).
23:01:42,139 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 3 @ 1583420502138 for job 4d97a614ef4f928b93c8159f30e9c22e.
23:01:42,151 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 3 for job 4d97a614ef4f928b93c8159f30e9c22e (1236 bytes in 12 ms).
23:02:02,138 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 4 @ 1583420522137 for job 4d97a614ef4f928b93c8159f30e9c22e.
23:02:02,147 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 4 for job 4d97a614ef4f928b93c8159f30e9c22e (1236 bytes in 9 ms).
23:02:22,141 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 5 @ 1583420542139 for job 4d97a614ef4f928b93c8159f30e9c22e.
23:02:22,152 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 5 for job 4d97a614ef4f928b93c8159f30e9c22e (1236 bytes in 12 ms).
23:02:42,138 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 6 @ 1583420562137 for job 4d97a614ef4f928b93c8159f30e9c22e.
23:02:42,149 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 6 for job 4d97a614ef4f928b93c8159f30e9c22e (1236 bytes in 11 ms).
23:03:02,140 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 7 @ 1583420582139 for job 4d97a614ef4f928b93c8159f30e9c22e.
23:03:02,148 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 7 for job 4d97a614ef4f928b93c8159f30e9c22e (1236 bytes in 8 ms).
23:03:18,031 INFO  com.stc.sls.stream_process.examples.SimpleReadKafkaJob        - message={seqNo: 1, eventTs: 1583420589670, id: even偶数, value: 2.76}
23:03:22,141 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 8 @ 1583420602141 for job 4d97a614ef4f928b93c8159f30e9c22e.
23:03:22,152 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 8 for job 4d97a614ef4f928b93c8159f30e9c22e (1236 bytes in 10 ms).
23:03:25,544 INFO  com.stc.sls.stream_process.examples.SimpleReadKafkaJob        - message={seqNo: 2, eventTs: 1583420598370, id: even偶数, value: 5.18}
23:03:33,181 INFO  com.stc.sls.stream_process.examples.SimpleReadKafkaJob        - message={seqNo: 3, eventTs: 1583420605939, id: odd奇数, value: 0.89}
23:03:40,659 INFO  com.stc.sls.stream_process.examples.SimpleReadKafkaJob        - message={seqNo: 4, eventTs: 1583420613564, id: even偶数, value: 9.29}


Но использование kafka-consumer-groups показывает нет активных членов

bin descKafkaConsumerGroup simple.read.kafka.job --members
Java HotSpot(TM) Server VM warning: G1 GC is disabled in this release.

Consumer group 'simple.read.kafka.job' has no active members.

1 Ответ

0 голосов
/ 28 февраля 2020

Flink реализует механизм сохранения смещения посредством контрольной точки по умолчанию. Это означает, что смещения сохраняются в Kafka на контрольной точке. Таким образом, при сбое задания вы можете безопасно воспроизвести события, которые не были обработаны полностью.

Таким образом, при развертывании двух заданий вполне возможно и ожидается, что оба будут обрабатывать записи, поскольку смещения периодически сохраняются. Существуют различные возможности изменить это поведение, например, включить автоматическую фиксацию с правильным интервалом. Хотя это может отрицательно сказаться на производительности или привести к потере данных при перезапуске.

Для получения дополнительной информации см. Документацию здесь .

...