Flink не добавляет никаких данных в Elasticsearch, но ошибок нет - PullRequest
0 голосов
/ 03 марта 2020

Ребята, я новичок во всем этом процессе потоковой передачи данных, но мне удалось создать и отправить задание Flink, которое будет считывать некоторые данные CSV из Kafka и объединять их, а затем помещать в Elasticsearch.

I был в состоянии сделать первые две части, и распечатать мое агрегирование в STDOUT. Но когда я добавил код, чтобы поместить его в Elasticsearch, там, похоже, ничего не происходит (данные не добавляются). Я посмотрел журнал диспетчера заданий Flink, и он выглядит нормально (без ошибок) и говорит:

2020-03-03 16:18:03,877 INFO 
org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge
- Created Elasticsearch RestHighLevelClient connected to [http://elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local:9200]

Вот мой код на данный момент:

/*
 * This Scala source file was generated by the Gradle 'init' task.
 */
package flinkNamePull

import java.time.LocalDateTime
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.descriptors.{Elasticsearch, Json, Schema}

object Demo {
  /**
   * MapFunction to generate Transfers POJOs from parsed CSV data.
   */
  class TransfersMapper extends RichMapFunction[String, Transfers] {
    private var formatter = null

    @throws[Exception]
    override def open(parameters: Configuration): Unit = {
      super.open(parameters)
      //formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
    }

    @throws[Exception]
    override def map(csvLine: String): Transfers = {
      //var splitCsv = csvLine.stripLineEnd.split("\n")(1).split(",")
      var splitCsv = csvLine.stripLineEnd.split(",")

      val arrLength = splitCsv.length
      val i = 0
      if (arrLength != 13) {
        for (i <- arrLength + 1 to 13) {
          if (i == 13) {
            splitCsv = splitCsv :+ "0.0"
          } else {
            splitCsv = splitCsv :+ ""
          }
        }
      }
      var trans = new Transfers()
      trans.rowId = splitCsv(0)
      trans.subjectId = splitCsv(1)
      trans.hadmId = splitCsv(2)
      trans.icuStayId = splitCsv(3)
      trans.dbSource = splitCsv(4)
      trans.eventType = splitCsv(5)
      trans.prev_careUnit = splitCsv(6)
      trans.curr_careUnit = splitCsv(7)
      trans.prev_wardId = splitCsv(8)
      trans.curr_wardId = splitCsv(9)
      trans.inTime = splitCsv(10)
      trans.outTime = splitCsv(11)
      trans.los = splitCsv(12).toDouble

      return trans
    }
  }

  def main(args: Array[String]) {
    // Create streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Set properties per KafkaConsumer API
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "kafka.kafka:9092")
    properties.setProperty("group.id", "test")

    // Add Kafka source to environment
    val myKConsumer = new FlinkKafkaConsumer010[String]("raw.data3", new SimpleStringSchema(), properties)
    // Read from beginning of topic
    myKConsumer.setStartFromEarliest()

    val streamSource = env
      .addSource(myKConsumer)

    // Transform CSV (with a header row per Kafka event into a Transfers object
    val streamTransfers = streamSource.map(new TransfersMapper())

    // create a TableEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    println("***** NEW EXECUTION STARTED AT " + LocalDateTime.now() + " *****")

    // register a Table
    val tblTransfers: Table = tEnv.fromDataStream(streamTransfers)
    tEnv.createTemporaryView("transfers", tblTransfers)

    tEnv.connect(
      new Elasticsearch()
        .version("7")
        .host("elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local", 9200, "http")   // required: one or more Elasticsearch hosts to connect to
        .index("transfers-sum")
        .documentType("_doc")
        .keyNullLiteral("n/a")
    )
      .withFormat(new Json().jsonSchema("{type: 'object', properties: {curr_careUnit: {type: 'string'}, sum: {type: 'number'}}}"))
      .withSchema(new Schema()
        .field("curr_careUnit", DataTypes.STRING())
        .field("sum", DataTypes.DOUBLE())
      )
      .inUpsertMode()
      .createTemporaryTable("transfersSum")

    val result = tEnv.sqlQuery(
      """
        |SELECT curr_careUnit, sum(los)
        |FROM transfers
        |GROUP BY curr_careUnit
        |""".stripMargin)

    result.insertInto("transfersSum")

    // Elasticsearch elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local:9200

    env.execute("Flink Streaming Demo Dump to Elasticsearch")
  }
}

Я не уверен как я могу отладить этого зверя ... Интересно, может ли кто-нибудь помочь мне выяснить, почему задание Flink не добавляет данные в Elasticsearch :( Из моего кластера Flink я могу просто отлично запросить Elasticsearch (вручную) и добавить записи в мой индекс:

curl -XPOST "http://elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local:9200/transfers-sum/_doc" -H 'Content-Type: application/json' -d'{"curr_careUnit":"TEST123","sum":"123"}'

Ответы [ 2 ]

1 голос
/ 06 марта 2020

Добрая душа в Flink mailist указала на тот факт, что это может быть Elasticsearch, буферизующий мои записи ... Ну, это так. ;)

Я добавил следующие параметры в соединитель Elasticsearch:

.bulkFlushMaxActions(2)
.bulkFlushInterval(1000L)
0 голосов
/ 07 мая 2020

Flink Elasticsearch Connector 7 с использованием Scala

Пожалуйста, найдите рабочий и подробный ответ, который я предоставил здесь .

...