Spark dataframe writeStream forEach не записывает все строки - PullRequest
1 голос
/ 10 мая 2019

Мой источник данных - Кафка, и я читаю данные из Кафки следующим образом:

var df = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094")
    .option("subscribe", "raw_weather")
    .load()

df = df.selectExpr("CAST(value as STRING)")
        .as[String]
        .select("value")

Полученный value будет выглядеть следующим образом: (725030:14732,2008,12,31,11,0.6,-6.7,1001.7,80,6.2,8,0.0,0.0). В Кафку передано 8784 строки (24 * 366).

Я пытаюсь передать эти данные в базу данных DB2 с помощью класса, расширяющего org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row]. Вот как я пытаюсь записать данные:

def writeToDb2(spark: SparkSession, df: DataFrame): Unit = {
    val writer = new JDBCSink(url , user , password)

    val query= df.writeStream
        .foreach(writer)
        .outputMode("append")
        .trigger(Trigger.ProcessingTime(2000))
        .start()

    query.awaitTermination()
}

Вот так выглядит мой JDBCSink:

class JDBCSink(url: String, user:String, pwd:String) extends org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row]{
    val driver = "com.ibm.db2.jcc.DB2Driver"
    var connection:java.sql.Connection = _
    var statement:java.sql.Statement = _

    val schema = "SPARK"
    val rawTableName = "RAW_WEATHER_DATA"
    val dailyPrecipitationTable = "DAILY_PRECIPITATION_TABLE"

    def open(partitionId: Long, version: Long):Boolean = {
        Class.forName(driver)
        connection = java.sql.DriverManager.getConnection(url, user, pwd)
        statement = connection.createStatement
        true
    }

    def process(valz: org.apache.spark.sql.Row): Unit = {
        val value = valz(0).toString.split(",")
        val stmt = s"INSERT INTO $schema.$rawTableName(wsid, year, month, day, hour, temperature, dewpoint, pressure, wind_direction, wind_speed, sky_condition, one_hour_precip, six_hour_precip) " +
            "VALUES (" +
            "'" + value(0) + "'," +
            value(1) + "," +
            value(2) + "," +
            value(3) + "," +
            value(4) + "," +
            value(5) + "," +
            value(6) + "," +
            value(7) + "," +
            value(8) + "," +
            value(9) + "," +
            value(10) + "," +
            value(11) + "," +
            value(12) + ")"
        println(value(1) + "," + value(2) + "," + value(3) + "," + value(4) + "," + value(11))

        statement.executeUpdate(stmt)
    }

    def close(errorOrNull:Throwable):Unit = {
        connection.close()
    }
}

Вот в чем дело, когда я отправляю данные в поток, spark не читает все строки. Это стало ясно, когда я посмотрел код, в котором программа пытается писать. Когда я выполнил COUNT (*) в своей таблице, он не записывает все 8784 строки в таблицу. В некоторых итерациях программы число записанных строк колеблется около 7000, затем иногда 7900 и т. Д., Т.е. он не записывает все строки.

В чем может быть причина этого? Я следовал руководящим принципам структурированной потоковой передачи. Более того, я также пытался запускаться с использованием различных других триггеров, но ни один из них не помог.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...