Зависимые конвейеры обработки данных, куда файлы поступают асинхронно - PullRequest
0 голосов
/ 28 апреля 2020

У меня есть несколько зависящих от данных задач / конвейеров, некоторые из которых зависят от выполнения другой. Что еще труднее, так это то, что данные могут поступать асинхронно , что означает, что определенные задачи должны ждать , пока все файлы или задачи на предыдущем шаге не будут обработаны.

Вот пример:

Допустим, у нас есть необработанный файл x[i,j] с индексами, где i обозначает одну конкретную подкатегорию в основной категории j.

Мне нужно запустить следующие конвейеры:

  1. конвейер 1 : очистить необработанный файл x[i,j] и сохранить его как x_clean[i,j]
  2. конвейер 2 : как только конвейер 1 будет выполнен для всех i внутри j, объедините результаты из x_clean[i,j] и сохраните их как y_clean[j]
  3. конвейер 3 : очистить необработанный файл z[j] и сохранить его как z_clean[j]
  4. конвейер 4 : после завершения конвейеров 2 и конвейеров 3 объедините z_clean[j] и y_clean[j] и сохраните его как w_clean[j].

Какую модель я могу применить для обработки такого подхода потока данных? Есть ли какая-то методология, стоящая за такими задачами обработки данных? Есть ли в GCP что-то созданное для таких проблем?

1 Ответ

0 голосов
/ 30 апреля 2020

В процессе производства ...

  • шаги зависят от выполнения других шагов.

  • материал может поступать асинхронно, что означает последующие шаги подождите, пока товар прибудет на работу. Однако следует помнить, что это не означает, что неограниченный материал может выйти из-под контроля, а только материал, который будет использован для этого конкретного c заказа на производство. Если ваш сценарий допускает поток потока неограниченных данных, то вы должны организовать его предварительную обработку, чтобы избежать смешивания различных компонентов продукта. Не ставьте под угрозу структуру процесса, пытаясь обрабатывать асинхронно поступающие данные в некотором буфере или где-либо еще, потому что производственные продукты данных включают в себя реляционные данные, а не сырье.

  • подкомпоненты могут быть завершены в присоединение к ветвям, что означает, что шаг сборки ожидает согласованного набора связанных компонентов, прежде чем начнется сборка.

Я являюсь создателем POWER, единственной совместной (производственной) архитектуры на сегодняшний день. Об этом можно многое узнать, но вы можете найти мои статьи и код в Интернете: http://www.powersemantics.com/

Вот как выглядит ваш процесс в производственной модели для работы:

    class MyProduct
    {
        public object[i,j] x_clean { get; set; }
        public object[j] y_clean { get; set; }
        public object[j] z_clean { get; set; }
        // final product
        public object[j] w_clean { get; set; }
    }
    class MyProcess : Producer<MyProduct>, IProcess, IMachine, IOrganize
    {
        // process inputs
        public object[i,j] x { get; set; }  // raw file
        public object[j] z { get; set; } // raw file

        // machines
        public CleanerA Cleaner1 { get; set; }
        public Aggregator Aggregator1 { get; set }
        public CleanerB Cleaner2 { get; set; }
        public Assembler Assembler1 { get; set; }

        public void D() { // instantiates properties and machines }
        public void O()
        {
            // bind machines to work on the same data points
            // allows maintenance to later remove cleaners if it becomes possible
            // for the process to receive data in the correct form
            Cleaner1.x = x;
            Cleaner1.Product.x_clean = Product.x_clean;

            Aggregator1.x_clean = Product.x_clean;
            Aggregator1.Product.y_clean = Product.y_clean;

            Cleaner2.z = z;
            Cleaner2.Product.z_clean = Product.z_clean;

            Assembler1.z_clean = Product.z_clean;
            Assembler1.y_clean = Product.y_clean;
            Assembler1.Product.w_clean = Product.w_clean;
        }

        // hardcoded synchronous controller
        public void M()
        {
            Cleaner1.M();
            Aggregator1.M();
            Cleaner2.M();
            Assembler1.M();
        }
    }

    // these class pairs are Custom Machines, very specific work organized
    // by user requirements rather than in terms of domain-specific operations
    class CleanerAProduct
    {
        public object[i,j] x_clean { get; set; }
    }
    class CleanerA: Producer<CleanerAProduct>, IMachine
    {
        public object[i,j] x { get; set; }  // raw file
        public void M()
        {
            // clean the raw file x[i,j] and store it as x_clean[i,j]
        }
    }


    class AggregatorProduct
    {
        public object[j] y_clean { get; set; }
    }
    class Aggregator: Producer<AggregatorProduct>, IMachine
    {
        public object[i,j] x_clean { get; set; }
        public void M()
        {
            // aggregate the results from x_clean[i,j] and store it as y_clean[j]
        }
    }


    class CleanerBProduct
    {
        public object[j] z_clean { get; set; }
    }
    class CleanerB : Producer<CleanerBProduct>, IMachine
    {
        public object[j] z { get; set; }
        public void M()
        {
            // clean a raw file z[j] and store it as z_clean[j]
        }
    }


    class AssemblerProduct
    {
        public object[j] w_clean { get; set; }
    }
    class Assembler : Producer<AssemblerProduct>, IMachine
    {
        public object[j] y_clean { get; set; }
        public object[j] z_clean { get; set; }
        public void M()
        {
            // combine z_clean[j] and y_clean[j] and store it as w_clean[j]
        }
    }

Обычное использование класса производственного процесса:

  1. Создание экземпляра. Вызовите D () для создания экземпляров машин и продукта.
  2. Назначьте любые входные данные для процесса.
  3. Вызовите O (), чтобы процесс распределял эти входные данные на машины, а также связывал машины для работы. на конечный продукт. Это ваш последний шанс переопределить эти назначения перед производством.
  4. Вызовите M () для выполнения процесса.

Большинство исходного кода объединяет производителей и потребителей в одном теле функции и таким образом, становится неудобно поддерживать позже, и затем функции отправляют друг другу данные по электронной почте, как бесполезные офисные работники, которые не следят за электронной почтой. Это вызывает проблемы, когда позже вы хотите принять решение о вертикальной интеграции, например, заменить машину или расширить процесс, и все это я задокументировал с помощью источников. POWER - это единственная архитектура, которая избегает таких сложностей, как централизация. Я выпустил его в феврале.

Существуют инструменты ETL и другие решения, такие как TPL Dataflow, но производственные процессы не собираются организовывать или управлять самими программистами. Все программисты должны изучить POWER, чтобы правильно справляться с обязанностями, связанными с отходами, интеграцией, управлением и инструментарием. Работодатели смешно смотрят на нас, когда мы пишем автоматизированный код, а затем не можем остановить выполнение в реальном времени, но наше обучение только готовит нас к созданию процессов, а не к их архитектуре, как это делает производство.

...