Я использую spark-sql-2.4.1 с spark-cassandra-connector-2_11.jar
Я пытаюсь выполнить join двух потоковых наборов данных (streaming Datasets), как показано ниже:
// Company info stream: derive an event-time column from the epoch-millis "eventTs" field.
// (A longer watermark for the join is declared again on companyInfoWithWatermark below.)
Dataset<Row> companyInfo_df = company_info_df
        .select("companyInfo.*")
        .withColumn("companyInfoEventTs", col("eventTs").divide(1000).cast(DataTypes.TimestampType))
        .withWatermark("companyInfoEventTs", "60 seconds");

// Company financials stream: per-company sliding-window revenue statistics.
Dataset<Row> companyFin_df = comapany_fin_df
        .select("companyFin.*")
        .withColumn("eventTimeStamp", col("eventTs").divide(1000).cast(DataTypes.TimestampType))
        .withWatermark("eventTimeStamp", "60 seconds")
        .groupBy(
                // Pass the watermarked column itself. Wrapping it in cast(...) makes it an
                // expression, so the generated "window" grouping column loses the event-time
                // watermark metadata. Spark then rejects the append-mode aggregation with
                // "streaming aggregations ... without watermark". The column is already a
                // TimestampType, so no cast is needed.
                window(col("eventTimeStamp"), "30 seconds", "20 seconds", "10 seconds"),
                col("company_id"), col("year"), col("quarter"))
        .agg(
                min("revenue").alias("min_revenue"),
                max("revenue").alias("max_revenue"),
                avg("revenue").alias("mean_revenue"),
                // NOTE(review): first() depends on row order within the group; window.start /
                // window.end may be a more stable event time for the join below - confirm intent.
                first("eventTimeStamp").alias("companyFinEventTs"))
        .select("company_id", "year", "quarter", "companyFinEventTs",
                "window.start", "window.end", "min_revenue", "max_revenue", "mean_revenue");

// The aggregate output column companyFinEventTs carries no watermark metadata of its own,
// so both join sides declare a watermark on the columns used in the time-range condition.
Dataset<Row> companyFinWithWatermark = companyFin_df.withWatermark("companyFinEventTs", "2 minutes");
Dataset<Row> companyInfoWithWatermark = companyInfo_df.withWatermark("companyInfoEventTs", "3 minutes");

// Equi-join on the company id plus an event-time range, so that join state can be bounded by
// the watermarks. This is an inner join; an outer join would also need the watermarks and the
// time-range condition.
Column joinExpr = expr(" company_id = companyid AND companyFinEventTs >= companyInfoEventTs AND companyFinEventTs <= companyInfoEventTs + interval 1 minutes ");

Dataset<Row> companyDfAfterJoin2 = companyFinWithWatermark
        .join(companyInfoWithWatermark, joinExpr)
        .withColumn("last_update_timestamp", current_timestamp())
        .withColumn("avg_revenue", col("mean_revenue"));

Dataset<Row> companyDfAfterJoin = companyDfAfterJoin2
        .select("company_id", "company_name", "year", "quarter", "avg_revenue", "last_update_timestamp", "companyFinEventTs");

System.out.println(" companyDfAfterJoin *******************************");

// Append mode: an aggregated window is emitted only after the watermark passes its end.
companyDfAfterJoin
        .writeStream()
        .format("console")
        .outputMode("append")
        .option("truncate", false)
        .trigger(Trigger.ProcessingTime("10 seconds"))
        .start();
Подскажите, пожалуйста, что здесь не так и как это исправить?
Ниже приведена ошибка:
companyDfAfterJoin *******************************
Exception in thread "main" org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;
Project [company_id#102, company_name#64, year#103, quarter#104, avg_revenue#216, last_update_timestamp#200, companyFinEventTs#137-T120000ms]
+- Project [company_id#102, company_name#64, year#103, quarter#104, avg_revenue#216, last_update_timestamp#200, companyFinEventTs#137-T120000ms]
+- Project [company_id#102, year#103, quarter#104, companyFinEventTs#137-T120000ms, start#147, end#148, min_revenue#131, max_revenue#133, mean_revenue#135, company_name#64, registeredYear#66, headQuarteredCity#67, companyid#74, companyInfoEventTs#81-T180000ms, last_update_timestamp#200, mean_revenue#135 AS avg_revenue#216]
+- Project [company_id#102, year#103, quarter#104, companyFinEventTs#137-T120000ms, start#147, end#148, min_revenue#131, max_revenue#133, mean_revenue#135, company_name#64, registeredYear#66, headQuarteredCity#67, companyid#74, companyInfoEventTs#81-T180000ms, current_timestamp() AS last_update_timestamp#200]
+- Join Inner, (((company_id#102 = companyid#74) && (companyFinEventTs#137-T120000ms >= companyInfoEventTs#81-T180000ms)) && (companyFinEventTs#137-T120000ms <= cast(companyInfoEventTs#81-T180000ms + interval 1 minutes as timestamp)))
:- EventTimeWatermark companyFinEventTs#137: timestamp, interval 2 minutes
: +- Project [company_id#102, year#103, quarter#104, companyFinEventTs#137, window#124.start AS start#147, window#124.end AS end#148, min_revenue#131, max_revenue#133, mean_revenue#135]
: +- Aggregate [window#138, company_id#102, year#103, quarter#104], [window#138 AS window#124, company_id#102, year#103, quarter#104, min(revenue#105) AS min_revenue#131, max(revenue#105) AS max_revenue#133, avg(cast(revenue#105 as bigint)) AS mean_revenue#135, first(eventTimeStamp#112-T60000ms, false) AS companyFinEventTs#137]
: +- Filter ((cast(eventTimeStamp#112-T60000ms as timestamp) >= window#138.start) && (cast(eventTimeStamp#112-T60000ms as timestamp) < window#138.end))
: +- Expand [ArrayBuffer(named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(cast(eventTimeStamp#112-T60000ms as timestamp), TimestampType, LongType) - 10000000) as double) / cast(20000000 as double))) as double) = (cast((precisetimestampconversion(cast(eventTimeStamp#112-T60000ms as timestamp), TimestampType, LongType) - 10000000) as double) / cast(20000000 as double))) THEN (CEIL((cast((precisetimestampconversion(cast(eventTimeStamp#112-T60000ms as timestamp), TimestampType, LongType) - 10000000) as double) / cast(20000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(cast(eventTimeStamp#112-T60000ms as timestamp), TimestampType, LongType) - 10000000) as double) / cast(20000000 as double))) END + cast(0 as bigint)) - cast(2 as bigint)) * 20000000) + 10000000), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(cast(eventTimeStamp#112-T60000ms as timestamp), TimestampType, LongType) - 10000000) as double) / cast(20000000 as double))) as double) = (cast((precisetimestampconversion(cast(eventTimeStamp#112-T60000ms as timestamp), TimestampType, LongType) - 10000000) as double) / cast(20000000 as double))) THEN (CEIL((cast((precisetimestampconversion(cast(eventTimeStamp#112-T60000ms as timestamp), TimestampType, LongType) - 10000000) as double) / cast(20000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(cast(eventTimeStamp#112-T60000ms as timestamp), TimestampType, LongType) - 10000000) as double) / cast(20000000 as double))) END + cast(0 as bigint)) - cast(2 as bigint)) * 20000000) + 10000000) + 30000000), LongType, TimestampType)), company_id#102, year#103, quarter#104, revenue#105, eventTimeStamp#112-T60000ms), ArrayBuffer(named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(cast(eventTimeStamp#112-T60000ms as timestamp), 
TimestampType, LongType) - 10000000) as double) / cast(20000000 as double))) as double) = (cast((precisetimestampconversion(cast(eventTimeStamp#112-T60000ms as timestamp), TimestampType, LongType) - 10000000) as double) / cast(20000000 as double))) THEN (CEIL((cast((precisetimestampconversion(cast(eventTimeStamp#112-T60000ms as timestamp), TimestampType, LongType) - 10000000) as double) / cast(20000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(cast(eventTimeStamp#112-T60000ms as timestamp), TimestampType, LongType) - 10000000) as double) / cast(20000000 as double))) END + cast(1 as bigint)) - cast(2 as bigint)) * 20000000) + 10000000), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(cast(eventTimeStamp#112-T60000ms as timestamp), TimestampType, LongType) - 10000000) as double) / cast(20000000 as double))) as double) = (cast((precisetimestampconversion(cast(eventTimeStamp#112-T60000ms as timestamp), TimestampType, LongType) - 10000000) as double) / cast(20000000 as double))) THEN (CEIL((cast((precisetimestampconversion(cast(eventTimeStamp#112-T60000ms as timestamp), TimestampType, LongType) - 10000000) as double) / cast(20000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(cast(eventTimeStamp#112-T60000ms as timestamp), TimestampType, LongType) - 10000000) as double) / cast(20000000 as double))) END + cast(1 as bigint)) - cast(2 as bigint)) * 20000000) + 10000000) + 30000000), LongType, TimestampType)), company_id#102, year#103, quarter#104, revenue#105, eventTimeStamp#112-T60000ms)], [window#138, company_id#102, year#103, quarter#104, revenue#105, eventTimeStamp#112-T60000ms]
: +- EventTimeWatermark eventTimeStamp#112: timestamp, interval 1 minutes
: +- Project [company_id#102, year#103, quarter#104, revenue#105, eventTimeStamp#112]
: +- Project [company_id#102, year#103, quarter#104, revenue#105, eventTs#106L, cast((cast(eventTs#106L as double) / cast(1000 as double)) as timestamp) AS eventTimeStamp#112]
: +- Project [companyFin#100.company_id AS company_id#102, companyFin#100.year AS year#103, companyFin#100.quarter AS quarter#104, companyFin#100.revenue AS revenue#105, companyFin#100.eventTs AS eventTs#106L]
: +- Project [jsontostructs(StructField(company_id,IntegerType,true), StructField(year,IntegerType,true), StructField(quarter,StringType,true), StructField(revenue,IntegerType,true), StructField(eventTs,LongType,true), cast(value#29 as string), Some(America/New_York)) AS companyFin#100]
: +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@5f935d49, kafka, Map(includeTimestamp -> true, key.deserializer -> org.apache.kafka.common.serialization.StringDeserializer, retries -> 1, subscribe -> inbound_company_financials, group.id -> ProcessingGroup, enable.auto.commit -> false, failOnDataLoss -> false, value.deserializer -> com.spg.ca.prescore.serde.KafkaJsonCompanyFinRecordSerDe, kafka.bootstrap.servers -> localhost:9092, startingOffsets -> latest, linger.ms -> 5), [key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@76af34b5,kafka,List(),None,List(),None,Map(includeTimestamp -> true, key.deserializer -> org.apache.kafka.common.serialization.StringDeserializer, retries -> 1, subscribe -> inbound_company_financials, group.id -> ProcessingGroup, enable.auto.commit -> false, failOnDataLoss -> false, value.deserializer -> com.spg.ca.prescore.serde.KafkaJsonCompanyFinRecordSerDe, kafka.bootstrap.servers -> localhost:9092, startingOffsets -> latest, linger.ms -> 5),None), kafka, [key#21, value#22, topic#23, partition#24, offset#25L, timestamp#26, timestampType#27]
+- EventTimeWatermark companyInfoEventTs#81: timestamp, interval 3 minutes
+- Project [company_name#64, registeredYear#66, headQuarteredCity#67, companyid#74, companyInfoEventTs#81-T60000ms]
+- Project [company_name#64, company_id#65, registeredYear#66, headQuarteredCity#67, companyid#74, companyInfoEventTs#81-T60000ms]
+- EventTimeWatermark companyInfoEventTs#81: timestamp, interval 1 minutes
+- Project [company_name#64, company_id#65, registeredYear#66, headQuarteredCity#67, eventTs#68L, companyid#74, cast((cast(eventTs#68L as double) / cast(1000 as double)) as timestamp) AS companyInfoEventTs#81]
+- Project [company_name#64, company_id#65, registeredYear#66, headQuarteredCity#67, eventTs#68L, cast(company_id#65 as int) AS companyid#74]
+- Project [companyInfo#62.company_name AS company_name#64, companyInfo#62.company_id AS company_id#65, companyInfo#62.registeredYear AS registeredYear#66, companyInfo#62.headQuarteredCity AS headQuarteredCity#67, companyInfo#62.eventTs AS eventTs#68L]
+- Project [jsontostructs(StructField(company_name,StringType,true), StructField(company_id,IntegerType,true), StructField(registeredYear,IntegerType,true), StructField(headQuarteredCity,StringType,true), StructField(eventTs,LongType,true), cast(value#42 as string), Some(America/New_York)) AS companyInfo#62]
+- Project [cast(value#8 as string) AS value#42, cast(topic#9 as string) AS topic#43, cast(partition#10 as int) AS partition#44, cast(offset#11L as bigint) AS offset#45L, cast(timestamp#12 as timestamp) AS timestamp#46]
+- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@3313463c, kafka, Map(includeTimestamp -> true, key.deserializer -> org.apache.kafka.common.serialization.StringDeserializer, retries -> 1, subscribe -> inbound_company_info, group.id -> ProcessingGroup, enable.auto.commit -> false, failOnDataLoss -> false, value.deserializer -> com.spg.ca.prescore.serde.KafkaJsonCompanyInfoRecordSerDe, kafka.bootstrap.servers -> localhost:9092, startingOffsets -> latest, linger.ms -> 5), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@76af34b5,kafka,List(),None,List(),None,Map(includeTimestamp -> true, key.deserializer -> org.apache.kafka.common.serialization.StringDeserializer, retries -> 1, subscribe -> inbound_company_info, group.id -> ProcessingGroup, enable.auto.commit -> false, failOnDataLoss -> false, value.deserializer -> com.spg.ca.prescore.serde.KafkaJsonCompanyInfoRecordSerDe, kafka.bootstrap.servers -> localhost:9092, startingOffsets -> latest, linger.ms -> 5),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:389)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:111)
at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:250)
at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:316)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:325)
at com.spgmi.ca.prescore.utils.ConfigUtils.displayOnConsole(ConfigUtils.java:84)
Как правильно выполнять join потоков и как правильно использовать watermark?
Большая часть документации не очень полезна: в ней рассматриваются простые и лёгкие сценарии, которые не имеют особой ценности для реальных задач.