У меня есть простой счетчик слов Flink, который читает из темы Kafka и выводит свой результат в другую тему Kafka,
DataStream<String> input = env.addSource(new FlinkKafkaConsumer<>(inputTopic, new SimpleStringSchema(), props));
DataStream<Tuple2<String, Long>> counts = ......;
counts.addSink(new FlinkKafkaProducer<>(outputTopic, new WordCountSerializer(), props));
//counts.print();
env.execute("foobar");
Проблема в том, что я ничего не вижу в теме вывода через kafka-console-Командная строка consumer.sh.
Чтобы решить эту проблему, я пытаюсь распечатать результат, и он работает нормально, я вижу правильный результат рабочего учета в файле журнала.
Так что предположениечто-то не так в WordCountSerializer, например,
class WordCountSerializer implements KeyedSerializationSchema<Tuple2<String, Long>>, java.io.Serializable {
public byte[] serializeKey(Tuple2<String, Long> element) {
return new StringSerializer().serialize(null, element.getField(0));
}
public byte[] serializeValue(Tuple2<String, Long> element) {
return new LongSerializer().serialize(null, element.getField(1));
}
public String getTargetTopic(Tuple2<String, Long> element) {
return null;
}
}
После изменения serializeValue на
public byte[] serializeValue(Tuple2<String, Long> element) {
return new StringSerializer().serialize(null, element.getField(1).toString());
}
я вижу, что счетчик был выведен в Kafka (часть слова Tuples все еще отсутствует), как
1
3
...
Мои вопросы:
- Я видел несколько примеров в Интернете, используя WordCountSerializer, упомянутый выше, но этоу меня не работает, я тут что-то не так делаю?
После изменения метода serializeValue, как указано выше, он частично работает, но на самом деле мне нужно что-то ниже, как правильнодостичь этого?
Фу, 1
бар, 3
...