Обработка большого файла, разделенного каналом, с многопроцессорной обработкой в ​​Python и без нее - PullRequest
0 голосов
/ 20 октября 2019

У меня большой файл (более 10G и 40м строк). Каждая строка имеет разделенные трубами поля, и одно из полей содержит JSON.

Пример строки ввода:

1|10|{"DelTime":1572299058000,"ValidFrom":1547506800474,"ValidTo":1572217199000,"Units":1,"ID1":1,"ID2":"1","ID3":"1","ID4":"1"}|10000000|

Пример строки вывода:

10|(1|1|10000000|1|1|20190114180000|20191027185959)

Мне нужноизвлечь некоторые конкретные поля и изменить время эпохи на определенный формат даты. Мой код следующий (после включения предложения RomanPerekhrest):

from __future__ import print_function
import sys,json,time

with open(sys.argv[1], 'r') as file:
    for line in file:
                fields = line.split('|')
                json_attributes = json.loads(fields[2])
                print (fields[1], "(" + json_attributes['ID1'], json_attributes['ID2'], fields[3], json_attributes['ID3'], fields[0], time.strftime('%Y%m%d%H%M%S', time.localtime(json_attributes['ValidFrom']/1000)), time.strftime('%Y%m%d%H%M%S', time.localtime(json_attributes['ValidTo']/1000)) + ")", sep='|')

Это на моем ПК (будет работать на многопроцессорном сервере) примерно 100 секунд для 1000000 строк. Найдите ниже cProfile:

         14001936 function calls (14001865 primitive calls) in 100.021 seconds

   Ordered by: standard name

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1   20.052   20.052  100.021  100.021 FUC3.py:1(<module>)
        1    0.000    0.000    0.000    0.000 __future__.py:48(<module>)
        1    0.000    0.000    0.000    0.000 __future__.py:74(_Feature)
        7    0.000    0.000    0.000    0.000 __future__.py:75(__init__)
  1000000    3.073    0.000   40.194    0.000 __init__.py:293(loads)
        1    0.000    0.000    0.000    0.000 __init__.py:49(normalize_encoding)
        1    0.000    0.000    0.001    0.001 __init__.py:71(search_function)
        1    0.001    0.001    0.008    0.008 __init__.py:99(<module>)
        1    0.000    0.000    0.000    0.000 codecs.py:77(__new__)
        1    0.000    0.000    0.001    0.001 decoder.py:17(_floatconstants)
        1    0.000    0.000    0.005    0.005 decoder.py:2(<module>)
        1    0.000    0.000    0.000    0.000 decoder.py:274(JSONDecoder)
        1    0.000    0.000    0.000    0.000 decoder.py:304(__init__)
  1000000    5.564    0.000   37.121    0.000 decoder.py:361(decode)
  1000000   26.296    0.000   26.296    0.000 decoder.py:372(raw_decode)
        1    0.000    0.000    0.000    0.000 encoder.py:101(__init__)
        1    0.000    0.000    0.003    0.003 encoder.py:2(<module>)
        1    0.000    0.000    0.000    0.000 encoder.py:70(JSONEncoder)
        1    0.000    0.000    0.000    0.000 hex_codec.py:27(hex_decode)
        1    0.000    0.000    0.000    0.000 hex_codec.py:45(Codec)
        1    0.000    0.000    0.000    0.000 hex_codec.py:52(IncrementalEncoder)
        1    0.000    0.000    0.000    0.000 hex_codec.py:57(IncrementalDecoder)
        1    0.000    0.000    0.000    0.000 hex_codec.py:62(StreamWriter)
        1    0.000    0.000    0.000    0.000 hex_codec.py:65(StreamReader)
        1    0.000    0.000    0.000    0.000 hex_codec.py:70(getregentry)
        1    0.000    0.000    0.000    0.000 hex_codec.py:8(<module>)
        6    0.000    0.000    0.006    0.001 re.py:188(compile)
        6    0.000    0.000    0.006    0.001 re.py:226(_compile)
        1    0.000    0.000    0.002    0.002 scanner.py:2(<module>)
       14    0.000    0.000    0.003    0.000 sre_compile.py:179(_compile_charset)
       14    0.002    0.000    0.002    0.000 sre_compile.py:208(_optimize_charset)
       54    0.000    0.000    0.000    0.000 sre_compile.py:25(_identityfunction)
        4    0.001    0.000    0.001    0.000 sre_compile.py:259(_mk_bitmap)
     26/6    0.000    0.000    0.003    0.000 sre_compile.py:33(_compile)
        9    0.000    0.000    0.000    0.000 sre_compile.py:355(_simple)
        6    0.000    0.000    0.001    0.000 sre_compile.py:360(_compile_info)
       12    0.000    0.000    0.000    0.000 sre_compile.py:473(isstring)
        6    0.000    0.000    0.004    0.001 sre_compile.py:479(_code)
        6    0.000    0.000    0.006    0.001 sre_compile.py:494(compile)
       50    0.000    0.000    0.000    0.000 sre_parse.py:127(__len__)
       91    0.000    0.000    0.000    0.000 sre_parse.py:131(__getitem__)
        9    0.000    0.000    0.000    0.000 sre_parse.py:135(__setitem__)
       25    0.000    0.000    0.000    0.000 sre_parse.py:139(append)
    35/15    0.000    0.000    0.000    0.000 sre_parse.py:141(getwidth)
        6    0.000    0.000    0.000    0.000 sre_parse.py:179(__init__)
      118    0.000    0.000    0.000    0.000 sre_parse.py:183(__next)
       80    0.000    0.000    0.000    0.000 sre_parse.py:196(match)
       94    0.000    0.000    0.000    0.000 sre_parse.py:202(get)
       18    0.000    0.000    0.000    0.000 sre_parse.py:226(_class_escape)
        4    0.000    0.000    0.000    0.000 sre_parse.py:258(_escape)
     13/6    0.000    0.000    0.002    0.000 sre_parse.py:302(_parse_sub)
     15/6    0.001    0.000    0.002    0.000 sre_parse.py:380(_parse)
        6    0.000    0.000    0.002    0.000 sre_parse.py:676(parse)
        6    0.000    0.000    0.000    0.000 sre_parse.py:68(__init__)
        6    0.000    0.000    0.000    0.000 sre_parse.py:73(opengroup)
        6    0.000    0.000    0.000    0.000 sre_parse.py:84(closegroup)
       26    0.000    0.000    0.000    0.000 sre_parse.py:91(__init__)
        1    0.000    0.000    0.001    0.001 {__import__}
        6    0.000    0.000    0.000    0.000 {_sre.compile}
        1    0.000    0.000    0.000    0.000 {_struct.unpack}
        1    0.000    0.000    0.000    0.000 {binascii.a2b_hex}
        1    0.000    0.000    0.000    0.000 {built-in method __new__ of type object at 0x7f2575fe0e00}
       32    0.000    0.000    0.000    0.000 {chr}
        1    0.000    0.000    0.000    0.000 {hasattr}
      111    0.000    0.000    0.000    0.000 {isinstance}
1000434/1000419    0.353    0.000    0.353    0.000 {len}
        4    0.000    0.000    0.000    0.000 {max}
      340    0.000    0.000    0.000    0.000 {method 'append' of 'list' objects}
        1    0.000    0.000    0.001    0.001 {method 'decode' of 'str' objects}
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
  2000000    0.881    0.000    0.881    0.000 {method 'end' of '_sre.SRE_Match' objects}
        4    0.000    0.000    0.000    0.000 {method 'extend' of 'list' objects}
       32    0.000    0.000    0.000    0.000 {method 'format' of 'str' objects}
       38    0.000    0.000    0.000    0.000 {method 'get' of 'dict' objects}
        6    0.000    0.000    0.000    0.000 {method 'items' of 'dict' objects}
        1    0.000    0.000    0.000    0.000 {method 'join' of 'str' objects}
  2000000    4.028    0.000    4.028    0.000 {method 'match' of '_sre.SRE_Pattern' objects}
        6    0.000    0.000    0.000    0.000 {method 'remove' of 'list' objects}
       32    0.000    0.000    0.000    0.000 {method 'setdefault' of 'dict' objects}
  1000001    1.997    0.000    1.997    0.000 {method 'split' of 'str' objects}
        1    0.000    0.000    0.000    0.000 {method 'translate' of 'str' objects}
       56    0.000    0.000    0.000    0.000 {min}
        1    0.000    0.000    0.000    0.000 {open}
       15    0.000    0.000    0.000    0.000 {ord}
  1000000   11.911    0.000   11.911    0.000 {print}
        8    0.000    0.000    0.000    0.000 {range}
  2000000   17.697    0.000   17.697    0.000 {time.localtime}
  2000000    8.162    0.000    8.162    0.000 {time.strftime}

Я также пытался вместо чтения построчно читать много строк вместе, но это почти одно и то же время выполнения:

from __future__ import print_function
import sys,json,time

inputfile = open(sys.argv[1],'r')
tmp_lines = inputfile.readlines(1000000)
while tmp_lines:
    for line in tmp_lines:
        fields = line.split('|')
        json_attributes = json.loads(fields[2])
        print (fields[1], "(" + json_attributes['ID1'], json_attributes['ID2'], fields[3], json_attributes['ID3'], fields[0], time.strftime('%Y%m%d%H%M%S', time.localtime(json_attributes['ValidFrom']/1000)), time.strftime('%Y%m%d%H%M%S', time.localtime(json_attributes['ValidTo']/1000)) + ")", sep='|')

У меня естьследующие вопросы:

  • Есть ли способ сделать это быстрее, не используя многопроцессорность или многопоточность?
  • Полагаю, поскольку я не вижу различий, когда читаю построчно или когда читаю много строк вместе, означает ли это, что я связан с процессором?
  • Полагаю, имеет смыслиспользуйте многопроцессорность или многопоточность, так как на сервере достаточно ресурсов для запуска сценария, но я действительно не имею никакого опыта в этом. Я не могу прочитать весь файл в памяти, так как на сервере не будет свободного объема памяти (10 ГБ) только для моего скрипта на python.

Решение должно работать на Python 2.7.5

...