Комментарии к моему вопросу привели меня к выводу, что проблема заключалась в потоке буфера. Промывка заняла много времени - невозможно промыть вручную.
Я закончил тем, что читал эту проблему и комментировал GitHub здесь . Вместо этого предлагается использовать задание загрузки.
После некоторых исследований я понял, что можно читать из io.Reader
, а также из справочника Google Cloud Storage, настроив загрузчик ReaderSource
.
Моя оригинальная реализация, которая использовала потоковый буфер, выглядела так:
var vss []*bigquery.ValuesSaver
// for each row:
vss = append(vss, &bigquery.ValuesSaver{
Schema: schema,
InsertID: fmt.Sprintf(index of loop),
Row: []bigquery.Value{
"data"
},
})
err := uploader.Put(ctx, vss)
if err != nil {
if pmErr, ok := err.(bigquery.PutMultiError); ok {
for _, rowInsertionError := range pmErr {
log.Println(rowInsertionError.Errors)
}
}
return fmt.Errorf("failed to insert data: %v", err)
}
Мне удалось изменить это на загрузочную работу с кодом, который выглядел следующим образом:
var lines []string
for _, v := range rows {
json, err := json.Marshal(v)
if err != nil {
return fmt.Errorf("failed generate json %v, %+v", err, v)
}
lines = append(lines, string(json))
}
dataString := strings.Join(lines, "\n")
rs := bigquery.NewReaderSource(strings.NewReader(dataString))
rs.FileConfig.SourceFormat = bigquery.JSON
rs.FileConfig.Schema = schema
loader := dataset.Table(t2Name).LoaderFrom(rs)
loader.CreateDisposition = bigquery.CreateIfNeeded
loader.WriteDisposition = bigquery.WriteTruncate
job, err := loader.Run(ctx)
if err != nil {
return fmt.Errorf("failed to start load job %v", err)
}
_, err := job.Wait(ctx)
if err != nil {
return fmt.Errorf("load job failed %v", err)
}
Теперь данные доступны в таблице «немедленно» - мне больше не нужно ждать потокового буфера.