Я обнаружил, что включение полных клиентских JAR-файлов в задание DataProc, по-видимому, не работает (поэтому они и создали отдельные расширения соединителей для BQ и других служб), поэтому вместо этого я в итоге заставил свое задание Dataproc отправить сообщение Pub / Sub queue указывает, какой ежемесячный файл паркета обновлялся.Затем я создал Облачная функция для мониторинга очереди pub / sub и порождал BigQuery job для загрузки только измененных ежемесячных файлов.
Мне удалось просто удалить ежемесячные записи из таблицы BQ, используя раздел таблицы (например, MyTable $ 20180101 ) и сгруппировав все ежемесячные записи в один и тот же день.(в настоящее время BQ поддерживает разбиение таблицы только по дням, а не по месяцам, поэтому мне пришлось создать отдельное поле для каждой записи, для которой было установлено значение 2018-01-01, например, для всех записей в 2018-01-xx).
Пример кода Scala в Dataproc для записи в очередь Pub / Sub:
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone, UUID}
import ca.my.company.config.ConfigOptions
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport
import com.google.api.client.json.jackson2.JacksonFactory
import com.google.api.services.pubsub.Pubsub
import com.google.api.services.pubsub.model.{PublishRequest, PubsubMessage}
import com.google.cloud.hadoop.util.RetryHttpInitializer
import org.apache.spark.streaming.pubsub.SparkGCPCredentials
import scala.collection.mutable
case class MyPubSubMessage (jobId: UUID, processedDate: Date, fileDate: Date, updatedFilePath: String)
object PubSubWriter {
private val PUBSUB_APP_NAME: String = "MyPubSubWriter"
private val messages: mutable.ListBuffer[PubsubMessage] = mutable.ListBuffer()
private val publishRequest = new PublishRequest()
private lazy val projectId: String = ConfigOptions().pubsubConfig.projectId
private lazy val topicId: String = ConfigOptions().pubsubConfig.topicId
private lazy val client = new Pubsub.Builder(
GoogleNetHttpTransport.newTrustedTransport(),
JacksonFactory.getDefaultInstance(),
new RetryHttpInitializer(
SparkGCPCredentials.builder.build().provider,
PUBSUB_APP_NAME
))
.setApplicationName(PUBSUB_APP_NAME)
.build()
def queueMessage(message: TlogPubSubMessage) {
if (message == null) return
val targetFileDateFormat = new SimpleDateFormat("yyyyMMdd")
val isoDateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'")
isoDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"))
import scala.collection.JavaConversions._
val pubSubMessage = new PubsubMessage()
.setAttributes(Map("msgType" -> "t-log-notification", "jobId" -> message.jobId.toString, "processedDate" -> isoDateFormat.format(message.processedDate), "fileDate" -> targetFileDateFormat.format(message.fileDate)))
messages.synchronized {
messages.append(pubSubMessage.encodeData(message.updatedFilePath.getBytes))
}
}
def publishMessages(): Unit = {
import scala.collection.JavaConversions._
publishRequest.setMessages(messages)
client.projects().topics()
.publish(s"projects/$projectId/topics/$topicId", publishRequest)
.execute()
println(s"Update notifications: successfully sent ${messages.length} message(s) for topic '${topicId}' to Pub/Sub")
}
}
Пример моей облачной функции Python для использования из очереди и запуска задания загрузки BQ:
def update_bigquery(data, context):
import base64
from datetime import datetime
from dateutil import parser
from google.cloud import bigquery
from google.cloud.bigquery.table import TimePartitioning
from google.api_core.exceptions import GoogleAPICallError
dataset_id = 'mydatasetname'
table_id_base = 'mytablename'
# The data field looks like this:
# {'@type': 'type.googleapis.com/google.pubsub.v1.PubsubMessage', 'attributes': {'fileDate': '20171201',
# 'jobId': '69f6307e-28a1-40fc-bb6d-572c0bea9346', 'msgType': 't-log-notification',
# 'processedDate': '2018-09-08T02:51:54Z'}, 'data': 'Z3M6Ly9nY3MtbGRsLWRzLWRhdGE...=='}
# Retrieve file path (filter out SUCCESS file in the folder path) and build the partition name
attributes = data['attributes']
file_path = base64.b64decode(data['data']).decode('utf-8') + "/part*"
partition_name = attributes['fileDate']
table_partition = table_id_base + "$" + partition_name
# Instantiate BQ client
client = bigquery.Client()
# Get reference to dataset and table
dataset_ref = client.dataset(dataset_id)
table_ref = dataset_ref.table(table_partition)
try:
# This only deletes the table partition and not the entire table
client.delete_table(table_ref) # API request
print('Table {}:{} deleted.'.format(dataset_id, table_partition))
except GoogleAPICallError as e:
print('Error deleting table ' + table_partition + ": " + str(e))
# Create BigQuery loading job
job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.PARQUET
job_config.time_partitioning = TimePartitioning(field='bigQueryPartition')
try :
load_job = client.load_table_from_uri(
file_path,
dataset_ref.table(table_partition),
job_config=job_config) # API request
print('Starting job {}'.format(load_job.job_id))
# This can be commented-out to allow the job to run purely asynchronously
# though if it fails, I'm not sure how I could be notified
# For now, I will set this function to the max timeout (9 mins) and see if the BQ load job can consistently complete in time
load_job.result() # Waits for table load to complete.
print('Job finished.')
except GoogleAPICallError as e:
print("Error running BQ load job: " + str(e))
raise e
return 'Success'