Не могу понять жизненный цикл TwoPhaseCommitSinkFunction - PullRequest
0 голосов
/ 28 января 2020

Мне понадобился приемник для Postgres БД, поэтому я начал создавать собственную функцию Flink SinkFunction. Поскольку FlinkKafkaProducer реализует функцию TwoPhaseCommitSinkFunction, я решил сделать то же самое. Как указано в книге О'Рейли Потоковая обработка с Apache Flink , вам просто нужно реализовать абстрактные методы, включить контрольные точки, и вы получите go. Но что действительно происходит, когда я запускаю свой код, так это то, что метод commit вызывается только один раз и вызывается до invoke, что совершенно неожиданно, поскольку вы не должны быть готовы к фиксации, если ваш набор готов к совершать транзакции пусто. И хуже всего то, что после фиксации вызывается invoke для всех строк транзакций, присутствующих в моем файле, а затем вызывается abort, что еще более неожиданно.

Когда инициализируется Sink Насколько я понимаю, должно произойти следующее:

  1. beginTransaction вызывается и отправляет идентификатор для вызова
  2. invoke добавляет строки в транзакцию в соответствии с полученным идентификатором
  3. pre-commit выполняет все окончательные изменения данных текущей транзакции
  4. commit обрабатывает завершенную транзакцию предварительно зафиксированных данных

Итак, я не понимаю, почему мой программа не показывает такое поведение.

Вот мой код приемника:

package PostgresConnector

import java.sql.{BatchUpdateException, DriverManager, PreparedStatement, SQLException, Timestamp}
import java.text.ParseException
import java.util.{Date, Properties, UUID}
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{SinkFunction, TwoPhaseCommitSinkFunction}
import org.apache.flink.streaming.api.scala._
import org.slf4j.{Logger, LoggerFactory}




class PostgreSink(props : Properties, config : ExecutionConfig) extends TwoPhaseCommitSinkFunction[(String,String,String,String),String,String](createTypeInformation[String].createSerializer(config),createTypeInformation[String].createSerializer(config)){

    private var transactionMap : Map[String,Array[(String,String,String,String)]] = Map()

    private var parsedQuery : PreparedStatement = _

    private val insertionString : String = "INSERT INTO mydb (field1,field2,point) values (?,?,point(?,?))"

    override def invoke(transaction: String, value: (String,String,String,String), context: SinkFunction.Context[_]): Unit = {

        val LOG = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])

        val res = this.transactionMap.get(transaction)

        if(res.isDefined){

            var array = res.get

            array = array ++ Array(value)

            this.transactionMap += (transaction -> array)

        }else{

            val array = Array(value)

            this.transactionMap += (transaction -> array)


        }

        LOG.info("\n\nPassing through invoke\n\n")

        ()

    }

    override def beginTransaction(): String = {

        val LOG: Logger = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])

        val identifier = UUID.randomUUID.toString

        LOG.info("\n\nPassing through beginTransaction\n\n")

        identifier


    }

    override def preCommit(transaction: String): Unit = {

        val LOG = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])

        try{

            val tuple : Option[Array[(String,String,String,String)]]= this.transactionMap.get(transaction)

            if(tuple.isDefined){

                tuple.get.foreach( (value : (String,String,String,String)) => {

                    LOG.info("\n\n"+value.toString()+"\n\n")

                    this.parsedQuery.setString(1,value._1)
                    this.parsedQuery.setString(2,value._2)
                    this.parsedQuery.setString(3,value._3)
                    this.parsedQuery.setString(4,value._4)
                    this.parsedQuery.addBatch()

                })

            }

        }catch{

            case e : SQLException =>
                LOG.info("\n\nError when adding transaction to batch: SQLException\n\n")

            case f : ParseException =>
                LOG.info("\n\nError when adding transaction to batch: ParseException\n\n")

            case g : NoSuchElementException =>
                LOG.info("\n\nError when adding transaction to batch: NoSuchElementException\n\n")

            case h : Exception =>
                LOG.info("\n\nError when adding transaction to batch: Exception\n\n")

        }

        this.transactionMap = this.transactionMap.empty

        LOG.info("\n\nPassing through preCommit...\n\n")
    }

    override def commit(transaction: String): Unit = {

        val LOG : Logger = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])

        if(this.parsedQuery != null) {
            LOG.info("\n\n" + this.parsedQuery.toString+ "\n\n")
        }

        try{

            this.parsedQuery.executeBatch
            val LOG : Logger = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])
            LOG.info("\n\nExecuting batch\n\n")

        }catch{

            case e : SQLException =>
                val LOG : Logger = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])
                LOG.info("\n\n"+"Error : SQLException"+"\n\n")

        }

        this.transactionMap = this.transactionMap.empty

        LOG.info("\n\nPassing through commit...\n\n")

    }

    override def abort(transaction: String): Unit = {

        val LOG : Logger = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])

        this.transactionMap = this.transactionMap.empty

        LOG.info("\n\nPassing through abort...\n\n")

    }

    override def open(parameters: Configuration): Unit = {

        val LOG: Logger = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])

        val driver = props.getProperty("driver")
        val url = props.getProperty("url")
        val user = props.getProperty("user")
        val password = props.getProperty("password")
        Class.forName(driver)
        val connection = DriverManager.getConnection(url + "?user=" + user + "&password=" + password)
        this.parsedQuery = connection.prepareStatement(insertionString)

        LOG.info("\n\nConfiguring BD conection parameters\n\n")
    }
}

И это моя основная программа:

package FlinkCEPClasses

import PostgresConnector.PostgreSink
import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.pattern.conditions.SimpleCondition
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.core.fs.{FileSystem, Path}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import java.util.Properties

import org.apache.flink.api.common.ExecutionConfig
import org.slf4j.{Logger, LoggerFactory}

class FlinkCEPPipeline {

  val LOG: Logger = LoggerFactory.getLogger(classOf[FlinkCEPPipeline])
  LOG.info("\n\nStarting the pipeline...\n\n")

  var env : StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  env.enableCheckpointing(10)
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
  env.setParallelism(1)

  //var input : DataStream[String] = env.readFile(new TextInputFormat(new Path("/home/luca/Desktop/lines")),"/home/luca/Desktop/lines",FileProcessingMode.PROCESS_CONTINUOUSLY,1)

  var input : DataStream[String] = env.readTextFile("/home/luca/Desktop/lines").name("Raw stream")

  var tupleStream : DataStream[(String,String,String,String)] = input.map(new S2PMapFunction()).name("Tuple Stream")

  var properties : Properties = new Properties()

  properties.setProperty("driver","org.postgresql.Driver")
  properties.setProperty("url","jdbc:postgresql://localhost:5432/mydb")
  properties.setProperty("user","luca")
  properties.setProperty("password","root")

  tupleStream.addSink(new PostgreSink(properties,env.getConfig)).name("Postgres Sink").setParallelism(1)
  tupleStream.writeAsText("/home/luca/Desktop/output",FileSystem.WriteMode.OVERWRITE).name("File Sink").setParallelism(1)

  env.execute()


}

Мой код функции S2PMapFunction:

package FlinkCEPClasses

import org.apache.flink.api.common.functions.MapFunction

case class S2PMapFunction() extends MapFunction[String,(String,String,String,String)] {

    override def map(value: String): (String, String, String,String) = {


            var tuple = value.replaceAllLiterally("(","").replaceAllLiterally(")","").split(',')

            (tuple(0),tuple(1),tuple(2),tuple(3))

    }
}

Мой конвейер работает следующим образом: я читаю строки из файла, сопоставляю их с кортежем строк и использую данные внутри кортежей, чтобы сохранить их в Postgres DB

Если хочешь сим обработайте данные, просто создайте файл со строками в следующем формате: (field1,field2,pointx,pointy)

Edit

Порядок выполнения методов TwoPhaseCommitSinkFUnction следующий:

Starting pipeline...
beginTransaction
preCommit
beginTransaction
commit
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
abort

Ответы [ 2 ]

0 голосов
/ 06 февраля 2020

Итак, здесь идет «ответ» на этот вопрос. Просто чтобы прояснить: на данный момент проблема с TwoPhaseCommitSinkFunction еще не решена. Если то, что вы ищете, касается исходной проблемы, то вам следует искать другой ответ. Если вас не волнует, что вы будете использовать в качестве раковины, то, возможно, я могу вам помочь с этим.

В соответствии с предложением @DavidAnderson я начал изучать Table API и посмотреть, может ли он решить мою проблему, которая заключалась в использовании Flink для вставки строк в таблицу базы данных.

Как оказалось, все оказалось очень просто.

OBS: Остерегайтесь используемой версии. Версия моего Флинка 1.9.0.

Исходный код

package FlinkCEPClasses

import java.sql.Timestamp
import java.util.Properties

import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.sinks.TableSink
import org.postgresql.Driver

class TableAPIPipeline {

    // --- normal pipeline initialization in this block ---

    var env : StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env.enableCheckpointing(10)
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    env.setParallelism(1)


    var input : DataStream[String] = env.readTextFile("/home/luca/Desktop/lines").name("Original stream")

    var tupleStream : DataStream[(String,Timestamp,Double,Double)] = input.map(new S2PlacaMapFunction()).name("Tuple Stream")

    var properties : Properties = new Properties()

    properties.setProperty("driver","org.postgresql.Driver")
    properties.setProperty("url","jdbc:postgresql://localhost:5432/mydb")
    properties.setProperty("user","myuser")
    properties.setProperty("password","mypassword")

    // --- normal pipeline initialization in this block END ---

    // These two lines create what Flink calls StreamTableEnvironment. 
    // It seems pretty similar to a normal stream initialization.
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env,settings)

    //Since I wanted to sink data into a database, I used JDBC TableSink,
    //because it is very intuitive and is a exact match with my need. You may
    //look for other TableSink classes that fit better in you solution.
    var tableSink : JDBCAppendTableSink = JDBCAppendTableSink.builder()
    .setBatchSize(1)
    .setDBUrl("jdbc:postgresql://localhost:5432/mydb")
    .setDrivername("org.postgresql.Driver")
    .setPassword("mypassword")
    .setUsername("myuser")
    .setQuery("INSERT INTO mytable (data1,data2,data3) VALUES (?,?,point(?,?))")
    .setParameterTypes(Types.STRING,Types.SQL_TIMESTAMP,Types.DOUBLE,Types.DOUBLE)
    .build()

    val fieldNames = Array("data1","data2","data3","data4")
    val fieldTypes = Array[TypeInformation[_]](Types.STRING,Types.SQL_TIMESTAMP,Types.DOUBLE, Types.DOUBLE)



    // This is the crucial part of the code: first, you need to register
    // your table sink, informing the name, the field names, field types and
    // the TableSink object.

    tableEnv.registerTableSink("postgres-table-sink",
        fieldNames,
        fieldTypes,
        tableSink
    )

    // Then, you transform your DataStream into a Table object.
    var table = tableEnv.fromDataStream(tupleStream)

    // Finally, you insert your stream data into the registered sink.
    table.insertInto("postgres-table-sink")




    env.execute()

}
0 голосов
/ 30 января 2020

Я не эксперт по этой теме c, но пара догадок:

preCommit вызывается всякий раз, когда Flink начинает контрольную точку, и commit вызывается, когда контрольная точка завершена , Эти методы вызываются просто потому, что происходит контрольная точка, независимо от того, получил ли приемник какие-либо данные.

Проверка выполняется периодически, независимо от того, проходят ли какие-либо данные через ваш конвейер. Учитывая ваш очень короткий интервал контрольных точек (10 мсек c), представляется правдоподобным, что первый барьер контрольных точек достигнет приемника до того, как источнику удалось отправить ему какие-либо данные.

Выглядит также, что вы при условии, что одновременно будет открыта только одна транзакция. Я не уверен, что это строго гарантировано, но до тех пор, пока maxConcurrentCheckpoints равно 1 (по умолчанию), все будет в порядке.

...