Я пытаюсь написать потоковое приложение, которое читает и пишет в Kafka.В настоящее время у меня есть это, но я должен toString мой класс кортежа.
object StreamingJob {
def main(args: Array[String]) {
// set up the streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")
val consumer = env.addSource(new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties))
val counts = consumer.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
val producer = new FlinkKafkaProducer08[String](
"localhost:9092",
"my-topic",
new SimpleStringSchema())
counts.map(_.toString()).addSink(producer)
env.execute("Window Stream WordCount")
env.execute("Flink Streaming Scala API Skeleton")
}
}
Самое близкое, что я мог получить к этой работе, было следующее, но FlinkKafkaProducer08 отказывается принимать параметр типа как часть конструктора.
val producer = new FlinkKafkaProducer08[(String, Int)](
"localhost:9092",
"my-topic",
new TypeSerializerOutputFormat[(String, Int)])
counts.addSink(producer)
Мне интересно, есть ли способ записать кортежи прямо в мой приемник Кафки.