sqoop2 импорт из mysql в hdfs некоторые вопросы - PullRequest
0 голосов
/ 19 марта 2019

когда я использую java api, импортирую данные mysql в hdfs, у меня возникают вопросы. Это не ошибка. Вот моя демка

        // RDBMS link
    MLink rdbmsLink = client.createLink("generic-jdbc-connector");
    MConfigList configs = rdbmsLink.getConnectorLinkConfig();
   configs.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver");
    configs.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://127.0.0.1:3306/sqoop_test");
    configs.getStringInput("linkConfig.username").setValue("root");
    configs.getStringInput("linkConfig.password").setValue("123456789");
    rdbmsLink.getConnectorLinkConfig("dialect").getStringInput("dialect.identifierEnclose").setValue(" ");
    rdbmsLink.setName("mysql-append-link");
    Status fromStatus = client.saveLink(rdbmsLink);

Ссылка является общей.

  MLink hdfsLink = client.createLink("hdfs-connector");
    hdfsLink.setName("hdfs-append-link");
    hdfsLink.setCreationUser("root");
    MLinkConfig toLinkConfig = hdfsLink.getConnectorLinkConfig();
    toLinkConfig.getStringInput("linkConfig.uri").setValue("hdfs://cdh:9000/");
    client.saveLink(hdfsLink);

Эта ссылка также распространена. Важный код идет.

     MConfigList jobConfig = job.getFromJobConfig();
    jobConfig.getStringInput("fromJobConfig.sql").setValue("SELECT a.`jobid`,a.`userid`,a.`jobname`,a.`joblink`,a.`jobdate` ,b.`username` FROM `job_msg` as a LEFT JOIN `user_msg` as b ON a.`userid` = b.`userid`  WHERE ${CONDITIONS}");
    jobConfig.getStringInput("fromJobConfig.partitionColumn").setValue("jobdate");
    jobConfig.getStringInput("incrementalRead.checkColumn").setValue("jobdate");
    jobConfig.getStringInput("incrementalRead.lastValue").setValue("2018-08-09 00:11:11");

Я поставил fromJobConfig.sql, который использовал левое соединение. когда я запускаю работу, она может работать, но когда я вижу логи sqoop2-сервера, я обнаружил что-то вопрос.

New maximal value for incremental import is 2019-03-13 17:00:13.0

Использование запроса min / max: SELECT MIN ("дата задания"), MAX ("дата задания") ОТ (ВЫБРАТЬ a. jobid, a. userid, a. jobname, a. joblink, a. jobdate, b. username ОТ job_msg как ЛЕВОЕ СОЕДИНЕНИЕ user_msg как b ВКЛ. a. userid = b. userid ГДЕ 1 = 1) SQOOP_SUBQUERY_ALIAS ГДЕ "jobdate">? И "Jobdate" <=? </p>

Как мы видим, sql иррационально. Условие ("jobdate">? AND "jobdate" <=?) Находится вне подзапроса. Если есть много строк данных, я думаю, что это будет медленно. Я вижу исходный код Sqoop1.99.7 </p>

 sb.setLength(0);
  sb.append("SELECT ");
  sb.append("MAX(").append(executor.encloseIdentifier(jobConf.incrementalRead.checkColumn)).append(") ");
  sb.append("FROM ");
  sb.append(fromFragment);

  String incrementalNewMaxValueQuery = sb.toString();
  LOG.info("Incremental new max value query:  " + incrementalNewMaxValueQuery);
  try (
          PreparedStatement columnTypeStatement = executor.prepareStatement("SELECT " + executor.encloseIdentifier(jobConf.incrementalRead.checkColumn) + " FROM " + fromFragment + " WHERE 1 = 2");
          ResultSet columnTypeResultSet = columnTypeStatement.executeQuery();
          Statement statement = executor.createStatement();
          ResultSet rs = statement.executeQuery(incrementalNewMaxValueQuery)
  ) {
    ResultSetMetaData checkColumnMetaData = columnTypeResultSet.getMetaData();
    checkColumnScale = checkColumnMetaData.getScale(1);
    checkColumnType = checkColumnMetaData.getColumnType(1);

    if (!rs.next()) {
      throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0022);
    }

incrementalNewMaxValueQuery - это поиск sql. Теперь я не знаю, что это моя демоверсия sqoop2'job неправильно или есть что улучшить.

Спасибо

...