групповая идеальная стратегия в Spark Streaming - PullRequest
1 голос
/ 21 мая 2019

Я читаю данные, используя Spark Streaming из источника Kafka, откуда я создаю кадр данных со столбцами wsid, year, month, day, oneHourPrecip:

val df = spark.readStream
    .format("kafka")
    .option("subscribe", "raw_weather")
    .option("kafka.bootstrap.servers", "<host1:port1,host2:port2>...")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism" , "PLAIN")
    .option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"token\" password=\"" + "<some password>" + "\";")
    .option("kafka.ssl.protocol", "TLSv1.2")
    .option("kafka.ssl.enabled.protocols", "TLSv1.2")
    .option("kafka.ssl.endpoint.identification.algorithm", "HTTPS")
    .load()
    .selectExpr("CAST(value as STRING)")
    .as[String]
    .withColumn("_tmp", split(col("value"), "\\,"))
    .select(
        $"_tmp".getItem(0).as("wsid"),
        $"_tmp".getItem(1).as("year").cast("int"),
        $"_tmp".getItem(2).as("month").cast("int"),
        $"_tmp".getItem(3).as("day").cast("int"),
        $"_tmp".getItem(11).as("oneHourPrecip").cast("double")
    )
    .drop("_tmp")

Затем я выполняю групповую передачу, а затем пытаюсь записать данные этого потока в таблицу, используя JDBC.Для этого это мой код:

val query= df.writeStream
    .outputMode(OutputMode.Append())
    .foreachBatch((df: DataFrame , id: Long) => {
        println(df.count())
        df.groupBy($"wsid" , $"year" , $"month" , $"day")
            .agg(sum($"oneHourPrecip").as("precipitation"))
            .write
            .mode(SaveMode.Append)
            .jdbc(url , s"$schema.$table" , getProperties)
    })
    .trigger(Trigger.ProcessingTime(1))
    .start()

Проблема связана с пакетом.С помощью Spark Streaming мы не можем предсказать количество строк, приходящих на каждый пакет в кадре данных.Поэтому довольно часто я получаю несвязанные данные (т. Е. Для заданных общих значений (wsid,year,month,day), некоторые строки появляются в одном пакете, а некоторые другие появляются в другом пакете).

Затем, когда я группируюи попробуйте написать его, используя JDBC, вот ошибка, которую я получаю:

com.ibm.db2.jcc.am.BatchUpdateException: [jcc][t4][102][10040][4.25.13] Batch failure.  The batch was submitted, but at least one exception occurred on an individual member of the batch.
Use getNextException() to retrieve the exceptions for specific batched elements. ERRORCODE=-4229, SQLSTATE=null
    at com.ibm.db2.jcc.am.b6.a(b6.java:502)
    at com.ibm.db2.jcc.am.Agent.endBatchedReadChain(Agent.java:434)
    at com.ibm.db2.jcc.am.k4.a(k4.java:5452)
    at com.ibm.db2.jcc.am.k4.c(k4.java:5026)
    at com.ibm.db2.jcc.am.k4.executeBatch(k4.java:3058)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:672)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    Suppressed: com.ibm.db2.jcc.am.SqlIntegrityConstraintViolationException: Error for batch element #1: DB2 SQL Error: SQLCODE=-803, SQLSTATE=23505, SQLERRMC=1;SPARK.DAILY_PRECIPITATION_DATA, DRIVER=4.25.13
        at com.ibm.db2.jcc.am.b6.a(b6.java:806)
        at com.ibm.db2.jcc.am.b6.a(b6.java:66)
        at com.ibm.db2.jcc.am.b6.a(b6.java:140)
        at com.ibm.db2.jcc.t4.ab.a(ab.java:1283)
        at com.ibm.db2.jcc.t4.ab.a(ab.java:128)
        at com.ibm.db2.jcc.t4.p.a(p.java:57)
        at com.ibm.db2.jcc.t4.aw.a(aw.java:225)
        at com.ibm.db2.jcc.am.k4.a(k4.java:3605)
        at com.ibm.db2.jcc.am.k4.d(k4.java:6020)
        at com.ibm.db2.jcc.am.k4.a(k4.java:5372)
        ... 17 more

Как видно из SqlIntegrityConstraintViolationException выше, это потому, что после одного пакета записывает groupby ed значения, используя JDBC, вставкаследующий набор значений потерпит неудачу из-за первичного ключа (wsid,year,month,day).

Учитывая, что будет фиксированное количество oneHourPrecip значений (24) для данного (wsid,year,month,day) из источника, как сделатьмы гарантируем, что groupBy работает правильно для всех данных, которые передаются из источника, чтобы вставка в базу данных не была проблемой?

Ответы [ 2 ]

2 голосов
/ 21 мая 2019

SaveMode.Upsert недоступно :-) Нет ничего общего с groupBy. сгруппировать по группам значений. Нарушение целостности (com.ibm.db2.jcc.am.SqlIntegrityConstraintViolationException) вам необходимо позаботиться на уровне sql.

Вариант 1:

Вы можете вставить обновление, чтобы избежать нарушения целостности.

для этого вам нужно использовать, как показано ниже, псевдокод ...

dataframe.foreachPartition {

update TABLE_NAME set FIELD_NAME=xxxxx where MyID=XXX;

INSERT INTO TABLE_NAME values (colid,col1,col2) 
WHERE NOT EXISTS(select 1 from TABLE_NAME where colid=xxxx);
}

Вариант 2: или проверьте оператор слияния в db2

один из способов - создать пустую временную таблицу (без каких-либо ограничений), которая имеет ту же схему, и заполнить ее, и в конце вы можете выполнить скрипт, который будет сливаться с целью стол.

0 голосов
/ 21 мая 2019

Я кое-что понял, но это может иметь некоторые проблемы с производительностью. В любом случае, у меня это сработало, поэтому выкладываю ответ:

Я понял, что для сохранения groupby отредактированных данных в таблице DB2 нам придется подождать, пока мы не получим все данные из источника. Для этого я использую OutputMode.Complete().

Тогда я понял, что если я напишу это в DB2 после группировки в текущем методе, это все равно вызовет ту же ошибку. Для этого мне пришлось использовать SaveMode.Overwrite внутри foreachBatch.

Я попытался запустить мою программу с таким подходом, но он выдал эту ошибку:

org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets

Поэтому я решил сделать групповую обработку и агрегацию в течение readStream. Таким образом, мой код выглядит так:

readStream часть:

val df = spark.readStream
    .format("kafka")
    .option("subscribe", "raw_weather")
    .option("kafka.bootstrap.servers", "<host1:port1,host2:port2>...")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism" , "PLAIN")
    .option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"token\" password=\"" + "<some password>" + "\";")
    .option("kafka.ssl.protocol", "TLSv1.2")
    .option("kafka.ssl.enabled.protocols", "TLSv1.2")
    .option("kafka.ssl.endpoint.identification.algorithm", "HTTPS")
    .load()
    .selectExpr("CAST(value as STRING)")
    .as[String]
    .withColumn("_tmp", split(col("value"), "\\,"))
    .select(
        $"_tmp".getItem(0).as("wsid"),
        $"_tmp".getItem(1).as("year").cast("int"),
        $"_tmp".getItem(2).as("month").cast("int"),
        $"_tmp".getItem(3).as("day").cast("int"),
        $"_tmp".getItem(11).as("oneHourPrecip").cast("double")
    )
    .drop("_tmp")
    .groupBy($"wsid" , $"year" , $"month" , $"day")
    .agg(sum($"oneHourPrecip").as("precipitation"))

writeStream часть:

val query= df.writeStream
    .outputMode(OutputMode.Complete())
    .foreachBatch((df: DataFrame , id: Long) => {
        println(df.count())
        df.write
            .mode(SaveMode.Overwrite)
            .jdbc(url , s"$schema.$table" , getProperties)
    })
    .trigger(Trigger.ProcessingTime(1))
    .start()

query.awaitTermination()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...