Как продюсер Flink Kafka установил Semantic в Scala - PullRequest
0 голосов
/ 25 февраля 2019

Я хочу установить Flink kafka продюсер semantic. EXACTLY_ONCE, я использую код scala следующим образом

alertEnrichStream.map(_.toJsonStr)
  .addSink(
    new FlinkKafkaProducer011(
      kafkaBrokers,
      kafkaOutputTopic,
      new SimpleStringSchema))

Как отредактировать код для установки Semantic?

Ответы [ 2 ]

0 голосов
/ 30 июля 2019

Я придумал использование этого инструктора, и он работает довольно хорошо:

val producerProps = new Properties
producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
producerProps.setProperty(ProducerConfig.RETRIES_CONFIG, "2147483647")
producerProps.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
producerProps.setProperty(ProducerConfig.ACKS_CONFIG, "all")
producerProps.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")

new FlinkKafkaProducer011[String](
  topic,
  new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema),
  producerProps,
  Optional.of(new FlinkFixedPartitioner[String]),
  FlinkKafkaProducer011.Semantic.EXACTLY_ONCE,
  10
)
0 голосов
/ 25 февраля 2019

Используйте конструктор, который может определять семантику.Как этот:

FlinkKafkaProducer011<Integer> kafkaProducer = new FlinkKafkaProducer011<>(
    topic,
    new SimpleStringSchema,
    properties,
    Semantic.EXACTLY_ONCE);
...