Как обрабатывать недопустимые / ошибочные сообщения JSON, поступающие от PubSub для загрузки таблиц CloudSQL - PullRequest
0 голосов
/ 25 января 2019

Я читаю сообщение JSON, опубликованное в теме PubSub, и загружаю таблицы CloudSQL, используя метод JDBCIO.write() в потоке данных.

Я должен обработать сценарии сообщений об ошибках / недействительных JSON-сообщениях (например, InvalidSchema, InvalidDatatype), чтобы в задании потокового потока данных не отображались ошибки на графике потока данных.

При загрузке данных в Bigquery мы можем обработатьэти сценарии с помощью withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()).

Я пытался обработать этот сценарий для CloudSQL, используя withRetryStrategy(RetryStrategy retryStrategy) в задании потока данных.Но все же я вижу количество записей об ошибках в задании потоковой передачи данных.Из-за этого я не могу получать действительные сообщения после получения недопустимых / ошибочных сообщений JSON в потоке данных.

Ниже приведен фрагмент кода для метода withRetryStrategy():

.withRetryStrategy(new RetryStrategy() { @Override public boolean apply(SQLException sqlException) { logger.warn("SQLState: " + sqlException.getSQLState() + "\t SQLException: " + sqlException.getMessage()); return false; } })

Если apply(SQLException) возвращает true , то JDBCIO.write() будет повторять одно и то же утверждение бесконечное число раз.
В приведенном выше коде я возвратил false в переопределенном методе apply, но все ещеэта проблема возникает.Поэтому я попытался вернуть true и вижу тот же результат.

Может ли кто-нибудь посоветовать, как обрабатывать сценарии с недопустимыми / ошибочными записями такого типа в задании потоковой передачи данных для загрузки таблиц CloudSQL?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...