Одна транзакция с использованием нескольких соединений. (MYSQL / JDBC) - PullRequest
4 голосов
/ 29 ноября 2011

Приложение, над которым я работаю, представляет собой основанный на Java процесс ETL, который загружает данные в несколько таблиц. СУБД - Infobright (СУБД на основе MYSQL, предназначенная для хранилищ данных).

Загрузка данных должна выполняться атомарно; однако по соображениям производительности я хочу загружать данные в несколько таблиц одновременно (используя команду LOAD DATA INFILE). Это означает, что мне нужно открыть несколько соединений.

Есть ли какое-либо решение, которое позволяет мне выполнять нагрузки атомарно и параллельно? (Я предполагаю, что ответ может зависеть от механизма для таблиц, в которые я загружаюсь; большинство из них - Brighthouse, который разрешает Транзакции, но без XA и без точек сохранения).

Чтобы уточнить, я хочу избежать ситуации, когда скажем:

  • загружаю данные в 5 таблиц
  • Я фиксирую нагрузки для первых 4 таблиц
  • Сбой при фиксации 5-й таблицы

В этой ситуации я не могу откатить первые 4 загрузки, потому что они уже зафиксированы.

Ответы [ 2 ]

5 голосов
/ 07 декабря 2011

Intro

Как я и обещал, я взломал полный пример.Я использовал MySQL и создал три таблицы, подобные следующей:

CREATE TABLE `test{1,2,3}` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `data` varchar(255) NOT NULL UNIQUE,
  PRIMARY KEY (`id`)
);

test2 изначально содержит одну строку.

INSERT INTO `test2` (`data`) VALUES ('a');

( Я разместил полный тексткод http://pastebin.com.)

В следующем примере выполняется несколько действий.

  1. Устанавливает threads в 3, который определяет, сколько заданий будет выполняться параллельно.
  2. Создает threads количество подключений.
  3. Изливает некоторые образцы данных для каждой таблицы (по умолчанию данные a для каждой таблицы).
  4. Создает threads количество заданийдля запуска и загрузки их с данными.
  5. Запускает задания в threads количестве потоков и ожидает их завершения (успешно или нет).
  6. Если нетвозникшие исключения фиксируют каждое соединение, в противном случае оно откатывает каждое из них.
  7. Закрывает соединения (однако их можно использовать повторно).

(Обратите внимание, что яиспользуется функция автоматического управления ресурсами Java 7 в SQLTask.call().)

Logic

public static void main(String[] args) throws SQLException, InterruptedException {
  int threads = 3;
  List<Connection> connections = getConnections(threads);
  Map<String, String> tableData = getTableData(threads);
  List<SQLTask> tasks = getTasks(threads, connections);
  setData(tableData, tasks);
  try {
    runTasks(tasks);
    commitConnections(connections);
  } catch (ExecutionException ex) {
    rollbackConnections(connections);
  } finally {
    closeConnections(connections);
  }
}

Данные

private static Map<String, String> getTableData(int threads) {
  Map<String, String> tableData = new HashMap<>();
  for (int i = 1; i <= threads; i++)
    tableData.put("test" + i, "a");
  return tableData;
}

Задачи

private static final class SQLTask implements Callable<Void> {

  private final Connection connection;

  private String data;
  private String table;

  public SQLTask(Connection connection) {
    this.connection = connection;
  }

  public void setTable(String table) {
    this.table = table;
  }

  public void setData(String data) {
    this.data = data;
  }

  @Override
  public Void call() throws SQLException {
    try (Statement statement = connection.createStatement()) {
      statement.executeUpdate(String.format(
        "INSERT INTO `%s` (data) VALUES  ('%s');", table, data));
    }
    return null;
  }
}

private static List<SQLTask> getTasks(int threads, List<Connection> connections) {
  List<SQLTask> tasks = new ArrayList<>();
  for (int i = 0; i < threads; i++)
    tasks.add(new SQLTask(connections.get(i)));
  return tasks;
}

private static void setData(Map<String, String> tableData, List<SQLTask> tasks) {
  Iterator<Entry<String, String>> i = tableData.entrySet().iterator();
  Iterator<SQLTask> j = tasks.iterator();
  while (i.hasNext()) {
    Entry<String, String> entry = i.next();
    SQLTask task = j.next();
    task.setTable(entry.getKey());
    task.setData(entry.getValue());
  }
}

Выполнить

private static void runTasks(List<SQLTask> tasks) 
    throws ExecutionException, InterruptedException {
  ExecutorService executorService = Executors.newFixedThreadPool(tasks.size());
  List<Future<Void>> futures = executorService.invokeAll(tasks);
  executorService.shutdown();
  for (Future<Void> future : futures)
    future.get();
}

Результат

С учетом данных по умолчанию, возвращаемых getTableData(...)

test1 -> `a`
test2 -> `a`
test3 -> `a`

и тот факт, что test2 уже содержит a (а столбец data равен уникально ), второе задание завершится ошибкой и вызовет исключение, поэтому каждое соединение будетоткат.

Если вместо a s вы вернете b s, то соединения будут зафиксированы безопасно.

Это можно сделать аналогично с LOAD DATA.


После ответа ОП на мой ответ я понял, что то, что она / он хочет сделать, невозможно сделать простым и понятным способом.

В основном проблема заключается в том, что после успешной фиксациизафиксированные данные нельзя откатить, потому что операция атомарная.Учитывая, что в данном случае требуется несколько коммитов, откат всего невозможен, если только один не отслеживает всех данных (во всех транзакциях) и если происходит что-то, удаляет все, что было успешно зафиксировано.

Есть хороший ответ , касающийся вопроса коммитов и откатов.

0 голосов
/ 17 февраля 2012

На самом деле в более новой версии IEE, а не ICE, есть дополнительная функция, называемая DLP (распределенная обработка нагрузки).На сайте есть файл PDF, ссылка на который приведена здесь:

http://www.infobright.com/Products/Features/

...