Чтение нескольких файлов CSV и вставка данных в таблицу с помощью многопроцессорной обработки Python без использования панд - PullRequest
1 голос
/ 19 сентября 2019

У меня есть 3 CSV-файла для конкретной области в папке, т.е. data_cityA.csv, data_cityB.csv и data_cityC.csv.Я должен прочитать и идентифицировать файл конкретного региона;вставьте его в таблицу с добавлением одного дополнительного столбца, который будет содержать информацию о конкретном регионе.

list_of_file=glob.glob('./*csv')
for file_name in list_of_files:
    count = 0
    total = 0   

    with open(file_name,'r')as csvfile:
        read=csv.reader(csvfile)
        next(read)
        if "cityA" in file_name:
            reg="cityA"
        elif "cityB" in file_name:
            reg="cityB"
        elif "cityC" in file_name:
            reg="cityC"

        with open(file_name, 'r')as csv_file:
            reader=csv.reader(csv_file)
            data=list(reader)
            total=len(data)     
            temp_data=[]

        for row in read:
            row.append(reg) #concatenating region name 
            temp_data.append(tuple(row))
            count+=1
            total=-1

            if count>999 or total==1:
                insert_query="INSERT INTO table_name(A,B,C,D,E) values (1,2,3,4,5)"
                curser.executemoany(insert_query,temp_data)
                conn.commit()
                count=0
                insert_query=" "
                temp_data=[]

cursor.callproc('any_proc')
conn.close()

Процесс обработки занимает около 4-5 часов (размер данных <= 500 МБ).Я пытался реализовать его с помощью многопроцессорной обработки Python, но не смог сделать это успешно.«Я не могу использовать панд».База данных <code>sybase.Есть идеи?Есть ли лучший способ, чем многопроцессорная обработка?

Ответы [ 3 ]

0 голосов
/ 20 сентября 2019

Базы данных замедляют вас.

По сути, вы совершаете 1 поездку туда и обратно для каждого ряда.500 МБ звучит как много строк ... так что это много поездок в оба конца.Проверьте, есть ли способ в sybase, где вы можете предоставить CSV и загрузить его в таблицу.Меньше вызовов (возможно, даже 1) с большим количеством строк, а не 1 строкой на вызов.

0 голосов
/ 20 сентября 2019

Может быть, вы можете сделать это за пределами Python.

Рассмотрим следующую таблицу ...

create table t1 ( 
    k int not null, 
    v varchar(255) null, 
    city varchar(255) null)
go

... и файл "file.txt"

1,Line 1
2,Line 2
3,Line 3
4,Line 4
5,Line 5

Не указывайте пустую строку в конце файла.

Используйте «Stream EDitor» для добавления дополнительного столбца, в данном случае «CityA»

cat file.txt | sed s/$/\,CityA/g > file_2.txt
cat file_2.txt
1,Line 1,CityA
2,Line 2,CityA
3,Line 3,CityA
4,Line 4,CityA
5,Line 5,CityA

Убедитесь, что база данных настроена для массового копирования, ваш администратор базы данных может помочь с этим.

use master
go
sp_dboption 'db_name', 'select', true
go

Затем используйте утилиту bcp Sybase для загрузки файла:

bcp database.owner.table in file_2.txt -U login -S server -c -t, -Y -b 1000

Параметры приведены ниже.:

  • Имя базы данных
  • Владелец объекта
  • Имя таблицы
  • Направление в
  • Имя файла
  • -U Имя пользователя
  • -S имя сервера (имя экземпляра Sybase, а не имя физического хоста)
  • -c = Использовать символьные данные
  • -t, = терминатор поля равен,
  • -Y Преобразование набора символов на стороне клиента - может не потребоваться
  • -b 1000 = Зафиксировать 1000 строк одновременно.Если вы загружаете 500 МБ, вы, вероятно, хотите этого, чтобы не нажимать LOG_SUSPEND.
0 голосов
/ 19 сентября 2019

Вот одна большая проблема, с которой вы столкнулись: data=list(reader).Это позволит прочитать весь файл в память сразу.Если размер файла составляет 500 МБ, то 500 МБ будут загружены в память одновременно.Другой вариант - использовать читателя в качестве итератора.Это связано с тем недостатком, что вы заранее не знаете общее количество записей, поэтому после выхода из цикла вы должны выполнить вставку оставшихся строк.

Второе, что может сильно повлиять на вашу производительностьэто вставка.Вы можете распараллелить его с помощью многопроцессорной обработки (ниже приведено предложение с использованием пула), но, поскольку это новый процесс, вам придется снова подключиться к базе данных (и закрыть ее впоследствии).

from multiprocessing.pool import Pool
list_of_files = glob.glob('./*csv')
pool = Pool()
pool.map(process_file, list_of_files)
pool.close()
pool.join()
cursor.callproc('any_proc')
conn.close()


def process_file(file_name):
    # Make a new connection
    # conn = ...
    cursor = conn.cursor()
    temp_data = []

    def do_insert():
        insert_query = "INSERT INTO table_name(A,B,C,D,E) values (1,2,3,4,5)"
        cursor.executemany(insert_query, temp_data)
        conn.commit()

    with open(file_name, 'r')as csvfile:
        read = csv.reader(csvfile)
        next(read)
        if "cityA" in file_name:
            reg = "cityA"
        elif "cityB" in file_name:
            reg = "cityB"
        elif "cityC" in file_name:
            reg = "cityC"

        for row in read:
            row.append(reg)  # concatenating region name
            temp_data.append(tuple(row))
            if len(temp_data) > 999:
                do_insert()
                temp_data = []
    if temp_data:
        do_insert()
    conn.close()
...