Как использовать Luigi с конкретными столбцами таблицы SQL в качестве цели - PullRequest
1 голос
/ 11 июня 2019

В Luigi, предположим, у меня есть TaskA и TaskB, каждый из которых генерирует столбец данных для помещения в таблицу1.Если я использую цель SQL, Луиджи, похоже, захочет записать в полную таблицу, но это делает добавление новых столбцов в таблицу медленным процессом.Представьте, что table1 заполнен столбцами A и B. Я хочу добавить новое поле C, которое может быть выведено taskC, но я бы предпочел записать его непосредственно в таблицу SQL, так как в противном случае мне пришлось бы хранить A,B, C в другом месте, и есть еще одна задача, которая читает их все и записывает их в таблицу.Есть ли способ сделать это, не дублируя базу данных в другом месте, вместо этого просто попросив каждую задачу просто записать свои данные в соответствующие столбцы таблицы SQL?

1 Ответ

0 голосов
/ 15 июня 2019

Если я использую цель SQL, Луиджи хочет записать в полную таблицу, кажется,

Я думаю, вы неправильно поняли, что такое Target.Он не «делает что-то», он просто используется, чтобы сигнализировать Луиджи, закончил ли Task или нет.

Насколько я понял, я думаю, что вы пытались использовать один из этих CopyToTableтакие задачи, как эта , и они действительно предназначены для заполнения нескольких строк за цикл, поэтому, если вы хотите заполнить один столбец, вам, вероятно, следует создать собственный Task и создать собственный запрос SQL.

К счастью, вы все еще можете использовать цель SQL в качестве вывода и получить от нее соединение с базой данных.

Пример

Здесь я использовал MySqlTargetв качестве цели, но вы можете использовать другие цели SQL.

class WriteSingleColumnTask(lg.Task):
    def output():
        return MySqlTarget(
            host=self.host,
            database=self.database,
            user=self.user,
            password=self.password,
            table=self.table,
            update_id=self.update_id
        )

    def  run():
        output=self.output()
        connection = output.connect()
        connection.autocommit = self.autocommit
        cursor = connection.cursor()

        query = "<YOUR QUERY FOR A SINGLE COLUMN HERE>"
        cursor.execute(sql)

        # Update marker table
        self.output().touch(connection)

        # commit and close connection
        connection.commit()
        connection.close()

Пример из здесь .

...