I have been using Flink 1.10.0 for a while and have run into a strange problem.
I submitted the same job twice:
$ flink list -r
28.02.2020 18:04:24 : f9ad14cb86a14c388ed6a146c80988fd : ReadKafkaJob (RUNNING)
28.02.2020 18:07:23 : e05bf26ee986573ffc01af8b1f5d1d59 : ReadKafkaJob (RUNNING)
The two jobs share the same group.id, yet each of them reads the data. The log below shows the same event being consumed twice:
2020-02-28 18:08:29,600 INFO com.stc.sls.stream_process.examples.ReadKafkaJob - eventDo={"id":"odd奇数","value":0.1,"eventTs":1582884509239,"seqNo":0}
2020-02-28 18:08:29,601 INFO com.stc.sls.stream_process.examples.ReadKafkaJob - eventDo={"id":"odd奇数","value":0.1,"eventTs":1582884509239,"seqNo":0}
2020-02-28 18:08:34,442 INFO com.stc.sls.stream_process.examples.ReadKafkaJob - eventDo={"id":"odd奇数","value":0.5,"eventTs":1582884514437,"seqNo":1}
2020-02-28 18:08:34,442 INFO com.stc.sls.stream_process.examples.ReadKafkaJob - eventDo={"id":"odd奇数","value":0.5,"eventTs":1582884514437,"seqNo":1}
2020-02-28 18:08:39,448 INFO com.stc.sls.stream_process.examples.ReadKafkaJob - eventDo={"id":"odd奇数","value":0.2,"eventTs":1582884519443,"seqNo":2}
2020-02-28 18:08:39,448 INFO com.stc.sls.stream_process.examples.ReadKafkaJob - eventDo={"id":"odd奇数","value":0.2,"eventTs":1582884519443,"seqNo":2}
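Each event above appears exactly twice, once per running job, with identical id and seqNo. Purely as an illustration (this is not part of the original job), a JDK-only sketch that detects such repeats by keying on the id and seqNo fields visible in the log lines:

```java
import java.util.HashSet;
import java.util.Set;

public class DedupeSketch {
    private final Set<String> seen = new HashSet<>();

    // Returns true the first time a given (id, seqNo) pair is observed,
    // false for every repeat. The state here grows without bound; a real
    // Flink job would use keyed state with a TTL instead of a plain set.
    public boolean firstSeen(String id, long seqNo) {
        return seen.add(id + "#" + seqNo);
    }

    public static void main(String[] args) {
        DedupeSketch d = new DedupeSketch();
        System.out.println(d.firstSeen("odd", 0)); // true  (first delivery)
        System.out.println(d.firstSeen("odd", 0)); // false (duplicate)
        System.out.println(d.firstSeen("odd", 1)); // true  (new seqNo)
    }
}
```

This only masks the symptom downstream; the question of why both jobs receive the events remains.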
I set 'group.id' in the code:
String kafkaTopic = params.get("kafka-topic", "flink-test");
String brokers = params.get("brokers", "192.168.0.100:9092");
String groupId = "simple.read.kafka.job";
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", brokers);
kafkaProps.setProperty("group.id", groupId);
FlinkKafkaConsumer<EventDo> kafka = new FlinkKafkaConsumer<>(kafkaTopic, new EventDeSerializer(), kafkaProps);
So why do two clients in the same group each consume the same records from Kafka?
Does FlinkKafkaConsumer have some special implementation?
UPDATE:
I ran a few tests, starting two console consumers and one Flink job consumer.
First, a kafka-console-consumer with client.id=123:
kafka-console-consumer --bootstrap-server 192.168.0.100:9092 --topic flink-test --consumer-property group.id=simple.read.kafka.job --consumer-property client.id=123
and another console consumer with client.id=456:
kafka-console-consumer --bootstrap-server 192.168.0.100:9092 --topic flink-test --consumer-property group.id=simple.read.kafka.job --consumer-property client.id=456
Then I started the Flink job from IDEA, consuming topic flink-test with group.id="simple.read.kafka.job":
20:38:17,107 INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=cid0931c3, groupId=simple.read.kafka.job] Subscribed to partition(s): flink-test-0
I can inspect the group membership and find the two console consumers:
➜ bin descKafkaConsumerGroup simple.read.kafka.job --members
Java HotSpot(TM) Server VM warning: G1 GC is disabled in this release.
GROUP CONSUMER-ID HOST CLIENT-ID #PARTITIONS ASSIGNMENT
simple.read.kafka.job 123-5925201d-a767-4216-acdc-b46f058db0df /192.168.0.100 123 1 flink-test(0)
simple.read.kafka.job 456-01190de7-5d4e-43c1-9cb6-b599c9c69b41 /192.168.0.101 456 0 -
But where is the Flink job's consumer?
The two console consumers behave as one group, while the Flink job's consumer behaves as if it were in a different group.
UPDATE 2:
I have checkpointing enabled. The full code is below:
import com.stc.sls.stream_process.examples.model.StringDeSerializer;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
import java.util.UUID;
@Slf4j
public class SimpleReadKafkaJob {

    final static String clientId = "cid" + StringUtils.remove(UUID.randomUUID().toString(), '-').substring(0, 6);

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        String kafkaTopic = params.get("kafka-topic", "flink-test");
        String brokers = params.get("brokers", "192.168.0.100:9092");
        String groupId = "simple.read.kafka.job";

        System.out.printf("Reading kafka topic %s @ %s\n", kafkaTopic, brokers);
        System.out.println();

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", brokers);
        kafkaProps.setProperty("group.id", groupId);
        kafkaProps.setProperty("client.id", clientId);

        FlinkKafkaConsumer<String> kafka = new FlinkKafkaConsumer<>(kafkaTopic, new StringDeSerializer(), kafkaProps);
        kafka.setStartFromGroupOffsets();
        kafka.setCommitOffsetsOnCheckpoints(true);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(20000);
        env.setStateBackend((StateBackend) new FsStateBackend("file:///Users/ym/tmp/checkpoint"));
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        DataStream<String> dataStream = env.addSource(kafka);
        dataStream.map((MapFunction<String, String>) s -> {
            log.info("message={}", s);
            return s;
        }).addSink(new DiscardingSink<>());

        env.execute(SimpleReadKafkaJob.class.getSimpleName());
    }
}
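A side note on the clientId field in the code above: it embeds a fresh random UUID fragment, so every submission of the job gets a distinct client.id even though the group.id is fixed. A JDK-only sketch of that expression (with commons-lang's StringUtils.remove swapped for String.replace, which I assume is equivalent here):

```java
import java.util.UUID;

public class ClientIdDemo {
    // Mirrors the clientId expression in SimpleReadKafkaJob using only the JDK:
    // strip the dashes from a random UUID and keep the first 6 characters.
    static String newClientId() {
        return "cid" + UUID.randomUUID().toString().replace("-", "").substring(0, 6);
    }

    public static void main(String[] args) {
        String id = newClientId();
        System.out.println(id);          // e.g. cid3fa8b2 (random each run)
        System.out.println(id.length()); // 9
    }
}
```

This matches the cidb30879 / cid0931c3 client ids seen in the logs, which is why the two submissions never share a client id.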
and the log is as below:
23:00:47,643 INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=cidb30879, groupId=simple.read.kafka.job] Subscribed to partition(s): flink-test-0
23:00:47,673 INFO org.apache.kafka.clients.Metadata - Cluster ID: LsItbMw1T_SHYQvJMt_6Fw
23:00:47,677 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=cidb30879, groupId=simple.read.kafka.job] Discovered group coordinator 192.168.0.100:9092 (id: 2147483647 rack: null)
23:01:02,160 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1 @ 1583420462134 for job 4d97a614ef4f928b93c8159f30e9c22e.
23:01:02,572 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1 for job 4d97a614ef4f928b93c8159f30e9c22e (1236 bytes in 437 ms).
23:01:22,136 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 2 @ 1583420482135 for job 4d97a614ef4f928b93c8159f30e9c22e.
23:01:22,147 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 2 for job 4d97a614ef4f928b93c8159f30e9c22e (1236 bytes in 10 ms).
23:01:42,139 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 3 @ 1583420502138 for job 4d97a614ef4f928b93c8159f30e9c22e.
23:01:42,151 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 3 for job 4d97a614ef4f928b93c8159f30e9c22e (1236 bytes in 12 ms).
23:02:02,138 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 4 @ 1583420522137 for job 4d97a614ef4f928b93c8159f30e9c22e.
23:02:02,147 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 4 for job 4d97a614ef4f928b93c8159f30e9c22e (1236 bytes in 9 ms).
23:02:22,141 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 5 @ 1583420542139 for job 4d97a614ef4f928b93c8159f30e9c22e.
23:02:22,152 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 5 for job 4d97a614ef4f928b93c8159f30e9c22e (1236 bytes in 12 ms).
23:02:42,138 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 6 @ 1583420562137 for job 4d97a614ef4f928b93c8159f30e9c22e.
23:02:42,149 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 6 for job 4d97a614ef4f928b93c8159f30e9c22e (1236 bytes in 11 ms).
23:03:02,140 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 7 @ 1583420582139 for job 4d97a614ef4f928b93c8159f30e9c22e.
23:03:02,148 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 7 for job 4d97a614ef4f928b93c8159f30e9c22e (1236 bytes in 8 ms).
23:03:18,031 INFO com.stc.sls.stream_process.examples.SimpleReadKafkaJob - message={seqNo: 1, eventTs: 1583420589670, id: even偶数, value: 2.76}
23:03:22,141 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 8 @ 1583420602141 for job 4d97a614ef4f928b93c8159f30e9c22e.
23:03:22,152 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 8 for job 4d97a614ef4f928b93c8159f30e9c22e (1236 bytes in 10 ms).
23:03:25,544 INFO com.stc.sls.stream_process.examples.SimpleReadKafkaJob - message={seqNo: 2, eventTs: 1583420598370, id: even偶数, value: 5.18}
23:03:33,181 INFO com.stc.sls.stream_process.examples.SimpleReadKafkaJob - message={seqNo: 3, eventTs: 1583420605939, id: odd奇数, value: 0.89}
23:03:40,659 INFO com.stc.sls.stream_process.examples.SimpleReadKafkaJob - message={seqNo: 4, eventTs: 1583420613564, id: even偶数, value: 9.29}
But kafka-consumer-groups shows no active members:
bin descKafkaConsumerGroup simple.read.kafka.job --members
Java HotSpot(TM) Server VM warning: G1 GC is disabled in this release.
Consumer group 'simple.read.kafka.job' has no active members.
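(descKafkaConsumerGroup is a local wrapper script; assuming it wraps the stock tool, which is my guess, the equivalent direct invocation would be something like:)

```shell
kafka-consumer-groups --bootstrap-server 192.168.0.100:9092 \
  --describe --group simple.read.kafka.job --members
```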