Мне тоже это было нужно, но я ничего не смог найти, поэтому я добавил google-cloud-bigquery в мои зависимости, а затем:
implicit class RichDStreamMyClass(dstream: DStream[MyClass]) {
/** Writes the [[DStream]] with [[MyClass]]s to BigQuery.
* All the records are inserted at once per RDD (= per partition per window).
*/
def saveToBigQuery(tableRef: Table) =
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partition =>
val rowsToInsert = partition.map(toRowToInsert).toSeq.asJava
if (!rowsToInsert.isEmpty) {
val insertResponse = tableRef.insert(rowsToInsert)
if (insertResponse.hasErrors)
logger.error(s"${insertResponse.getInsertErrors.values()}")
}
}
}
}
/** Creates [[RowToInsert]] for BigQuery by mapping the field of a
* [[MyClass]]. */
def toRowToInsert(myClass: MyClass): RowToInsert = {
val fields = Map(
"timestamp" -> myClass.timestamp,
"name" -> myClass.name
).asJava
RowToInsert.of(s"${myClass.key}", fields)
}
Будьте внимательны, метод insert не может вставлять более 10k элементов одновременно, поэтому у меня также есть следующее:
val conf = new SparkConf()
.set("spark.streaming.kafka.maxRatePerPartition",
(10000 / config.spark.window).toString)
tableRef является экземпляром com.google.cloud.bigquery.Table .