Flink выводит в Kafka, правильный способ использования KeyedSerializationSchema - PullRequest
0 голосов
/ 22 февраля 2019

У меня есть простой счетчик слов Flink, который читает из темы Kafka и выводит свой результат в другую тему Kafka,

DataStream<String> input = env.addSource(new FlinkKafkaConsumer<>(inputTopic, new SimpleStringSchema(), props));
DataStream<Tuple2<String, Long>> counts = ......;
counts.addSink(new FlinkKafkaProducer<>(outputTopic, new WordCountSerializer(), props));
//counts.print();
env.execute("foobar");

Проблема в том, что я ничего не вижу в теме вывода через kafka-console-Командная строка consumer.sh.

Чтобы решить эту проблему, я пытаюсь распечатать результат, и он работает нормально, я вижу правильный результат рабочего учета в файле журнала.

Так что предположениечто-то не так в WordCountSerializer, например,

class WordCountSerializer implements KeyedSerializationSchema<Tuple2<String, Long>>, java.io.Serializable {
public byte[] serializeKey(Tuple2<String, Long> element) {
    return new StringSerializer().serialize(null, element.getField(0));
}

public byte[] serializeValue(Tuple2<String, Long> element) {
    return new LongSerializer().serialize(null, element.getField(1));
}

public String getTargetTopic(Tuple2<String, Long> element) {
    return null;
}
}

После изменения serializeValue на

public byte[] serializeValue(Tuple2<String, Long> element) {
    return new StringSerializer().serialize(null, element.getField(1).toString());
}

я вижу, что счетчик был выведен в Kafka (часть слова Tuples все еще отсутствует), как

1
3
...

Мои вопросы:

  1. Я видел несколько примеров в Интернете, используя WordCountSerializer, упомянутый выше, но этоу меня не работает, я тут что-то не так делаю?
  2. После изменения метода serializeValue, как указано выше, он частично работает, но на самом деле мне нужно что-то ниже, как правильнодостичь этого?

    Фу, 1

    бар, 3

    ...

...