Асинхронная запись (добавление) в файл несколькими параллельными потоками NIO - PullRequest
0 голосов
/ 28 мая 2019

Можно ли использовать API-интерфейсы NIO Async для записи (добавления) в один файл несколькими параллельными потоками?То, что я пытаюсь сделать, это разбить запросы на разные категории и назначить поток каждой категории, чтобы получить записи из БД и записать в файл (скажем, user-records.json).

Тема A Пользователи некоторой категории A
Запрос по идентификаторам пользователей - записать набор результатов в user-records.json

Поток B Пользователи некоторой категории B
Запрос по идентификаторам пользователей- добавить результирующий набор в user-records.json

Поток C Пользователи некоторой категории C
Запрос по идентификаторам пользователей - добавить результирующий набор в user-records.json

и так далее....

Вот классы, которые не являются реальной реализацией, а представляют собой пример включения / записи в асинхронном файле -

public class AsyncAppender {

    private final AsynchronousFileChannel channel;
    /** Where new append operations are told to start writing. */
    private final AtomicLong              projectedSize;

    AsyncAppender( AsynchronousFileChannel channel) throws IOException {
        this.channel = channel;
        this.projectedSize = new AtomicLong(channel.size());
    }

    public void append( ByteBuffer buf) throws IOException {
        final int buflen = buf.remaining();
        long size;
        do {
            size = projectedSize.get();
        } while (!projectedSize.compareAndSet(size, size + buflen));

        channel.write(buf, channel.size(), channel, new WriteOp(buf, size));
    }
}

public class WriteOp implements CompletionHandler<Integer, AsynchronousFileChannel> {

    private final ByteBuffer buf;
    private long             position;

    WriteOp( ByteBuffer buf, long position) {
        this.buf = buf;
        this.position = position;
    }

    @Override
    public void completed( Integer result, AsynchronousFileChannel channel) {
        if (buf.hasRemaining()) { // incomplete write
            position += result;
            channel.write(buf, position, channel, this);
        }
    }

    @Override
    public void failed( Throwable ex, AsynchronousFileChannel channel) {
        // ?
    }
}

Основной класс -

public class AsyncWriteMain {

    public static void main( String[] args) {
        AsyncWriteMain m = new AsyncWriteMain();
        m.asyncWrite();
    }

    public void asyncWrite() {
        try {
            String filePath = "D:\\temp\\user-records.txt";
            Path file = Paths.get(filePath);
            AsynchronousFileChannel asyncFile = AsynchronousFileChannel.open(file, StandardOpenOption.WRITE, StandardOpenOption.CREATE);

            AsyncAppender aa = new AsyncAppender(asyncFile);

            for ( int i = 0; i < 10; i++) {
                aa.append(ByteBuffer.wrap((i + " Some text to be written").getBytes()));
            }
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
    }
}

1 Ответ

0 голосов
/ 28 мая 2019

Я думаю, что вы можете.По сути, вам нужно блокировка файла

Future<FileLock> featureLock = asynchFileChannel.lock();
        log.info("Waiting for the file to be locked ...");
        FileLock lock = featureLock.get();
        if (lock.isValid()) {...}

из здесь

...