Соединитель BigQuery для Apache Spark - обновить секционированную таблицу - PullRequest
0 голосов
/ 27 августа 2018

Я пишу задание Spark в Scala в Google DataProc, которое выполняется ежедневно и обрабатывает записи, каждая из которых отмечена временем транзакции.Записи сгруппированы по годам и месяцам, и каждая группа записывается в отдельный ежемесячный файл паркета в GCS (например, 2018-07-file.parquet, 2018-08-file.parquet и т. Д.).Обратите внимание, что эти файлы возвращаются примерно на 5 лет и образуют очень большой набор данных (~ 1 ТБ).

Я хочу записать эти файлы в BigQuery, и задание обновляет только ежемесячные записи, которые изменились втекущий пробег .Для простоты я хотел бы удалить существующие записи за любой месяц с обновленными, а затем просто загрузить данные из ежемесячного файла паркета.

Я пытаюсь использовать BigQuery Connector для DataProc , но поддерживает только обновление всей таблицы , а не пакет записей, отфильтрованных по полю датыНапример,

Каков наилучший способ сделать это? Я попытался включить в свой проект полную библиотеку JAR библиотеки BigQuery и использовать запрос на манипулирование данными для удаления существующих ежемесячных записей, как показано ниже.:

def writeDataset(sparkContext: SparkContext, monthYear: String, ds: Dataset[TargetOrder]) = {
    val dtMonthYear = FeedWriter.parquetDateFormat.parse(monthYear)
    val bigquery: BigQuery = BigQueryOptions.getDefaultInstance.getService
    val queryConfig: QueryJobConfiguration =
      QueryJobConfiguration.newBuilder("DELETE FROM `" + getBQTableName(monthYear) + "` " +
        "WHERE header.trans_time BETWEEN PARSE_DATETIME('" + FeedWriter.parquetDateFormat.toPattern + "', '" + monthYear + "') " +
        "AND PARSE_DATETIME('" + FeedWriter.parquetDateFormat.toPattern + "', '" + DateUtils.addMonths(dtMonthYear, 1) + "') ")
    .setUseLegacySql(false)
    .build();

    val jobId: JobId = JobId.of(UUID.randomUUID().toString());
    val queryJob: Job = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build()).waitFor()
}

, но я получаю следующую ошибку (я предполагаю, что включение JAR-файла полного BQ-клиента в задание DataProc недопустимо или, возможно, оно просто не очень хорошо работает с соединителем BQ):

java.lang.NoSuchMethodError: com.google.api.services.bigquery.model.JobReference.setLocation(Ljava/lang/String;)Lcom/google/api/services/bigquery/model/JobReference;
  at com.google.cloud.bigquery.JobId.toPb(JobId.java:114)
  at com.google.cloud.bigquery.JobInfo.toPb(JobInfo.java:370)
  at com.google.cloud.bigquery.BigQueryImpl.create(BigQueryImpl.java:198)
  at com.google.cloud.bigquery.BigQueryImpl.create(BigQueryImpl.java:187)
  at ca.mycompany.myproject.output.BigQueryWriter$.writeDataset(BigQueryWriter.scala:39)

Ответы [ 2 ]

0 голосов
/ 10 сентября 2018

Я обнаружил, что включение полных клиентских JAR-файлов в задание DataProc, по-видимому, не работает (поэтому они и создали отдельные расширения соединителей для BQ и других служб), поэтому вместо этого я в итоге заставил свое задание Dataproc отправить сообщение Pub / Sub queue указывает, какой ежемесячный файл паркета обновлялся.Затем я создал Облачная функция для мониторинга очереди pub / sub и порождал BigQuery job для загрузки только измененных ежемесячных файлов.

Мне удалось просто удалить ежемесячные записи из таблицы BQ, используя раздел таблицы (например, MyTable $ 20180101 ) и сгруппировав все ежемесячные записи в один и тот же день.(в настоящее время BQ поддерживает разбиение таблицы только по дням, а не по месяцам, поэтому мне пришлось создать отдельное поле для каждой записи, для которой было установлено значение 2018-01-01, например, для всех записей в 2018-01-xx).

Пример кода Scala в Dataproc для записи в очередь Pub / Sub:

import java.text.SimpleDateFormat
import java.util.{Date, TimeZone, UUID}

import ca.my.company.config.ConfigOptions
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport
import com.google.api.client.json.jackson2.JacksonFactory
import com.google.api.services.pubsub.Pubsub
import com.google.api.services.pubsub.model.{PublishRequest, PubsubMessage}
import com.google.cloud.hadoop.util.RetryHttpInitializer
import org.apache.spark.streaming.pubsub.SparkGCPCredentials

import scala.collection.mutable

case class MyPubSubMessage (jobId: UUID, processedDate: Date, fileDate: Date,  updatedFilePath: String)

object PubSubWriter {
  private val PUBSUB_APP_NAME: String = "MyPubSubWriter"
  private val messages: mutable.ListBuffer[PubsubMessage] = mutable.ListBuffer()
  private val publishRequest = new PublishRequest()
  private lazy val projectId: String = ConfigOptions().pubsubConfig.projectId
  private lazy val topicId: String = ConfigOptions().pubsubConfig.topicId

  private lazy val client = new Pubsub.Builder(
    GoogleNetHttpTransport.newTrustedTransport(),
    JacksonFactory.getDefaultInstance(),
    new RetryHttpInitializer(
      SparkGCPCredentials.builder.build().provider,
      PUBSUB_APP_NAME
    ))
    .setApplicationName(PUBSUB_APP_NAME)
    .build()

  def queueMessage(message: TlogPubSubMessage) {
    if (message == null) return
    val targetFileDateFormat = new SimpleDateFormat("yyyyMMdd")
    val isoDateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'")
    isoDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"))

    import scala.collection.JavaConversions._
    val pubSubMessage = new PubsubMessage()
      .setAttributes(Map("msgType" -> "t-log-notification", "jobId" -> message.jobId.toString, "processedDate" -> isoDateFormat.format(message.processedDate), "fileDate" -> targetFileDateFormat.format(message.fileDate)))

    messages.synchronized {
      messages.append(pubSubMessage.encodeData(message.updatedFilePath.getBytes))
    }
  }

  def publishMessages(): Unit = {
    import scala.collection.JavaConversions._
    publishRequest.setMessages(messages)
    client.projects().topics()
      .publish(s"projects/$projectId/topics/$topicId", publishRequest)
      .execute()

    println(s"Update notifications: successfully sent ${messages.length} message(s) for topic '${topicId}' to Pub/Sub")
  }
}

Пример моей облачной функции Python для использования из очереди и запуска задания загрузки BQ:

def update_bigquery(data, context):
    import base64
    from datetime import datetime
    from dateutil import parser
    from google.cloud import bigquery
    from google.cloud.bigquery.table import TimePartitioning
    from google.api_core.exceptions import GoogleAPICallError

    dataset_id = 'mydatasetname'
    table_id_base = 'mytablename'

    # The data field looks like this:
    # {'@type': 'type.googleapis.com/google.pubsub.v1.PubsubMessage', 'attributes': {'fileDate': '20171201',
    # 'jobId': '69f6307e-28a1-40fc-bb6d-572c0bea9346', 'msgType': 't-log-notification',
    # 'processedDate': '2018-09-08T02:51:54Z'}, 'data': 'Z3M6Ly9nY3MtbGRsLWRzLWRhdGE...=='}

    # Retrieve file path (filter out SUCCESS file in the folder path) and build the partition name
    attributes = data['attributes']
    file_path = base64.b64decode(data['data']).decode('utf-8') + "/part*"
    partition_name = attributes['fileDate']
    table_partition = table_id_base + "$" + partition_name

    # Instantiate BQ client
    client = bigquery.Client()

    # Get reference to dataset and table
    dataset_ref = client.dataset(dataset_id)
    table_ref = dataset_ref.table(table_partition)

    try:
        # This only deletes the table partition and not the entire table
        client.delete_table(table_ref)  # API request
        print('Table {}:{} deleted.'.format(dataset_id, table_partition))

    except GoogleAPICallError as e:
        print('Error deleting table ' + table_partition + ": " + str(e))

    # Create BigQuery loading job
    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.PARQUET
    job_config.time_partitioning = TimePartitioning(field='bigQueryPartition')

    try :
        load_job = client.load_table_from_uri(
            file_path,
            dataset_ref.table(table_partition),
            job_config=job_config)  # API request

        print('Starting job {}'.format(load_job.job_id))

        # This can be commented-out to allow the job to run purely asynchronously
        # though if it fails, I'm not sure how I could be notified
        # For now, I will set this function to the max timeout (9 mins) and see if the BQ load job can consistently complete in time
        load_job.result()  # Waits for table load to complete.
        print('Job finished.')

    except GoogleAPICallError as e:
        print("Error running BQ load job: " + str(e))
        raise e

    return 'Success'
0 голосов
/ 28 августа 2018

как насчет bigquery4s ?

Это оболочка Scala для Java-клиента BQ. У меня была та же проблема, и она работала для меня.

...