Попытка написать кортеж в Flink Kafka раковину - PullRequest
0 голосов
/ 05 октября 2018

Я пытаюсь написать потоковое приложение, которое читает и пишет в Kafka.В настоящее время у меня есть это, но я должен toString мой класс кортежа.

object StreamingJob {
  def main(args: Array[String]) {
    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "test")

    val consumer = env.addSource(new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties))

    val counts = consumer.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)

    val producer = new FlinkKafkaProducer08[String](
      "localhost:9092",
      "my-topic",
      new SimpleStringSchema())

    counts.map(_.toString()).addSink(producer)

    env.execute("Window Stream WordCount")
    env.execute("Flink Streaming Scala API Skeleton")
  }
}

Самое близкое, что я мог получить к этой работе, было следующее, но FlinkKafkaProducer08 отказывается принимать параметр типа как часть конструктора.

val producer = new FlinkKafkaProducer08[(String, Int)](
  "localhost:9092",
  "my-topic",
  new TypeSerializerOutputFormat[(String, Int)])

counts.addSink(producer)

Мне интересно, есть ли способ записать кортежи прямо в мой приемник Кафки.

1 Ответ

0 голосов
/ 06 октября 2018

Вам нужен примерно такой класс, который сериализует ваши кортежи:

private class SerSchema extends SerializationSchema[Tuple2[String, Int]] {
  override def serialize(tuple2: Tuple2[String, Int]): Array[Byte] = ...
}
...