Gcp Dataflow обрабатывает недопустимые данные - PullRequest
0 голосов
/ 06 августа 2020

У нас есть API в качестве прокси-сервера между клиентами и Google Pub / Sub, поэтому он в основном извлекает тело JSON и публикует его в топах c. Затем он обрабатывается DataFlow, который сохраняет его в BigQuery. Кроме того, мы используем UDF для преобразования, например, значения поля в верхний регистр; он анализирует JSON отправленных и выдает новый.

Проблема в следующем. Количество байтов, отправленных в целевую таблицу, намного меньше, чем в мертвую букву, и сообщение об ошибке составляет 99% процентов и содержит ошибку о том, что отправленное JSON недействительно. И это правда, столбец payloadstring содержит искаженные файлы JSON: они могут быть усечены, объединены с другими или даже обоими. Я добавил журналы на стороне API, чтобы увидеть, где набор сообщений поврежден, но ни полученные, ни отправленные API тела JSON недействительны.

Как я могу отладить эту проблему? Есть ли вероятность повреждения сообщений публикацией / подпиской или потоком данных? Если да, что я могу сделать, чтобы это исправить?

UPD. Кстати, мы используем предоставленный Google шаблон под названием «pubsub topi c to bigquery»

UPD2. API написан на Go, и мы отправляем сообщение просто путем вызова res := p.topic.Publish(ctx, &pubsub.Message{Data: msg})

Переменная res затем используется для регистрации ошибок. p - это настраиваемая структура.

Сообщение, которое мы отправили, - это JSON с 15 полями, и для краткости я высмею его и UDF. Сообщение:

 {"MessageName":"Name","MessageTimestamp":123123123",...}

UDF:

function transform(inJson) {
  var obj;
  try {
    obj = JSON.parse(inJson);
  } catch (error){
    throw 'parse JSON error: '+error;
  }

  if (Object.keys(obj).length !== 15){
    throw "Message is invalid";
  }

  if (!(obj.hasOwnProperty('EventSource') && typeof obj.EventSource === 'string' && obj.MessageName.length>0)) {
    throw "MessageName is absent or invalid";
  }
  /*
     other fields check
  */

  obj.MessageName = obj.MessageName.toUpperCase()
  /*
     other fields transform
  */ 


  return JSON.stringify(obj);
}

UPD3:

Помимо повреждения, я заметил, что каждый сообщение дублируется по крайней мере один раз, и дубликаты часто усекаются. Проблема возникла несколько дней a go, когда было значительное увеличение количества сообщений, но теперь оно вернулось в норму, и ошибка все еще там. Проблема наблюдалась раньше, но это был гораздо более редкий случай.

Ответы [ 2 ]

0 голосов
/ 11 августа 2020

Я выполнил тест, отправив JSON сообщений, содержащих 15 полей. Ваша функция UDF, а также шаблон потока данных работают нормально, так как я смог вставить данные в BigQuery.

Исходя из этого, кажется, что ваши сообщения уже повреждены до того, как попасть в Pub / Sub, я предлагаю вам проверьте свои сообщения, когда они поступят в Pub / Sub, и убедитесь, что они имеют правильный формат.

enter image description here введите описание изображения здесь Обратите внимание, что это необходимо для соответствия схемы сообщений схеме таблицы BigQuery.

0 голосов
/ 10 августа 2020

Описанное вами поведение предполагает, что данные повреждены до того, как они попадут в Pubsub или Dataflow.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...