Есть ли какие-нибудь соединители от потоковой передачи искры к большому запросу Google? - PullRequest
0 голосов
/ 26 апреля 2018

Я ищу соединители с открытым исходным кодом, которые могут передавать данные из Spark Streaming в Google Big Query, есть ли такие?

Из того, что я нашел, есть один из Spotify , но он не поддерживается активно и позволяет отправлять только записи в формате Avro.

1 Ответ

0 голосов
/ 09 июля 2018

Мне тоже это было нужно, но я ничего не смог найти, поэтому я добавил google-cloud-bigquery в мои зависимости, а затем:

implicit class RichDStreamMyClass(dstream: DStream[MyClass]) {
  /** Writes the [[DStream]] with [[MyClass]]s to BigQuery.
    * All the records are inserted at once per RDD (= per partition per window).
    */
  def saveToBigQuery(tableRef: Table) =
    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        val rowsToInsert = partition.map(toRowToInsert).toSeq.asJava
        if (!rowsToInsert.isEmpty) {
          val insertResponse = tableRef.insert(rowsToInsert)
          if (insertResponse.hasErrors) 
            logger.error(s"${insertResponse.getInsertErrors.values()}")
      }
    }
  }
}

/** Creates [[RowToInsert]] for BigQuery by mapping the field of a 
  * [[MyClass]]. */
def toRowToInsert(myClass: MyClass): RowToInsert = {
  val fields = Map(
    "timestamp" -> myClass.timestamp,
    "name" -> myClass.name
  ).asJava
  RowToInsert.of(s"${myClass.key}", fields)
}

Будьте внимательны, метод insert не может вставлять более 10k элементов одновременно, поэтому у меня также есть следующее:

val conf = new SparkConf()
  .set("spark.streaming.kafka.maxRatePerPartition",
    (10000 / config.spark.window).toString) 

tableRef является экземпляром com.google.cloud.bigquery.Table .

...