Запись потокового RDD Кафки в базу данных Oracle в Spark - PullRequest
0 голосов
/ 08 февраля 2019

Я пытаюсь загрузить данные из потока kafka в СУБД Oracle и улей, сообщения в Hive записываются правильно, но при загрузке в Oracle записи отсутствуют.

Ниже приведен фрагмент моего кода.Окно потоковой передачи Spark составляет 1 сек.

rowRDD.foreachPartition(new VoidFunction<Iterator<String () {
  private static final long serialVersionUID = 1L;                                       

    @Override
              public void call(Iterator<String> rowIterator) throws Exception { 

                           //prepared statement query                                 
                          String sql = "insert into  " + tableName + " (TRANS_AMOUNT,) values (?, ?)";

                          Connection connection = null;
                          PreparedStatement ps = null;

                        try {

                             connection = DriverManager.getConnection(jdbcDatabaseUrl, jdbcUsername, jdbcPassword);
                             ps = connection.prepareStatement(sql);

                          int count = 0;

                          RowTemplate message = new RowTemplate();                  

                          while (rowIterator.hasNext() ) {
                             String recordRow = rowIterator.next();

                             if (recordRow != null && !recordRow.equals("")) { 

                                //populate message object properties
                                message.setProperties(recordRow,kafkaRecorddelimiter);       

                                java.util.Date transDate = formatter.parse(trnsDateTime);
                                Timestamp transactionDate = new Timestamp(transDate.getTime());                         

                                String transAmount = message.getTransactionAmount() ;                            
                                ps.setInt(1, Integer.parseInt(transAmount);                                      
                                ps.setTimestamp(2, transactionDate);

                                ps.addBatch();                          

                               if(++count % batchSize == 0) 

                                    ps.executeBatch();

                             }
                             message.clearProperties();

                          }
                          ps.executeBatch(); // insert remaining records


  } 
...