Я реализовал следующее решение, которое использует шаблон производителя / потребителя, используя BlockingQueue и ExecutorService .Основной поток (производитель) создает экземпляр BlockingQueue для каждого из рабочих потоков (потребителей) и логическую переменную с переменным значением «прекращено», чтобы сигнализировать рабочим потокам, когда все данные были сгенерированы, и они должны прекратить выполнение (выход из цикла while,очистить очередь и записать оставшиеся данные в соединение jdbc).Производитель создает разные данные для каждого потока, используя два BlockingQueue blockingQueue1 и blockingQueue2.
Вот упрощенный MainThreadProducer, который просто генерирует целочисленные данные для двух рабочих потоков:
// MainThreadProducer.java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class MainThreadProducer {
public static Logger logger = LogManager.getLogger(MainThreadProducer.class);
public final static BlockingQueue<Integer> blockingQueue1 = new LinkedBlockingDeque<>(100);
public final static BlockingQueue<Integer> blockingQueue2 = new LinkedBlockingDeque<>(100);
/* signal to the worker threads that all data has been generated */
public static volatile boolean terminated = false;
private void run () {
try {
ExecutorService executor = Executors.newFixedThreadPool(2);
Future<Integer> future1 = executor.submit(new WorkerThreadConsumer("1"));
Future<Integer> future2 = executor.submit(new WorkerThreadConsumer("2"));
for (int i = 0; i < 10023; ++i) {
blockingQueue1.put(i);
blockingQueue2.put(i*2);
}
executor.shutdown();
terminated = true;
int res1 = future1.get();
int res2 = future1.get();
logger.info("Total rows written (thread 1): " + res1);
logger.info("Total rows written (thread 2): " + res2);
}
catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
public static void main(String[] args) {
MainThreadProducer instance = new MainThreadProducer();
instance.run();
}
}
Вот класс WorkerThreadConsumer.java.Для этого теста я создаю два потока, которые будут записывать в базу данных DBTEST таблицы TARGET_1 и TARGET_2 соответственно.Каждый поток создается с определенным типом String (1 и 2), поэтому он может знать, из какого BlockingQueue ему нужно прочитать данные.
// WorkerThreadConsumer.java
import java.sql.PreparedStatement;
import com.microsoft.sqlserver.jdbc.SQLServerResultSet;
import java.sql.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import Configuration;
public class WorkerThreadConsumer implements Callable<Integer> {
private String type;
public WorkerThreadConsumer (String type) {
this.type = type;
}
@Override
public Integer call() {
String TAG = "[THREAD_" + Thread.currentThread().getId() + "]";
int processed = 0; // number of rows currently processed
int batchSize = 100; // size of the batch we write to the server with the PreparedStatement
try {
// load jdbc driver
Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver");
MainThreadProducer.logger.info(TAG + "\tLoaded com.microsoft.sqlserver.jdbc.SQLServerDriver");
String stub = String.format("INSERT INTO DBTEST.dbo.TARGET_%s (id) VALUES (?);", this.type);
BlockingQueue<Integer> queue;
switch (this.type) {
case "1":
queue = MainThreadProducer.blockingQueue1;
break;
case "2":
queue = MainThreadProducer.blockingQueue2;
break;
default:
queue = MainThreadProducer.blockingQueue1;
}
try (Connection connection = DriverManager.getConnection(Configuration.DWH_DB_CONNECTION_URL);
PreparedStatement stmt = connection.prepareStatement(stub);) {
connection.setAutoCommit(false);
while (!MainThreadProducer.terminated) {
int data = queue.take();
stmt.setInt(1, data);
stmt.addBatch();
processed += 1;
if (processed % batchSize == 0) {
int[] result = stmt.executeBatch();
connection.commit();
MainThreadProducer.logger.info(TAG + "\tWritten rows count: " + result.length);
}
}
// empty queue and write
while (!queue.isEmpty()) {
int data = queue.take();
stmt.setInt(1, data);
stmt.addBatch();
processed += 1;
if (processed % batchSize == 0) {
int[] result = stmt.executeBatch();
connection.commit();
MainThreadProducer.logger.info(TAG + "\tWritten rows count: " + result.length);
}
}
// last write in case queue size > batch size
int[] result = stmt.executeBatch();
connection.commit();
MainThreadProducer.logger.info(TAG + "\tWritten rows count: " + result.length);
}
}
catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
return processed;
}
}
Решение, похоже, работает.Пожалуйста, дайте мне знать, если вы видите потенциальные проблемы.