Разработка независимого от источника данных приложения с использованием пакетных данных - PullRequest
0 голосов
/ 29 октября 2019

У нас есть унаследованное приложение, которое считывает данные из mongo для каждого пользователя (результат запроса от маленького до большого в зависимости от запроса пользователя), и наше приложение создает файл для каждого пользователя и переносит его на FTP-сервер / s3. Мы читаем данные в виде курсора монго и записываем каждый пакет в файл, как только он получает данные пакета, поэтому производительность записи в файл приличная. Это приложение прекрасно работает, но привязано к монго и курсору монго.

Теперь нам нужно изменить дизайн этого приложения, поскольку мы должны поддерживать различные источники данных, например, MongoDB, Postgres DB, Kinesis, S3 и т. Д. на данный момент:

  1. Создание API данных для каждого источника и предоставление разбитого на страницы ответа REST. Это реальное решение, но оно может быть медленным при сравнении больших данных запроса с текущим откликом курсора.
  2. Создайте слой абстракции данных, подавая пакетные данные в kafka и читая поток пакетных данных в нашем файле generator.but большинствовремени пользователь запрашивает отсортированные данные, поэтому нам нужно будет читать сообщения в последовательности. Мы потеряем выгоду от большой пропускной способности и большого количества дополнительной работы, чтобы объединить эти сообщения данных перед записью в файл.

Мы ищем решение для замены текущего курсора монго и обеспечения независимости нашего генератора файлов отисточник данных.

1 Ответ

0 голосов
/ 29 октября 2019

Похоже, что вы, по сути, хотите создать API, в котором вы сможете поддерживать эффективность потоковой передачи в максимально возможной степени, как вы это делаете при записи файла во время чтения пользовательских данных.

Inв этом случае вы можете захотеть определить API push-parser для ваших ReadSource s, которые будут передавать данные на ваши WriteTarget s, которые будут записывать данные во все, для чего у вас есть реализация. Сортировка будет выполняться на стороне ReadSource, поскольку для некоторых источников вы можете читать упорядоченным образом (например, из баз данных);Для тех источников, для которых вы не можете этого сделать, вы можете просто выполнить промежуточный шаг для сортировки ваших данных (например, записи во временную таблицу) и затем передать их в WriteTarget.

. Выглядит примерно так:

public class UserDataRecord {
    private String data1;
    private String data2;

    public String getRecordAsString() {
        return data1 + "," + data2;
    }
}

public interface WriteTarget<Record> {
    /** Write a record to the target */
    public void writeRecord(Record record);

    /** Finish writing to the target and save everything */
    public void commit();

    /** Undo whatever was written */
    public void rollback();
}

public abstract class ReadSource<Record> {
    protected final WriteTarget<Record> writeTarget;

    public ReadSource(WriteTarget<Record> writeTarget) { this.writeTarget = writeTarget; }

    public abstract void read();
}

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
public class RelationalDatabaseReadSource extends ReadSource<UserDataRecord> {
    private Connection dbConnection;

    public RelationalDatabaseReadSource (WriteTarget<UserDataRecord> writeTarget, Connection dbConnection) {
        super(writeTarget);
        this.dbConnection = dbConnection;
    }

    @Override public void read() {
        // read user data from DB and encapsulate it in a record
        try (Statement statement = dbConnection.createStatement();
                ResultSet resultSet = statement.executeQuery("Select * From TABLE Order By COLUMNS");) {
            while (resultSet.next()) {
                UserDataRecord record = new UserDataRecord();
                // stream the records to the write target
                writeTarget.writeRecord(record);
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
public class FileWriteTarget implements WriteTarget<UserDataRecord> {
    private File fileToWrite;
    private PrintWriter writer;

    public FileWriteTarget(File fileToWrite) throws IOException {
        this.fileToWrite = fileToWrite;
        this.writer = new PrintWriter(new FileWriter(fileToWrite));
    }

    @Override public void writeRecord(UserDataRecord record) {
        writer.println(record.getRecordAsString().getBytes(StandardCharsets.UTF_8));
    }

    @Override public void commit() {
        // write trailing records
        writer.close();
    }

    @Override
    public void rollback() {
        try { writer.close(); } catch (Exception e) { }
        fileToWrite.delete();
    }
}

Это только общая идея и требует серьезногоулучшение. Любой, пожалуйста, не стесняйтесь обновлять этот API.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...