трубопровод от кафки до редиса - PullRequest
0 голосов
/ 13 февраля 2019

Я использую flink для чтения из kafka и записи в redis.

Для теста я просто хочу прочитать первые 10 сообщений от Кафки.Поэтому я использую счетчик и пытаюсь остановить потребителя при counter = 10

    AtomicInteger counter = new AtomicInteger(0);

    FlinkKafkaConsumer08<String> kafkaConsumer =
            new FlinkKafkaConsumer08<>("my topic",
                    new SimpleStringSchema() {
                        @Override
                        public boolean isEndOfStream(String nextElement) {
                            // It should only read 10 kafka message
                            return counter.getAndIncrement() > 9;
                        }
                    },
                    properties);

, но я получаю 30 сообщений в redis:

llen rtp:example
(integer) 30

Когда я изменяю условие на counter.getAndIncrement() > 8 пишет 27 сообщений в redis.Всегда тройной.

Полный код:

public class FlinkEntry {

    private final static JedisCluster JEDIS_CLUSTER;

    static {
        Set<HostAndPort> hostAndPorts = new HashSet<>();
        hostAndPorts.add(new HostAndPort("localhost", 7001));
        JEDIS_CLUSTER = new JedisCluster(hostAndPorts);
    }


    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        FlinkKafkaConsumer08<String> kafkaConsumer = createKafkaConsumer();
        DataStream<String> dataStream = environment.addSource(kafkaConsumer);

        SinkFunction<String> redisSink = createRedisSink();
        dataStream.addSink(redisSink);

        environment.execute();
    }

    private static FlinkKafkaConsumer08<String> createKafkaConsumer() {
        Properties properties = new Properties();
        //... set kafka property

        AtomicInteger counter = new AtomicInteger(0);

        FlinkKafkaConsumer08<String> kafkaConsumer =
                new FlinkKafkaConsumer08<>("my topic",
                        new SimpleStringSchema() {
                            @Override
                            public boolean isEndOfStream(String nextElement) {
                                // It should only read 10 kafka message
                                return counter.getAndIncrement() > 9;
                            }
                        },
                        properties);

        kafkaConsumer.setStartFromLatest();
        return kafkaConsumer;
    }

    private static SinkFunction<String> createRedisSink() {
        return new SinkFunction<String>() {
            @Override
            public void invoke(String value, Context context) {
                JEDIS_CLUSTER.lpush("rtp:example", value);
                JEDIS_CLUSTER.expire("rtp:example", 10 * 60);
            }
        };
    }

}

1 Ответ

0 голосов
/ 16 февраля 2019

Одним из подходов к пониманию этого было бы отключение цепочки операторов путем вызова

    env.disableOperatorChaining();

, а затем просмотр некоторых метрик - например, numRecordsOut в источнике и numRecordsIn в приемнике.Я бы также дважды проверил, что вся работа выполняется с параллелизмом, установленным на 1.

Вам нужно будет отключить цепочку, потому что в противном случае вся работа будет свернута в одну задачу, и метрик не будет.собран для связи между двумя операторами.

...