Я новичок в мире BigQuery !!!
У меня есть задача запросить данные BigQuery (оператор SQL) (и открыть другие методы) и загрузить их в корзину S3.Найдите прилагаемый код для запроса данных и загрузки в корзину S3 (работает не так, как ожидалось).Мы стараемся избегать хранения данных в GCS и извлекать их оттуда или использовать GSUTIL.
Любые указатели на примеры или способы сделать это приветствуются.
Заранее спасибо
Извлечение данных из BigQuery:
import java.util.UUID
import com.google.cloud.bigquery.{Dataset, JobId, JobInfo,
QueryJobConfiguration}
import com.gwf.Logging
class BQQuery extends Logging{
def extractBQData: String = {
val bqClient = (new BigQueryClient).createBQClient
logger.info("projectId: " + bqClient.getOptions.getProjectId)
val START_DATE = "20180514"
val END_DATE = "20180514"
val query = "SELECT " + " FORMAT_TIMESTAMP('%Y-%m-%d %H:%M:%S',
TIMESTAMP_SECONDS(SAFE_CAST(visitStartTime+hits.time/1000 AS INT64)),
'America/Denver') AS hit_timestamp, " + " FORMAT_TIMESTAMP('%Y-%m-%d
%H:%M:%S', TIMESTAMP_SECONDS(SAFE_CAST(visitStartTime AS INT64)),
'America/Denver') AS hit_visitStartTime, " + " CONCAT(SUBSTR(date,0,4),'-
',SUBSTR(date,5,2),'-',SUBSTR(date,7,2)) AS date, " + " visitNumber, " + "
visitId, " + " fullVisitorId, " + " totals.hits, " + " totals.pageviews, " +
" totals.timeOnSite, " + " totals.bounces, " + " totals.newVisits, " + "
trafficSource.referralPath, " + " trafficSource.campaign, " + "
trafficSource.source, " + " trafficSource.medium, " + "
trafficSource.keyword, " + " trafficSource.adContent, " + " device.browser,
" + " device.browserVersion, " + " device.browserSize, " + "
device.operatingSystem, " + " device.operatingSystemVersion, " + "
device.isMobile, " + " device.mobileDeviceBranding, " + "
device.flashVersion, " + " device.javaEnabled, " + " device.language, " + "
device.screenColors, " + " device.screenResolution, " + "
device.deviceCategory, " + " geoNetwork.continent, " + "
geoNetwork.subContinent, " + " geoNetwork.country, " + " geoNetwork.region,
" + " geoNetwork.metro, " + " hits.type, " + " hits.hitNumber, " + "
hits.social.socialInteractionNetwork, " + "
hits.social.socialInteractionAction, " + " hits.time/1000 AS hits_time, " +
" hits.hour, " + " hits.minute, " + " hits.isSecure, " + "
hits.isInteraction, " + " hits.isEntrance, " + " hits.isExit, " + "
hits.referer, " + " hits.page.pagePath, " + " hits.page.hostname, " + "
hits.page.pageTitle, " + " hits.page.searchKeyword, " + "
hits.page.searchCategory, " + " hits.eventInfo.eventCategory, " + "
hits.eventInfo.eventAction, " + " hits.eventInfo.eventLabel, " + "
hits.eventInfo.eventValue, " + " (SELECT MAX(IF(index=1, value, NULL)) FROM
UNNEST(hits.customDimensions)) AS LIAT_event_source_cd1, " + " (SELECT
MAX(IF(index=2, value, NULL)) FROM UNNEST(hits.customDimensions)) AS
LIAT_event_id_cd2, " + " (SELECT MAX(IF(index=3, value, NULL)) FROM
UNNEST(hits.customDimensions)) AS Slider_id_cd3, " + " (SELECT
MAX(IF(index=4, value, NULL)) FROM UNNEST(hits.customDimensions)) AS
Individual_id_cd4, " + " (SELECT MAX(IF(index=5, value, NULL)) FROM
UNNEST(hits.customDimensions)) AS Plan_id_cd5, " + " (SELECT MAX(IF(index=6,
value, NULL)) FROM UNNEST(hits.customDimensions)) AS Terminated_status_cd6,
" + " (SELECT MAX(IF(index=7, value, NULL)) FROM
UNNEST(hits.customDimensions)) AS LIAT_Slider_Changed_Element_cd7, " + "
(SELECT MAX(IF(index=8, value, NULL)) FROM UNNEST(hits.customDimensions)) AS
LIAT_Page_Name_cd8, " + " (SELECT MAX(IF(index=9, value, NULL)) FROM
UNNEST(hits.customDimensions)) AS Login_Status_cd9, " + " (SELECT
MAX(IF(index=10, value, NULL)) FROM UNNEST(hits.customDimensions)) AS
Debug_cd10, " + " (SELECT MAX(IF(index=11, value, NULL)) FROM
UNNEST(hits.customDimensions)) AS Age_cd11, " + " (SELECT MAX(IF(index=12,
value, NULL)) FROM UNNEST(hits.customDimensions)) AS Salary_Group_cd12, " +
" (SELECT MAX(IF(index=13, value, NULL)) FROM UNNEST(hits.customDimensions))
AS Gender_cd13, " + " (SELECT MAX(IF(index=14, value, NULL)) FROM
UNNEST(hits.customDimensions)) AS TransactionId_cd14, " + " (SELECT
MAX(IF(index=15, value, NULL)) FROM UNNEST(hits.customDimensions)) AS
Transaction_User_Id_cd15, " + " (SELECT MAX(IF(index=16, value, NULL)) FROM
UNNEST(hits.customDimensions)) AS Unique_Event_Id_cd16, " + " (SELECT
MAX(IF(index=17, value, NULL)) FROM UNNEST(hits.customDimensions)) AS
not_set_dimension_cd17, " + " (SELECT MAX(IF(index=18, value, NULL)) FROM
UNNEST(hits.customDimensions)) AS ValueUnits_cd18, " + " (SELECT
MAX(IF(index=19, value, NULL)) FROM UNNEST(hits.customDimensions)) AS
Deferral_Type_Code_cd19, " + " (SELECT MAX(IF(index=20, value, NULL)) FROM
UNNEST(hits.customDimensions)) AS Income_Term_cd20, " + " (SELECT
MAX(IF(index=21, value, NULL)) FROM UNNEST(hits.customDimensions)) AS
Plan_Name_cd21, " + " (SELECT MAX(IF(index=22, value, NULL)) FROM
UNNEST(hits.customDimensions)) AS Company_Match_Rule_cd22, " + " (SELECT
MAX(IF(index=23, value, NULL)) FROM UNNEST(hits.customDimensions)) AS
Company_Match_Description_cd23, " + " (SELECT MAX(IF(index=24, value, NULL))
FROM UNNEST(hits.customDimensions)) AS Has_Company_Match_cd24, " + " (SELECT
MAX(IF(index=25, value, NULL)) FROM UNNEST(hits.customDimensions)) AS
TimeStamp_cd25, " + " (SELECT MAX(IF(index=26, value, NULL)) FROM
UNNEST(hits.customDimensions)) AS Enrollment_Type_cd26, " + " (SELECT
MAX(IF(index=27, value, NULL)) FROM UNNEST(hits.customDimensions)) AS
test_Liat_Page_cd27, " + " (SELECT MAX(IF(index=28, value, NULL)) FROM
UNNEST(hits.customDimensions)) AS hdic_ageselection_cd28, " + " (SELECT
MAX(IF(index=29, value, NULL)) FROM UNNEST(hits.customDimensions)) AS
hdic_salaryselection_cd29, " + " (SELECT MAX(IF(index=30, value, NULL)) FROM
UNNEST(hits.customDimensions)) AS hdic_genderselection_cd30, " + " (SELECT
MAX(IF(index=31, value, NULL)) FROM UNNEST(hits.customDimensions)) AS
hdic_my_pct_goal_cd31, " + " (SELECT MAX(IF(index=32, value, NULL)) FROM
UNNEST(hits.customDimensions)) AS hdic_mycontribution_cd32, " + " (SELECT
MAX(IF(index=33, value, NULL)) FROM UNNEST(hits.customDimensions)) AS
hdic_mybalance_cd33, " + " (SELECT MAX(IF(index=34, value, NULL)) FROM
UNNEST(hits.customDimensions)) AS hdic_peer_pctgoal_cd34, " + " (SELECT
MAX(IF(index=35, value, NULL)) FROM UNNEST(hits.customDimensions)) AS
hdic_peer_contributionrate_cd35, " + " (SELECT MAX(IF(index=36, value,
NULL)) FROM UNNEST(hits.customDimensions)) AS hdic_peer_balance_cd36, " + "
(SELECT MAX(IF(index=37, value, NULL)) FROM UNNEST(hits.customDimensions))
AS hdic_top_pct_goal_cd37, " + " (SELECT MAX(IF(index=38, value, NULL)) FROM
UNNEST(hits.customDimensions)) AS hdic_top_contributionrate_cd38, " + "
(SELECT MAX(IF(index=39, value, NULL)) FROM UNNEST(hits.customDimensions))
AS hdic_top_balance_cd39, " + " (SELECT MAX(IF(index=40, value, NULL)) FROM
UNNEST(hits.customDimensions)) AS AgeRange_cd40, " + " (SELECT
MAX(IF(index=1, value, NULL)) FROM UNNEST(hits.customMetrics)) AS
Slider_value_cm1, " + " (SELECT MAX(IF(index=2, value, NULL)) FROM
UNNEST(hits.customMetrics)) AS Slider_previous_value_cm2, " + " (SELECT
MAX(IF(index=3, value, NULL)) FROM UNNEST(hits.customMetrics)) AS
Slider_Users_cm3, " + " (SELECT MAX(IF(index=4, value, NULL)) FROM
UNNEST(hits.customMetrics)) AS Slider_End_Value_cm4, " + " (SELECT
MAX(IF(index=5, value, NULL)) FROM UNNEST(hits.customMetrics)) AS
Slider_Start_Value_cm5, " + " (SELECT MAX(IF(index=6, value, NULL)) FROM
UNNEST(hits.customMetrics)) AS Saved_Retirement_Age_Start_Value_cm6, " + "
(SELECT MAX(IF(index=7, value, NULL)) FROM UNNEST(hits.customMetrics)) AS
Saved_Retirement_Age_End_Value_cm7, " + " (SELECT MAX(IF(index=8, value,
NULL)) FROM UNNEST(hits.customMetrics)) AS
Saved_Deferral_Rate_Start_Value_cm8, " + " (SELECT MAX(IF(index=9, value,
NULL)) FROM UNNEST(hits.customMetrics)) AS
Saved_Deferral_Rate_End_Value_cm9, " + " (SELECT MAX(IF(index=10, value,
NULL)) FROM UNNEST(hits.customMetrics)) AS Equity_Start_Value_cm10, " + "
(SELECT MAX(IF(index=11, value, NULL)) FROM UNNEST(hits.customMetrics)) AS
Equity_End_Value_cm11, " + " (SELECT MAX(IF(index=12, value, NULL)) FROM
UNNEST(hits.customMetrics)) AS Bond_Start_Value_cm12, " + " (SELECT
MAX(IF(index=13, value, NULL)) FROM UNNEST(hits.customMetrics)) AS
Bond_End_Value_cm13, " + " (SELECT MAX(IF(index=14, value, NULL)) FROM
UNNEST(hits.customMetrics)) AS Salary_cm14, " + " (SELECT MAX(IF(index=15,
value, NULL)) FROM UNNEST(hits.customMetrics)) AS
Deferral_Rate_Change_Pct_cm15, " + " (SELECT MAX(IF(index=16, value, NULL))
FROM UNNEST(hits.customMetrics)) AS BNS_Recommended_Dollar_cm16, " + "
(SELECT MAX(IF(index=17, value, NULL)) FROM UNNEST(hits.customMetrics)) AS
BNS_Recommended_Pct_cm17, " + " (SELECT MAX(IF(index=18, value, NULL)) FROM
UNNEST(hits.customMetrics)) AS Participant_Age_cm18, " + " (SELECT
MAX(IF(index=19, value, NULL)) FROM UNNEST(hits.customMetrics)) AS
Estimated_Monthly_Income_cm19, " + " (SELECT MAX(IF(index=20, value, NULL))
FROM UNNEST(hits.customMetrics)) AS Percent_of_Goal_cm20, " + " (SELECT
MAX(IF(index=21, value, NULL)) FROM UNNEST(hits.customMetrics)) AS
Income_Goal_cm21, " + " (SELECT MAX(IF(index=22, value, NULL)) FROM
UNNEST(hits.customMetrics)) AS Income_Gap_cm22, " + " (SELECT
MAX(IF(index=23, value, NULL)) FROM UNNEST(hits.customMetrics)) AS
test_Avg_Deferral_Start_cm23, " + " (SELECT MAX(IF(index=24, value, NULL))
FROM UNNEST(hits.customMetrics)) AS test_Avg_Deferral_End_cm24, " + "
(SELECT MAX(IF(index=25, value, NULL)) FROM UNNEST(hits.customMetrics)) AS
Goal_Base_Salary_New_cm25, " + " (SELECT MAX(IF(index=26, value, NULL)) FROM
UNNEST(hits.customMetrics)) AS Goal_Percent_Of_Salary_New_cm26, " + "
(SELECT MAX(IF(index=27, value, NULL)) FROM UNNEST(hits.customMetrics)) AS
Goal_Base_Salary_Previous_cm27, " + " (SELECT MAX(IF(index=28, value, NULL))
FROM UNNEST(hits.customMetrics)) AS Goal_Percent_Of_Salary_Previous_cm28, "
+ " hits.sourcePropertyInfo.sourcePropertyDisplayName, " + "
hits.sourcePropertyInfo.sourcePropertyTrackingId " + " FROM `<ProjectID>.
<DataSetId>.ga_sessions_*` , UNNEST(hits) as hits " + " WHERE _TABLE_SUFFIX
BETWEEN '" + START_DATE + "' AND '" + END_DATE + "' LIMIT 5"
val queryConfig =
QueryJobConfiguration.newBuilder(query).setUseLegacySql(false).build()
logger.info("query:" + query)
val jobId = JobId.of(UUID.randomUUID().toString)
logger.info("Jobid:" + jobId)
val bQry =
bqClient.create(JobInfo.newBuilder(queryConfig)
.setJobId(jobId).build()).getBigQuery
/*var queryJob =
bqClient.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build())
queryJob = queryJob.waitFor()
if (queryJob.getStatus.getError != null) {
//TODO:
} else if (queryJob == null) {
//TODO:
}
*/
bQry.getDataset(queryConfig.getDefaultDataset).toString
}
Чтение данных JSON в DataFrame:
<code>
def loadDataFromGoogleAnalytics : DataFrame = {
env.sparkSession
.read
.json((new BQQuery).extractBQData)
.toDF()
}
</code>