Как улучшить скорость этого парсера, используя python? - PullRequest
2 голосов
/ 10 марта 2020

В настоящее время я анализирую исторические данные c о задержке из транспортной сети c в Швеции. У меня ~ 5700 файлов (по одному из каждых 15 секунд) с 27 января, содержащих данные о мгновенной задержке для транспортных средств, находящихся в активных поездках в сети. Это, к сожалению, много служебных / дублирующих данных, поэтому я хочу разобрать соответствующий материал для визуализации.

Однако, когда я пытаюсь проанализировать и отфильтровать соответствующие данные о задержке в поездке Уровень, используя скрипт ниже, он работает очень медленно. Он работает уже более 1,5 часов (на моем MacBook Pro 15 '2019 года) и еще не завершен.

  • Как мне оптимизировать / улучшить этот python парсер?
  • Или я должен уменьшить количество файлов и, следовательно, частоту сбора данных для этой задачи?

Заранее большое спасибо. 101

from google.transit import gtfs_realtime_pb2
import gzip
import os
import datetime
import csv
import numpy as np

directory = '../data/tripu/27/'
datapoints = np.zeros((0,3), int)
read_trips = set()

# Loop through all files in directory
for filename in os.listdir(directory)[::3]:

    try:
        # Uncompress and parse protobuff-file using gtfs_realtime_pb2
        with gzip.open(directory + filename, 'rb') as file:
            response = file.read()
            feed = gtfs_realtime_pb2.FeedMessage()
            feed.ParseFromString(response)

            print("Filename: " + filename, "Total entities: " + str(len(feed.entity)))

            for trip in feed.entity:
                if trip.trip_update.trip.trip_id not in read_trips:

                    try:
                        if len(trip.trip_update.stop_time_update) == len(stopsOnTrip[trip.trip_update.trip.trip_id]):
                            print("\t","Adding delays for",len(trip.trip_update.stop_time_update),"stops, on trip_id",trip.trip_update.trip.trip_id)

                            for i, stop_time_update in enumerate(trip.trip_update.stop_time_update[:-1]):

                                # Store the delay data point (arrival difference of two ascending nodes)
                                delay = int(trip.trip_update.stop_time_update[i+1].arrival.delay-trip.trip_update.stop_time_update[i].arrival.delay)

                                # Store contextual metadata (timestamp and edgeID) for the unique delay data point
                                ts = int(trip.trip_update.stop_time_update[i+1].arrival.time)
                                key = int(str(trip.trip_update.stop_time_update[i].stop_id) + str(trip.trip_update.stop_time_update[i+1].stop_id))

                                # Append data to numpy array
                                datapoints = np.append(datapoints, np.array([[key,ts,delay]]), axis=0)

                            read_trips.add(trip.trip_update.trip.trip_id)
                    except KeyError:
                        continue
                else:
                    continue
    except OSError:
        continue

Ответы [ 2 ]

2 голосов
/ 10 марта 2020

Я подозреваю, что проблема здесь в том, что мы постоянно вызываем np.append, чтобы добавить новую строку в массив numpy. Поскольку размер массива numpy фиксирован при его создании, np.append() должен создать новый массив, что означает, что он должен скопировать предыдущий массив. На каждом l oop массив больше, и поэтому все эти копии добавляют квадратичный c коэффициент к вашему времени выполнения. Это становится значительным, когда массив довольно большой (что, по-видимому, в вашем приложении).

В качестве альтернативы вы можете просто создать обычный список Python кортежей, а затем при необходимости преобразовать его в полный numpy массив в конце.

То есть (только измененные строки):

datapoints = []
# ...
                            datapoints.append((key,ts,delay))
# ...
npdata = np.array(datapoints, dtype=int)
1 голос
/ 10 марта 2020

Я все еще думаю, что процедура разбора - это ваше узкое место (даже если оно пришло от Google), но все эти '. Убивали меня! (И они несколько снижают производительность.) Кроме того, я преобразовал ваши итерации i, i + 1 в два итератора, пролистывающих список обновлений, это немного более продвинутый стиль работы со списком. Плюс имена cur/next_update помогли мне быть честными, когда вы хотели сослаться на одно против другого. Наконец, я удаляю завершающий «else: continue», так как вы в конце все равно для l oop.

for trip in feed.entity:
    this_trip_update = trip.trip_update 
    this_trip_id = this_trip_update.trip.trip_id
    if this_trip_id not in read_trips:

        try:
            if len(this_trip_update.stop_time_update) == len(stopsOnTrip[this_trip_id]):
                print("\t", "Adding delays for", len(this_trip_update.stop_time_update), "stops, on trip_id",
                      this_trip_id)

                # create two iterators to walk through the list of updates
                cur_updates = iter(this_trip_update.stop_time_update)
                nxt_updates = iter(this_trip_update.stop_time_update)
                # advance the nxt_updates iter so it is one ahead of cur_updates
                next(nxt_updates)

                for cur_update, next_update in zip(cur_updates, nxt_updates):
                    # Store the delay data point (arrival difference of two ascending nodes)
                    delay = int(nxt_update.arrival.delay - cur_update.arrival.delay)

                    # Store contextual metadata (timestamp and edgeID) for the unique delay data point
                    ts = int(next_update.arrival.time)
                    key = "{}/{}".format(cur_update.stop_id, next_update.stop_id)

                    # Append data to numpy array
                    datapoints = np.append(datapoints, np.array([[key, ts, delay]]), axis=0)

                read_trips.add(this_trip_id)
        except KeyError:
            continue

Этот код должен быть эквивалентным тому, что вы опубликовано, и я не ожидаю значительного увеличения производительности, но, возможно, это будет более приемлемым, если вы вернетесь к нему через 6 месяцев.

(вероятно, - это больше подходит для CodeReview, но я вряд ли когда-нибудь go там.)

...