Обработка исключений в конвейерах Apache Beam при записи в базу данных с использованием Java - PullRequest
0 голосов
/ 31 мая 2019

При записи простых записей в таблицу в Postgres (может быть любой дБ) в конце конвейера некоторые потенциальные записи нарушают ограничения уникальности и вызывают исключение. Насколько я могу судить, нет простого способа справиться с этим изящно - конвейер либо полностью выдает ошибку, либо, в зависимости от бегуна, входит в бесконечную смертельную спираль.

Похоже, что нет никаких упоминаний об обработке ошибок для этого случая в луче docs . Средние сообщения об обработке ошибок, похоже, не относятся к этому конкретному типу PTransform, который возвращает PDone.

Этот ответ не понятен и лишен примеров.

В моем примере я читаю из файла с 2-мя дублирующими строками и пытаюсь записать их в таблицу.

CREATE TABLE foo (
    field CHARACTER VARYING(100) UNIQUE
);

foo.txt содержит:

a
a

Трубопровод выглядит так:

        Pipeline p = Pipeline.create();
        p.apply(TextIO.read().from("/path/to/foo.txt"))
        .apply(
                JdbcIO.<String>write()
                .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("org.postgresql.Driver", "jdbc:postgresql://localhost:5432/somedb"))
                .withStatement("INSERT INTO foo (field) VALUES (?)")
                .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<String>() {
                    private static final long serialVersionUID = 1L;
                    public void setParameters(String element, PreparedStatement query) throws SQLException {
                        query.setString(1, element);
                    }
                }))
        ;
        p.run();

Вот вывод из простого примера выше:

[WARNING]
org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO foo (field) VALUES ('a') was aborted: ERROR: duplicate key value violates unique constraint "foo_field_key"
  Detail: Key (field)=(a) already exists.  Call getNextException to see other errors in the batch.
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish (DirectRunner.java:332)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish (DirectRunner.java:302)
    at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:197)
    at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:64)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:313)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:299)
    at com.thing.Main.main (Main.java:105)
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
    at java.lang.Thread.run (Thread.java:748)
Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO foo (field) VALUES ('a') was aborted: ERROR: duplicate key value violates unique constraint "foo_field_key"
  Detail: Key (field)=(a) already exists.  Call getNextException to see other errors in the batch.
    at org.postgresql.jdbc.BatchResultHandler.handleError (BatchResultHandler.java:148)
    at org.postgresql.core.ResultHandlerDelegate.handleError (ResultHandlerDelegate.java:50)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults (QueryExecutorImpl.java:2184)
    at org.postgresql.core.v3.QueryExecutorImpl.execute (QueryExecutorImpl.java:481)
    at org.postgresql.jdbc.PgStatement.executeBatch (PgStatement.java:840)
    at org.postgresql.jdbc.PgPreparedStatement.executeBatch (PgPreparedStatement.java:1538)
    at org.apache.commons.dbcp2.DelegatingStatement.executeBatch (DelegatingStatement.java:345)
    at org.apache.commons.dbcp2.DelegatingStatement.executeBatch (DelegatingStatement.java:345)
    at org.apache.commons.dbcp2.DelegatingStatement.executeBatch (DelegatingStatement.java:345)
    at org.apache.commons.dbcp2.DelegatingStatement.executeBatch (DelegatingStatement.java:345)
    at org.apache.beam.sdk.io.jdbc.JdbcIO$Write$WriteFn.executeBatch (JdbcIO.java:846)
    at org.apache.beam.sdk.io.jdbc.JdbcIO$Write$WriteFn.finishBundle (JdbcIO.java:819)
Caused by: org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint "foo_field_key"
  Detail: Key (field)=(a) already exists.
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse (QueryExecutorImpl.java:2440)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults (QueryExecutorImpl.java:2183)
    at org.postgresql.core.v3.QueryExecutorImpl.execute (QueryExecutorImpl.java:481)
    at org.postgresql.jdbc.PgStatement.executeBatch (PgStatement.java:840)
    at org.postgresql.jdbc.PgPreparedStatement.executeBatch (PgPreparedStatement.java:1538)
    at org.apache.commons.dbcp2.DelegatingStatement.executeBatch (DelegatingStatement.java:345)
    at org.apache.commons.dbcp2.DelegatingStatement.executeBatch (DelegatingStatement.java:345)
    at org.apache.commons.dbcp2.DelegatingStatement.executeBatch (DelegatingStatement.java:345)
    at org.apache.commons.dbcp2.DelegatingStatement.executeBatch (DelegatingStatement.java:345)
    at org.apache.beam.sdk.io.jdbc.JdbcIO$Write$WriteFn.executeBatch (JdbcIO.java:846)
    at org.apache.beam.sdk.io.jdbc.JdbcIO$Write$WriteFn.finishBundle (JdbcIO.java:819)
    at org.apache.beam.sdk.io.jdbc.JdbcIO$Write$WriteFn$DoFnInvoker.invokeFinishBundle (Unknown Source)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.finishBundle (SimpleDoFnRunner.java:285)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle (SimplePushbackSideInputDoFnRunner.java:118)
    at org.apache.beam.runners.direct.ParDoEvaluator.finishBundle (ParDoEvaluator.java:223)
    at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.finishBundle (DoFnLifecycleManagerRemovingTransformEvaluator.java:73)
    at org.apache.beam.runners.direct.DirectTransformExecutor.finishBundle (DirectTransformExecutor.java:188)
    at org.apache.beam.runners.direct.DirectTransformExecutor.run (DirectTransformExecutor.java:126)
    at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
    at java.util.concurrent.FutureTask.run (FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
    at java.lang.Thread.run (Thread.java:748)

Я бы хотел иметь возможность арестовать это исключение и перенаправить его на какую-нибудь мертвую букву.

1 Ответ

0 голосов
/ 31 мая 2019

В Beam пока нет общего способа сделать это.Время от времени ведутся дискуссии об изменении IO, чтобы не возвращать PDone, но, насколько мне известно, нет ничего легкодоступного.

В настоящий момент я могу придумать пару обходных путей, все они далеки от идеальных:

  • в программе драйвера обрабатывает перезапуск конвейера при сбое;
  • копирование-вставка JdbcIO, его частей или реализация собственного Jdbc ParDo с пользовательской обработкой исключений;
  • добавим функцию обработки исключений в JdbcIO и добавим ее в Beam, это будет оценено;
...