Я пытаюсь загрузить данные из потока kafka в СУБД Oracle и улей, сообщения в Hive записываются правильно, но при загрузке в Oracle записи отсутствуют.
Ниже приведен фрагмент моего кода.Окно потоковой передачи Spark составляет 1 сек.
rowRDD.foreachPartition(new VoidFunction<Iterator<String () {
private static final long serialVersionUID = 1L;
@Override
public void call(Iterator<String> rowIterator) throws Exception {
//prepared statement query
String sql = "insert into " + tableName + " (TRANS_AMOUNT,) values (?, ?)";
Connection connection = null;
PreparedStatement ps = null;
try {
connection = DriverManager.getConnection(jdbcDatabaseUrl, jdbcUsername, jdbcPassword);
ps = connection.prepareStatement(sql);
int count = 0;
RowTemplate message = new RowTemplate();
while (rowIterator.hasNext() ) {
String recordRow = rowIterator.next();
if (recordRow != null && !recordRow.equals("")) {
//populate message object properties
message.setProperties(recordRow,kafkaRecorddelimiter);
java.util.Date transDate = formatter.parse(trnsDateTime);
Timestamp transactionDate = new Timestamp(transDate.getTime());
String transAmount = message.getTransactionAmount() ;
ps.setInt(1, Integer.parseInt(transAmount);
ps.setTimestamp(2, transactionDate);
ps.addBatch();
if(++count % batchSize == 0)
ps.executeBatch();
}
message.clearProperties();
}
ps.executeBatch(); // insert remaining records
}