Flink - Почему я должен создать свою собственную функцию RichSinkFunction вместо того, чтобы просто открывать и закрывать мое соединение PostgreSql? - PullRequest
0 голосов
/ 21 мая 2019

Я хотел бы знать, почему мне действительно нужно создать свою собственную функцию RichSinkFunction или использовать JDBCOutputFormat для подключения к базе данных, а не просто для создания моего соединения, выполнения запроса и закрытия соединения с использованием традиционных драйверов PostgreSQL внутри моей SinkFunction?

Я нашел много статей, рассказывающих об этом, но не объясняет почему? В чем разница?

Пример кода с использованием JDBCOutputFormat,

JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
     .setDrivername("org.postgresql.Driver")
     .setDBUrl("jdbc:postgresql://localhost:1234/test?user=xxx&password=xxx")
     .setQuery(query)
     .setSqlTypes(new int[] { Types.VARCHAR, Types.VARCHAR, Types.VARCHAR }) //set the types
     .finish();

Пример кода, реализующего собственную RichSinkFunction,

public class RichCaseSink extends RichSinkFunction<Case> {

  private static final String UPSERT_CASE = "INSERT INTO public.cases (caseid, tracehash) "
      + "VALUES (?, ?) "
      + "ON CONFLICT (caseid) DO UPDATE SET "
      + "  tracehash=?";

  private PreparedStatement statement;


  @Override
  public void invoke(Case aCase) throws Exception {

    statement.setString(1, aCase.getId());
    statement.setString(2, aCase.getTraceHash());
    statement.setString(3, aCase.getTraceHash());
    statement.addBatch();
    statement.executeBatch();
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    Class.forName("org.postgresql.Driver");
    Connection connection =
        DriverManager.getConnection("jdbc:postgresql://localhost:5432/casedb?user=signavio&password=signavio");

    statement = connection.prepareStatement(UPSERT_CASE);
  }

}

почему я не могу просто использовать драйвер PostgreSQL?

public class Storable implements SinkFunction<Activity>{

    @Override
    public void invoke(Activity activity) throws Exception {
        Class.forName("org.postgresql.Driver");
        try(Connection connection =
            DriverManager.getConnection("jdbc:postgresql://localhost:5432/casedb?user=signavio&password=signavio")){

        statement = connection.prepareStatement(UPSERT_CASE);

        //Perform the query

        //close connection...
        }
    }

}

Кто-нибудь знает технический ответ на лучшую практику во Флинке? Осуществляет ли реализация RichSinkFunction или использование JDBCOutputFormat что-то особенное?

Заранее спасибо.

1 Ответ

2 голосов
/ 22 мая 2019

Ну, вы можете использовать свой собственный SinkFunction, который будет просто использовать invoke() метод для открытия соединения и записи данных, и это должно работать в целом. Но его производительность в большинстве случаев будет очень, очень плохой.

Фактическая разница между первым и вторым примерами заключается в том, что в RichSinkFunction вы используете open() метод, чтобы открыть соединение и подготовить оператор. Этот метод open() вызывается только один раз при инициализации функции. Во втором примере вы откроете соединение с базой данных и подготовите оператор внутри метода invoke(), который вызывается для каждого элемента ввода DataStream. Вы фактически откроете новое соединение для каждого элемента в поток .

Создание соединения с базой данных - дело дорогое, и оно наверняка будет иметь ужасные недостатки в производительности.

...