BQ Переключение в секционированную таблицу TIMESTAMP - PullRequest
0 голосов
/ 07 февраля 2019

Я пытаюсь перенести IngestionTime (_PARTITIONTIME) в многораздельные таблицы TIMESTAMP в BQ.При этом мне также нужно добавить несколько обязательных столбцов.Однако, когда я щелкаю переключатель и перенаправляю свой поток данных в новую многораздельную таблицу TIMESTAMP, он ломается.На заметку:

  • Успешно вставлено около двух миллионов строк (вероятно, одна партия).Работа продолжает выполняться, но после этого ничего не вставляется.
  • Задание выполняется партиями.
  • Мой проект полностью на Java
  • Когда я запускаю его как потоковую передачу, он, кажется, работает как задумано.К сожалению, это не практично для моего варианта использования, и требуется пакетная обработка.

Я изучал проблему в течение нескольких дней и пытался разбить переход на мельчайшие возможные шаги.Похоже, что этап, ответственный за ошибку, вводит ОБЯЗАТЕЛЬНЫЕ переменные (он отлично работает, когда те же переменные имеют значение NULLABLE)Чтобы избежать любых возможных ошибок синтаксического анализа, я установил значения по умолчанию для всех ОБЯЗАТЕЛЬНЫХ переменных.

В настоящий момент я получаю следующую комбинацию ошибок, и я не уверен, как их устранить:

Первая ошибка, нечасто повторяется, но обычно в группах:

Агент профилирования не найден.Профили не будут доступны от этого работника

Часто встречается и в больших группах:

Невозможно проверить, что сериализованные элементы типа BoundedSource имеют хорошо определенный метод equals.Это может привести к неверным результатам в некоторых PipelineRunner

. Похоже, это одна очень большая группа из них:

Aborting Operations.java.lang.RuntimeException: Невозможно прочитать значение из состояния

В конце эта ошибка появляется каждые 5 минут только при незначительных ошибках синтаксического анализа, описанных ниже.

Обработказастрял в шаге BigQueryIO.Write / BatchLoads / SinglePartitionWriteTables / ParMultiDo (WriteTables) как минимум на 20 минут без вывода или завершения при завершении состояния

Из-за огромного объема данных, которые анализирует мой проект, существует несколько парсинговошибки, такие как неожиданный характер.Они редки, но не должны нарушать вставку данных.Если это произойдет, у меня возникнет более серьезная проблема, поскольку данные, которые я собираю, часто изменяются, и я могу настроить анализатор только после того, как увижу ошибку и, следовательно, увижу новый формат данных.Кроме того, это не приводит к разрыву таблицы ingestiontime (или к другим таблицам разделов меток времени).Тем не менее, вот пример ошибки синтаксического анализа:

Ошибка: неожиданный символ (',' (код 44)): ожидал двойную кавычку, чтобы начать имя поля

РЕДАКТИРОВАТЬ: Некоторые соответствующие образец кода:

public PipelineResult streamData() {
        try {
            GenericSection generic = new GenericSection(options.getBQProject(), options.getBQDataset(), options.getBQTable());
            Pipeline pipeline = Pipeline.create(options);

            pipeline.apply("Read PubSub Events", PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription()))
                                              .apply(options.getWindowDuration() + " Windowing",  generic.getWindowDuration(options.getWindowDuration()))
                                              .apply(generic.getPubsubToString())
                                              .apply(ParDo.of(new CrowdStrikeFunctions.RowBuilder()))
                                              .apply(new BigQueryBuilder().setBQDest(generic.getBQDest())
                                                                          .setStreaming(options.getStreamingUpload())
                                                                          .setTriggeringFrequency(options.getTriggeringFrequency())

                                                                          .build());

            return pipeline.run();
        } 
        catch (Exception e) {
            LOG.error(e.getMessage(), e);
            return null;
        }

Запись в BQ.Я попытался установить здесь поле для разделения напрямую, но, похоже, оно ни на что не влияло:

BigQueryIO.writeTableRows()
                .to(BQDest)
                .withMethod(Method.FILE_LOADS)
                .withNumFileShards(1000)
                .withTriggeringFrequency(this.triggeringFrequency)
                .withTimePartitioning(new TimePartitioning().setType("DAY"))
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER);
    }

1 Ответ

0 голосов
/ 09 февраля 2019

После долгих копаний я обнаружил ошибку.У меня была логика синтаксического анализа (try / catch), которая ничего не возвращала (по сути, пустая строка) в случае ошибки синтаксического анализа.Это сломало бы BigQuery, поскольку в моей схеме было несколько ТРЕБУЕМЫХ строк.

Поскольку мое задание выполнялось в пакетном режиме, даже одна пустая строка может привести к сбою всего пакетного задания и ничего не вставлять.Это также объясняет, почему потоковая передача вставлена ​​просто отлично.Я удивлен, что BigQuery не выдал ошибку, утверждая, что я пытался вставить ноль в обязательное поле.

Придя к такому выводу, я также понял, что установка поля раздела в моем коде такженеобходимо, а не просто в схеме.Это можно сделать с помощью

.setField(partitionField)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...