Я читаю сообщение JSON, опубликованное в теме PubSub, и загружаю таблицы CloudSQL, используя метод JDBCIO.write()
в потоке данных.
Я должен обработать сценарии сообщений об ошибках / недействительных JSON-сообщениях (например, InvalidSchema, InvalidDatatype), чтобы в задании потокового потока данных не отображались ошибки на графике потока данных.
При загрузке данных в Bigquery мы можем обработатьэти сценарии с помощью withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.
Я пытался обработать этот сценарий для CloudSQL, используя withRetryStrategy(RetryStrategy retryStrategy)
в задании потока данных.Но все же я вижу количество записей об ошибках в задании потоковой передачи данных.Из-за этого я не могу получать действительные сообщения после получения недопустимых / ошибочных сообщений JSON в потоке данных.
Ниже приведен фрагмент кода для метода withRetryStrategy()
:
.withRetryStrategy(new RetryStrategy() {
@Override
public boolean apply(SQLException sqlException) {
logger.warn("SQLState: " + sqlException.getSQLState() + "\t SQLException: "
+ sqlException.getMessage());
return false;
}
})
Если apply(SQLException)
возвращает true , то JDBCIO.write()
будет повторять одно и то же утверждение бесконечное число раз.
В приведенном выше коде я возвратил false в переопределенном методе apply
, но все ещеэта проблема возникает.Поэтому я попытался вернуть true и вижу тот же результат.
Может ли кто-нибудь посоветовать, как обрабатывать сценарии с недопустимыми / ошибочными записями такого типа в задании потоковой передачи данных для загрузки таблиц CloudSQL?