CSV-копия в Postgres с массивом нестандартного типа с использованием JDBC - PullRequest
0 голосов
/ 30 мая 2018

У меня есть пользовательский тип, определенный в моей базе данных как

CREATE TYPE address AS (ip inet, port int);

И таблица, которая использует этот тип в массиве:

CREATE TABLE my_table (
  addresses  address[] NULL
)

У меня есть образец файла CSV сследующее содержимое

{(10.10.10.1,80),(10.10.10.2,443)}
{(10.10.10.3,8080),(10.10.10.4,4040)}

И я использую следующий фрагмент кода для выполнения моей КОПИИ:

    Class.forName("org.postgresql.Driver");

    String input = loadCsvFromFile();

    Reader reader = new StringReader(input);

    Connection connection = DriverManager.getConnection(
            "jdbc:postgresql://db_host:5432/db_name", "user",
            "password");

    CopyManager copyManager = connection.unwrap(PGConnection.class).getCopyAPI();

    String copyCommand = "COPY my_table (addresses) " + 
                         "FROM STDIN WITH (" + 
                           "DELIMITER '\t', " + 
                           "FORMAT csv, " + 
                           "NULL '\\N', " + 
                           "ESCAPE '\"', " +
                           "QUOTE '\"')";

    copyManager.copyIn(copyCommand, reader);

При выполнении этой программы выдается следующее исключение:

Exception in thread "main" org.postgresql.util.PSQLException: ERROR: malformed record literal: "(10.10.10.1"
  Detail: Unexpected end of input.
  Where: COPY only_address, line 1, column addresses: "{(10.10.10.1,80),(10.10.10.2,443)}"
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2422)
    at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1114)
    at org.postgresql.core.v3.QueryExecutorImpl.endCopy(QueryExecutorImpl.java:963)
    at org.postgresql.core.v3.CopyInImpl.endCopy(CopyInImpl.java:43)
    at org.postgresql.copy.CopyManager.copyIn(CopyManager.java:185)
    at org.postgresql.copy.CopyManager.copyIn(CopyManager.java:160)

Iпробовал с разными комбинациями скобок во входных данных, но, похоже, не работает COPY.Есть идеи, где я могу пойти не так?

Ответы [ 3 ]

0 голосов
/ 08 июня 2018

В формате CSV , когда вы указываете разделитель, вы не можете использовать его как символ в ваших данных, если только вы не избежите его!пример csv-файла с использованием запятой в качестве разделителя

правильная запись: data1, data2 результаты анализа: [0] => data1 [1] => data2

неправильный результат: data,1, data2 результаты анализа: [0] => data [1] => 1 [2] => data2

наконец, вам не нужно загружать ваш файл как csv, а как простой файл, поэтому замените ваш метод loadCsvFromFile(); на

public String loadRecordsFromFile(File file) {
 LineIterator it = FileUtils.lineIterator(file, "UTF-8");
 StringBuilder sb = new StringBuilder();
 try {
   while (it.hasNext()) {
     sb.append(it.nextLine()).append(System.nextLine);
   }
 } 
 finally {
   LineIterator.closeQuietly(iterator);
 }

 return sb.toString();
}

Не забудьте добавить эту зависимость в ваш файл pom

<!-- https://mvnrepository.com/artifact/commons-io/commons-io -->

    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.6</version>
    </dependency>

Или загрузить JAR из commons.apache.org

0 голосов
/ 10 июня 2018

1NF

Прежде всего, я думаю, что ваш стол неправильный, потому что он не соответствует 1NF .Каждое поле должно содержать только атомарные атрибуты, но это не так.Почему не таблица типа:

CREATE TABLE my_table (
    id,
    ip inet,
    port int
)

Где id - номер вашей строки в исходном файле и ip / port один из адресов в этой строке?Пример данных:

id | ip         | port
-----------------------
1  | 10.10.10.1 | 80
1  | 10.10.10.2 | 443
2  | 10.10.10.3 | 8080
2  | 10.10.10.4 | 4040
...

Следовательно, вы сможете запрашивать вашу базу данных по одному адресу (найти все связанные адреса, вернуть true, если два адреса находятся на одной строке, что бы вы ни хотели)..).

Загрузить данные

Но давайте предположим, что вы знаете, что делаете.Основная проблема заключается в том, что ваш файл входных данных находится в специальном формате.Это может быть CSV-файл с одним столбцом, но это будет очень вырожденный CSV-файл.В любом случае, вы должны преобразовать строки, прежде чем вставлять их в базу данных.У вас есть два варианта:

  1. вы читаете каждую строку входного файла и делаете INSERT (это может занять некоторое время);
  2. вы конвертируете входной файл втекстовый файл с ожидаемым форматом и использовать COPY.

Вставить один за другим

Первые варианты кажутся простыми: для первой строки файла CSV, {(10.10.10.1,80),(10.10.10.2,443)},вам нужно выполнить запрос:

INSERT INTO my_table VALUES (ARRAY[('10.10.10.1',80),('10.10.10.2',443)]::address[], 4)

. Для этого вам просто нужно создать новую строку:

String value = row.replaceAll("\\{", "ARRAY[")
                    .replaceAll("\\}", "]::address[]")
                    .replaceAll("\\(([0-9.]+),", "'$1'");
String sql = String.format("INSERT INTO my_table VALUES (%s)", value);

и выполнить запрос для каждой строки входного файла (или для большей безопасности используйте подготовленный оператор ).

Вставьте с COPY

Я подробно остановлюсь на втором варианте.Вы должны использовать в коде Java:

copyManager.copyIn(sql, from);

, где запрос на копирование является оператором COPY FROM STDIN, а from - читателем.Оператор будет выглядеть следующим образом:

COPY my_table (addresses) FROM STDIN WITH (FORMAT text);

Для подачи в менеджер копий необходимы данные типа (обратите внимание на кавычки):

{"(10.10.10.1,80)","(10.10.10.2,443)"}
{"(10.10.10.3,8080)","(10.10.10.4,4040)"}

С временным файлом

Более простой способ получить данные в правильном формате - создать временный файл.Вы читаете каждую строку входного файла и заменяете ( на "( и ) на )".Запишите эту обработанную строку во временный файл.Затем передайте средство чтения этого файла в менеджер копий.

На лету

С двумя потоками Вы можете использовать два потока:

  • поток 1 читает входной файл, обрабатывает строки одну за другой и записывает их в PipedWriter.

  • поток 2 передает PipedReader подключен к предыдущему PipedWriter к диспетчеру копирования.

Основная трудность заключается в синхронизации потоков таким образом, что поток 2 начинает читать PipedReader перед потоком 1начинает записывать данные в PipedWriter.См. этот мой проект для примера.

С пользовательским считывателем Считыватель from может быть примером чего-то вроде (Наивная версия):

class DataReader extends Reader {
    PushbackReader csvFileReader;
    private boolean wasParenthese;

    public DataReader(Reader csvFileReader) {
        this.csvFileReader = new PushbackReader(csvFileReader, 1);
        wasParenthese = false;
    }

    @Override
    public void close() throws IOException {
        this.csvFileReader.close();
    }

    @Override
    public int read(char[] cbuf, int off, int len) throws IOException {
        // rely on read()
        for (int i = off; i < off + len; i++) {
            int c = this.read();
            if (c == -1) {
                return i-off > 0 ? i-off : -1;
            }
            cbuf[i] = (char) c;
        }
        return len;
    }

    @Override
    public int read() throws IOException {
        final int c = this.csvFileReader.read();
        if (c == '(' && !this.wasParenthese) {
            this.wasParenthese = true;
            this.csvFileReader.unread('(');
            return '"'; // add " before (
        } else {
            this.wasParenthese = false;
            if (c == ')') {
                this.csvFileReader.unread('"');
                return ')';  // add " after )
            } else {
                return c;
            }
        }
    }
}

(Это наивная версия, потому что правильный способ сделать это - переопределить только public int read(char[] cbuf, int off, int len). Но вам нужно обработать cbuf, чтобы добавить кавычки и сохранитьдополнительные символы сдвинуты вправо: это немного утомительно).Теперь, если r считыватель для файла:

{(10.10.10.1,80),(10.10.10.2,443)}
{(10.10.10.3,8080),(10.10.10.4,4040)}

Просто используйте:

Class.forName("org.postgresql.Driver");
Connection connection = DriverManager
        .getConnection("jdbc:postgresql://db_host:5432/db_base", "user", "passwd");

CopyManager copyManager = connection.unwrap(PGConnection.class).getCopyAPI();
copyManager.copyIn("COPY my_table FROM STDIN WITH (FORMAT text)", new DataReader(r));

При массовой загрузке

Если вы загружаете огромное количестводанных, не забудьте основные советы : отключить автокоммит, удалить индексы и ограничения и использовать TRUNCATE и ANALYZE следующим образом:

TRUNCATE my_table;
COPY ...;
ANALYZE my_table;

Это ускоритзагрузка.

0 голосов
/ 03 июня 2018

См. https://git.mikael.io/mikaelhg/pg-object-csv-copy-poc/ для проекта с тестом JUnit, который делает то, что вы хотите.

По сути, вы хотите иметь возможность использовать запятые для двух вещей: для разделения элементов массива и дляотдельные поля типа, но вы не хотите, чтобы анализ CSV интерпретировал запятые как полевые линеаторы.

Итак,

  1. вы хотите сказать парсеру CSV считать всю строку одной строкой, одним полем, что вы можете сделать, заключив ее в одинарные кавычки и сообщив CSVпарсер об этом, и
  2. вы хотите, чтобы парсер полей PG считал каждый экземпляр типа элемента массива заключенным в двойные кавычки.

Код:

copyManager.copyIn("COPY my_table (addresses) FROM STDIN WITH CSV QUOTE ''''", reader);

DML пример 1:

COPY my_table (addresses) FROM STDIN WITH CSV QUOTE ''''

CSV пример 1:

'{"(10.0.0.1,1)","(10.0.0.2,2)"}'
'{"(10.10.10.1,80)","(10.10.10.2,443)"}'
'{"(10.10.10.3,8080)","(10.10.10.4,4040)"}'

DML пример 2, минуя двойные кавычки:

COPY my_table (addresses) FROM STDIN WITH CSV

CSV пример 2,экранирование двойных кавычек:

"{""(10.0.0.1,1)"",""(10.0.0.2,2)""}"
"{""(10.10.10.1,80)"",""(10.10.10.2,443)""}"
"{""(10.10.10.3,8080)"",""(10.10.10.4,4040)""}"

Полный тестовый класс JUnit:

package io.mikael.poc;

import com.google.common.io.CharStreams;
import org.junit.*;
import org.postgresql.PGConnection;
import org.postgresql.copy.CopyManager;
import org.testcontainers.containers.PostgreSQLContainer;

import java.io.*;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;

import static java.nio.charset.StandardCharsets.UTF_8;

public class CopyTest {

    private Reader reader;

    private Connection connection;

    private CopyManager copyManager;

    private static final String CREATE_TYPE = "CREATE TYPE address AS (ip inet, port int)";

    private static final String CREATE_TABLE = "CREATE TABLE my_table (addresses  address[] NULL)";

    private String loadCsvFromFile(final String fileName) throws IOException {
        try (InputStream is = getClass().getResourceAsStream(fileName)) {
            return CharStreams.toString(new InputStreamReader(is, UTF_8));
        }
    }

    @ClassRule
    public static PostgreSQLContainer db = new PostgreSQLContainer("postgres:10-alpine");

    @BeforeClass
    public static void beforeClass() throws Exception {
        Class.forName("org.postgresql.Driver");
    }

    @Before
    public void before() throws Exception {
        String input = loadCsvFromFile("/data_01.csv");
        reader = new StringReader(input);

        connection = DriverManager.getConnection(db.getJdbcUrl(), db.getUsername(), db.getPassword());
        copyManager = connection.unwrap(PGConnection.class).getCopyAPI();

        connection.setAutoCommit(false);
        connection.beginRequest();

        connection.prepareCall(CREATE_TYPE).execute();
        connection.prepareCall(CREATE_TABLE).execute();
    }

    @After
    public void after() throws Exception {
        connection.rollback();
    }

    @Test
    public void copyTest01() throws Exception {
        copyManager.copyIn("COPY my_table (addresses) FROM STDIN WITH CSV QUOTE ''''", reader);

        final StringWriter writer = new StringWriter();
        copyManager.copyOut("COPY my_table TO STDOUT WITH CSV", writer);
        System.out.printf("roundtrip:%n%s%n", writer.toString());

        final ResultSet rs = connection.prepareStatement(
                "SELECT array_to_json(array_agg(t)) FROM (SELECT addresses FROM my_table) t")
                .executeQuery();
        rs.next();
        System.out.printf("json:%n%s%n", rs.getString(1));
    }

}

Тестовый вывод:

roundtrip:
"{""(10.0.0.1,1)"",""(10.0.0.2,2)""}"
"{""(10.10.10.1,80)"",""(10.10.10.2,443)""}"
"{""(10.10.10.3,8080)"",""(10.10.10.4,4040)""}"

json:
[{"addresses":[{"ip":"10.0.0.1","port":1},{"ip":"10.0.0.2","port":2}]},{"addresses":[{"ip":"10.10.10.1","port":80},{"ip":"10.10.10.2","port":443}]},{"addresses":[{"ip":"10.10.10.3","port":8080},{"ip":"10.10.10.4","port":4040}]}]
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...