Java jdbc Sql Server параллельной записи - PullRequest
0 голосов
/ 21 февраля 2019

У меня есть сценарий со следующими шагами:

  1. Использование mssql jdbc driver Мне нужно подключиться к таблице базы данных Sql Server A и получить несколько миллионовстрок.

  2. Мне нужно выполнить несколько операций обработки и анализа данных таблицы, а также данных, поступающих из других источников (например, веб-сервисов).

  3. Я должен записать обработанные данные в целевые таблицы B, C, D, E, F, G в другой базе данных Sql Server.Записи могут (должны?) Выполняться параллельно.

Я хотел бы спросить несколько советов о том, как правильно обрабатывать пункт 3. Я полагаю, это плохая идея, чтобы обеспечить такое же соединение сразличные потоки для параллельной записи в целевые таблицы.Моя общая идея - создать новый поток для каждой целевой таблицы (в данном случае 6) и создать отдельное соединение jdbc для каждой таблицы, таким образом, теоретически каждая запись может выполняться параллельно и независимо друг от друга.

Будет ли это работать?Предложения по другим / лучшим способам?

Ответы [ 2 ]

0 голосов
/ 22 февраля 2019

Я реализовал следующее решение, которое использует шаблон производителя / потребителя, используя BlockingQueue и ExecutorService .Основной поток (производитель) создает экземпляр BlockingQueue для каждого из рабочих потоков (потребителей) и логическую переменную с переменным значением «прекращено», чтобы сигнализировать рабочим потокам, когда все данные были сгенерированы, и они должны прекратить выполнение (выход из цикла while,очистить очередь и записать оставшиеся данные в соединение jdbc).Производитель создает разные данные для каждого потока, используя два BlockingQueue blockingQueue1 и blockingQueue2.

Вот упрощенный MainThreadProducer, который просто генерирует целочисленные данные для двух рабочих потоков:

// MainThreadProducer.java

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class MainThreadProducer {

    public static Logger logger = LogManager.getLogger(MainThreadProducer.class);
    public final static BlockingQueue<Integer> blockingQueue1 = new LinkedBlockingDeque<>(100);
    public final static BlockingQueue<Integer> blockingQueue2 = new LinkedBlockingDeque<>(100);

    /* signal to the worker threads that all data has been generated */
    public static volatile boolean terminated = false;


    private void run () {

        try {

            ExecutorService executor = Executors.newFixedThreadPool(2);
            Future<Integer> future1 = executor.submit(new WorkerThreadConsumer("1"));
            Future<Integer> future2 = executor.submit(new WorkerThreadConsumer("2"));

            for (int i = 0; i < 10023; ++i) {

                blockingQueue1.put(i);
                blockingQueue2.put(i*2);

            }

            executor.shutdown();
            terminated = true;

            int res1 = future1.get();
            int res2 = future1.get();

            logger.info("Total rows written (thread 1): " + res1);
            logger.info("Total rows written (thread 2): " + res2);            

        }
        catch (Exception e) {

            e.printStackTrace();
            System.exit(1);

        }

    }

    public static void main(String[] args) {

        MainThreadProducer instance = new MainThreadProducer();
        instance.run();

    }

}

Вот класс WorkerThreadConsumer.java.Для этого теста я создаю два потока, которые будут записывать в базу данных DBTEST таблицы TARGET_1 и TARGET_2 соответственно.Каждый поток создается с определенным типом String (1 и 2), поэтому он может знать, из какого BlockingQueue ему нужно прочитать данные.

// WorkerThreadConsumer.java

import java.sql.PreparedStatement;
import com.microsoft.sqlserver.jdbc.SQLServerResultSet;
import java.sql.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import Configuration;


public class WorkerThreadConsumer implements Callable<Integer> {

    private String type;

    public WorkerThreadConsumer (String type) {

        this.type = type;        

    }

    @Override
    public Integer call() {        

        String TAG = "[THREAD_" + Thread.currentThread().getId() + "]";
        int processed = 0; // number of rows currently processed
        int batchSize = 100; // size of the batch we write to the server with the PreparedStatement

        try {
            // load jdbc driver
            Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver");
            MainThreadProducer.logger.info(TAG + "\tLoaded com.microsoft.sqlserver.jdbc.SQLServerDriver");

            String stub = String.format("INSERT INTO DBTEST.dbo.TARGET_%s (id) VALUES (?);", this.type);

            BlockingQueue<Integer> queue;

            switch (this.type) {
                case "1":
                    queue = MainThreadProducer.blockingQueue1;
                    break;

                case "2":
                    queue = MainThreadProducer.blockingQueue2;
                    break;

                default:
                    queue = MainThreadProducer.blockingQueue1;
            }

            try (Connection connection = DriverManager.getConnection(Configuration.DWH_DB_CONNECTION_URL);
                 PreparedStatement stmt = connection.prepareStatement(stub);) {

                connection.setAutoCommit(false);

                while (!MainThreadProducer.terminated) {

                    int data = queue.take();
                    stmt.setInt(1, data);
                    stmt.addBatch();
                    processed += 1;

                    if (processed % batchSize == 0) {
                        int[] result = stmt.executeBatch();
                        connection.commit();
                        MainThreadProducer.logger.info(TAG + "\tWritten rows count: " + result.length);
                    }

                }

                // empty queue and write
                while (!queue.isEmpty()) {
                    int data = queue.take();
                    stmt.setInt(1, data);
                    stmt.addBatch();
                    processed += 1;

                    if (processed % batchSize == 0) {
                        int[] result = stmt.executeBatch();
                        connection.commit();
                        MainThreadProducer.logger.info(TAG + "\tWritten rows count: " + result.length);
                    }
                }

                // last write in case queue size > batch size
                int[] result = stmt.executeBatch();
                connection.commit();
                MainThreadProducer.logger.info(TAG + "\tWritten rows count: " + result.length);


            }

        }
        catch (Exception e) {

            e.printStackTrace();
            System.exit(1);

        }

        return processed;
    }



}

Решение, похоже, работает.Пожалуйста, дайте мне знать, если вы видите потенциальные проблемы.

0 голосов
/ 21 февраля 2019

Моя общая идея - создать новый поток для каждой целевой таблицы (в данном случае 6) и создать разные соединения jdbc для каждой таблицы, таким образом, в теории каждая запись может выполняться параллельно и независимо друг от друга..

Конечно, для меня это хороший план.Я бы использовал пул соединений, такой как HikariCP или DBCP , чтобы поддерживать несколько соединений с вашим сервером базы данных.Затем вы можете добавить несколько потоков, каждый из которых может запросить соединение, а затем вернуть его в пул для последующего использования.

Будет ли это работать?Предложения по другим / лучшим способам?

Будет работать.Следует учитывать, что число 6 может быть неправильным.Ваш сервер может не иметь пропускной способности для обработки такого большого количества данных одновременно, поэтому вы можете рассмотреть возможность уменьшения количества потоков в вашем пуле, пока не найдете оптимальное число, которое даст вам наибольшую пропускную способность.Тем не менее, если существует 6 таблиц, тогда 6 действительно может быть правильным числом в зависимости от того, как данные разделены на сервере.

В зависимости от того, насколько вы осведомлены о потоках, вам следует проверить документы по пулу потоков .

...