При использовании окна Tumbling Kafka Streams возникают отклонения - PullRequest
0 голосов
/ 10 декабря 2018

!! Тестовая среда

  • Производитель, отправляющий сообщение с ключом 'A' раз в секунду.

  • Kafka Streams (Яблочный код) падающее окно каждые 10 секунд.

  • 10 сообщений в 10 секунд должно быть получено, и повторения происходят через равные промежутки времени, как показано ниже.

  • Это должно быть решено.

!! Текущие результаты

(временная отметка, ключ, Количество сообщений на одно окно)

(1544079700000L, 'A ', 10)

(1544079710000L,' A ', 10)

(1544079720000L,' A ', 10)

(1544079730000L,' A ', 9)

(1544079730000L, «A», 10)

(1544079740000L, «A», 10)

(1544079750000L, «A», 10)

(1544079760000L, «A», 9)

(1544079760000L, «A», 10)

(1544079770000L, «A», 10)

(1544079780000L, «A», 10)

(1544079790000L, «A», 9)

!!TumblingWindowKafkaStream.java (Использовать потоки Кафки) `

ackage io.github.timothyrenner.kstreamex.tumblingwindow;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.util.Properties;
import java.util.Random;
/** Demonstrates tumbling windows.
 *
 * @author Timothy Renner
 */
public class TumblingWindowKafkaStream {
    /** Runs the streams program, writing to the "long-counts-all" topic.
     *
     * @param args Not used.
     */
    public static void main(String[] args) throws Exception {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG,
            "tumbling-window-kafka-streams");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
            "localhost:9090,localhost:9091,localhost:9092");
        config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG,
            "localhost:2181");
        config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
            Serdes.ByteArray().getClass().getName());
        config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
            Serdes.Long().getClass().getName());
        KStreamBuilder builder = new KStreamBuilder();
        KStream<byte[], byte[]> longs = builder.stream(
            Serdes.ByteArray(), Serdes.ByteArray(), "t2");
        // The tumbling windows will clear every ten seconds.
        KTable<Windowed<byte[]>, Long> longCounts =
            longs.groupByKey()
                 .count(TimeWindows.of(10000L)
                                   .until(10000L),
                        "t2-counts");
        // Write to topics.
        longCounts.toStream((k,v) -> k.key())
                  .to(Serdes.ByteArray(),
                      Serdes.Long(),
                      "t2-counts-all");
        KafkaStreams streams = new KafkaStreams(builder, config);
        streams.start();
        } // Close main.
} // Close TumblingWindowKafkaStream.

`

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...