Загрузить данные в таблицу, не ожидая очистки потокового буфера - PullRequest
0 голосов
/ 13 января 2019

У меня есть программа Go, которая загружает данные из таблицы (T1), форматирует их и загружает в новую временную таблицу (T2). После загрузки данных (30 с или около того) их следует скопировать в третью таблицу (T3).

После загрузки отформатированных данных в T2 запрос к таблице возвращает результаты в порядке. Однако при копировании таблицы - задание завершается практически мгновенно, а таблица назначения (T3) пуста.

Я копирую таблицу как предложено здесь - но результат при выполнении действия в пользовательском интерфейсе тот же.

В разделе метаданных таблицы он отображается как 0B, 0 строк, но там около 100k строк и 18 МБ данных - или, по крайней мере, это то, что возвращается из запроса.

Редактировать Я не заметил, что эти данные все еще застряли в потоковом буфере - см. Мой ответ.

1 Ответ

0 голосов
/ 14 января 2019

Комментарии к моему вопросу привели меня к выводу, что проблема заключалась в потоке буфера. Промывка заняла много времени - невозможно промыть вручную.

Я закончил тем, что читал эту проблему и комментировал GitHub здесь . Вместо этого предлагается использовать задание загрузки.

После некоторых исследований я понял, что можно читать из io.Reader, а также из справочника Google Cloud Storage, настроив загрузчик ReaderSource.

Моя оригинальная реализация, которая использовала потоковый буфер, выглядела так:

var vss []*bigquery.ValuesSaver
// for each row:
vss = append(vss, &bigquery.ValuesSaver{
    Schema:   schema,
    InsertID: fmt.Sprintf(index of loop),
    Row: []bigquery.Value{
        "data"
    },
})

err := uploader.Put(ctx, vss)
if err != nil {
    if pmErr, ok := err.(bigquery.PutMultiError); ok {
        for _, rowInsertionError := range pmErr {
            log.Println(rowInsertionError.Errors)
        }
    }

    return fmt.Errorf("failed to insert data: %v", err)
}

Мне удалось изменить это на загрузочную работу с кодом, который выглядел следующим образом:

var lines []string
for _, v := range rows {
    json, err := json.Marshal(v)
    if err != nil {
        return fmt.Errorf("failed generate json %v, %+v", err, v)
    }
    lines = append(lines, string(json))
}

dataString := strings.Join(lines, "\n")

rs := bigquery.NewReaderSource(strings.NewReader(dataString))
rs.FileConfig.SourceFormat = bigquery.JSON
rs.FileConfig.Schema = schema
loader := dataset.Table(t2Name).LoaderFrom(rs)
loader.CreateDisposition = bigquery.CreateIfNeeded
loader.WriteDisposition = bigquery.WriteTruncate
job, err := loader.Run(ctx)
if err != nil {
    return fmt.Errorf("failed to start load job %v", err)
}
_, err := job.Wait(ctx)
if err != nil {
    return fmt.Errorf("load job failed %v", err)
}

Теперь данные доступны в таблице «немедленно» - мне больше не нужно ждать потокового буфера.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...