Конвейерная обработка событий - PullRequest
0 голосов
/ 15 октября 2019

В настоящее время я работаю над вычислениями, выполненными в несколько этапов:

  1. Извлечение данных из БД (класс данных может отличаться в зависимости от запроса)
  2. Добавление статических данных вкаждая запись (статические данные - это память). После этого шага все данные представлены одним и тем же классом.
  3. Вызовите конечную точку REST для выполнения фактического расчета
  4. Постобработка результата

Посколькуобъем данных может быть очень большим, каждый шаг имеет свой собственный пул потоков, так что разные запросы могут обрабатываться параллельно. Перед отправкой запроса в конвейер вычисляется количество записей, и если оно превышает определенный порог, запрос разбивается на страницы.

Я начал проектировать его как конвейер, в котором каждый канал является шагом.

Моя идея состояла в том, чтобы отделить управление запросом от фактической работы, выполняемой каждым каналом над данными, чтобы я мог заменить канал в зависимости от извлекаемых данных, но при этом использовать ту же логикуобрабатывать выгружаемые и не выгружаемые запросы.

Проблема заключается в том, как сохранить общее управление запросами и специфичным для обработки данных и переключать различные реализации на основе данных.

Логика подкачкизапрос и сбор результатов страниц выполняется путем добавления к каждому запросу количества страниц, номера страницы и размера. В какой-то момент (на шаге 3) я собираю все страницы для одного и того же запроса (для каждого запроса есть идентификатор) и жду, пока все страницы будут собраны.

На данный момент у меня 3 штуки:

  • Трубопровод
  • Труба
  • Запрос

И я хочучтобы эти фрагменты были как можно более общими, в то время как базовые данные должны быть конкретными.

Итак, первый вопрос заключается в том, должен ли я создать и реализовать каждый канал для каждого имеющегося у меня данных и переключать их в зависимости от запроса? Или оставить ту же реализацию для канала и позволить каналу решать, что делать внутри? (Например, используя разные объекты, каждый из которых способен обрабатывать данные одного типа). Второй вопрос: как реализовать решение о том, какую реализацию мне следует использовать в зависимости от типа данных?

В данный момент я пытался установить тип для запроса, но проблема все еще присутствует, поскольку только каналзнает об универсальном интерфейсе запроса. Я также пытался параметризовать запрос по данным, но все же, единственный известный тип - это интерфейс Запрос.

interface Request {
    UUID getId();
    int getTotalPages();
}

interface Pipe {
    void execute(Request request);
    void addNextPipe(Request request);
}

interface Pipeline {
    void execute(Request request);
    void addPipe(Pipe pipe);
}

PagedRequest extends Request {
    UUID id;
    int totalPages;
    int page;
    int size;

    List<Data> data;

    UUID getId() {
        ...
    }

    int getTotalPages() {
        ...
    }
}

Step1 implements Pipe {
    Pipe nextPipe;
    ThreadPool pool;

    execute(Request request) {
        pool.execute(new Worker(request));
    }

    private class Worker implements Runnable {
        private Request request;

        run() {
            //do specific work
            nextPipe.execute(request);
        }
    }
}

То, чего я хочу достичь, это что-то вроде:

  • Получите запрос
  • Поиск типа / данных
  • Отправьте запрос правильному обработчику. Тот, который способен работать с данными в запросе
  • Передать общий запрос в следующий канал

Предполагая, что у нас есть три типа данных A, B и C:

StepN implements Pipe {
    execute(Request request) {
        if(handlerForDataA can handle) {
            handlerForDataA.handle(request);
        } else if(handlerForDataB can handle) {
            handlerForDataB.handle(request);
        } else if(handlerForDataC can handle) {  
            handlerForDataC.handle(request);
        }
    }
}

Обработчик также может быть реальным работником.

Спасибо

...