когда я использую java api, импортирую данные mysql в hdfs, у меня возникают вопросы. Это не ошибка.
Вот моя демка
// RDBMS link
MLink rdbmsLink = client.createLink("generic-jdbc-connector");
MConfigList configs = rdbmsLink.getConnectorLinkConfig();
configs.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver");
configs.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://127.0.0.1:3306/sqoop_test");
configs.getStringInput("linkConfig.username").setValue("root");
configs.getStringInput("linkConfig.password").setValue("123456789");
rdbmsLink.getConnectorLinkConfig("dialect").getStringInput("dialect.identifierEnclose").setValue(" ");
rdbmsLink.setName("mysql-append-link");
Status fromStatus = client.saveLink(rdbmsLink);
Ссылка является общей.
MLink hdfsLink = client.createLink("hdfs-connector");
hdfsLink.setName("hdfs-append-link");
hdfsLink.setCreationUser("root");
MLinkConfig toLinkConfig = hdfsLink.getConnectorLinkConfig();
toLinkConfig.getStringInput("linkConfig.uri").setValue("hdfs://cdh:9000/");
client.saveLink(hdfsLink);
Эта ссылка также распространена.
Важный код идет.
MConfigList jobConfig = job.getFromJobConfig();
jobConfig.getStringInput("fromJobConfig.sql").setValue("SELECT a.`jobid`,a.`userid`,a.`jobname`,a.`joblink`,a.`jobdate` ,b.`username` FROM `job_msg` as a LEFT JOIN `user_msg` as b ON a.`userid` = b.`userid` WHERE ${CONDITIONS}");
jobConfig.getStringInput("fromJobConfig.partitionColumn").setValue("jobdate");
jobConfig.getStringInput("incrementalRead.checkColumn").setValue("jobdate");
jobConfig.getStringInput("incrementalRead.lastValue").setValue("2018-08-09 00:11:11");
Я поставил fromJobConfig.sql
, который использовал левое соединение.
когда я запускаю работу, она может работать, но когда я вижу логи sqoop2-сервера, я обнаружил что-то вопрос.
New maximal value for incremental import is 2019-03-13 17:00:13.0
Использование запроса min / max: SELECT MIN ("дата задания"), MAX ("дата задания") ОТ (ВЫБРАТЬ a. jobid
, a. userid
, a. jobname
, a. joblink
, a. jobdate
, b. username
ОТ job_msg
как ЛЕВОЕ СОЕДИНЕНИЕ user_msg
как b ВКЛ. a. userid
= b. userid
ГДЕ 1 = 1) SQOOP_SUBQUERY_ALIAS ГДЕ "jobdate">? И "Jobdate" <=? </p>
Как мы видим, sql иррационально. Условие ("jobdate">? AND "jobdate" <=?) Находится вне подзапроса. Если есть много строк данных, я думаю, что это будет медленно. Я вижу исходный код Sqoop1.99.7 </p>
sb.setLength(0);
sb.append("SELECT ");
sb.append("MAX(").append(executor.encloseIdentifier(jobConf.incrementalRead.checkColumn)).append(") ");
sb.append("FROM ");
sb.append(fromFragment);
String incrementalNewMaxValueQuery = sb.toString();
LOG.info("Incremental new max value query: " + incrementalNewMaxValueQuery);
try (
PreparedStatement columnTypeStatement = executor.prepareStatement("SELECT " + executor.encloseIdentifier(jobConf.incrementalRead.checkColumn) + " FROM " + fromFragment + " WHERE 1 = 2");
ResultSet columnTypeResultSet = columnTypeStatement.executeQuery();
Statement statement = executor.createStatement();
ResultSet rs = statement.executeQuery(incrementalNewMaxValueQuery)
) {
ResultSetMetaData checkColumnMetaData = columnTypeResultSet.getMetaData();
checkColumnScale = checkColumnMetaData.getScale(1);
checkColumnType = checkColumnMetaData.getColumnType(1);
if (!rs.next()) {
throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0022);
}
incrementalNewMaxValueQuery - это поиск sql.
Теперь я не знаю, что это моя демоверсия sqoop2'job неправильно или есть что улучшить.
Спасибо