Python - Как построить многопроцессорность, чтобы она работала с FuncAnimation и вводом? - PullRequest
0 голосов
/ 13 апреля 2020

Я учусь Python и пытаюсь постепенно увеличивать сложность своего проекта. Здесь я пытаюсь сделать программу для визуализации некоторых данных и создания видео. Я в основном использую matplotlib, pandas и многопроцессорность (попытка). Вся идея проста.

Этап 1. Я конвертирую CSV-файл в DataFrame, который является необработанной информацией. Затем определите различные DataFrames для хранения различных видов данных, извлеченных в итерации позже (чтение необработанной информации строка за строкой).

Этап 2. Я определил функцию iter как содержимое итерации (включая обновление DataFrames и вложенную функцию заговора для построения 5 осей), так как я хочу использовать FuncAnimation.

Этап 3. Я определил функцию построения, которая вложена в функцию iter для построения в общей сложности 5 графиков на разных осях. После завершения итерации сохраните ее в файл.

import matplotlib
matplotlib.use("Agg")
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from matplotlib import animation
from itertools import product
from datetime import timedelta
from tqdm import tqdm

location = str(input('Please enter the folder that your CSV file is located in:'))
filename = str(input('Please enter the CSV file name:'))

raw_AT = pd.read_csv(location+'\\'+filename+'.CSV', header=0, index_col=1, parse_dates=True, infer_datetime_format=True).loc[:,
         ['auditor', 'event_text', 'reason_text', 'quantity_filled', 'bid_price', 'offer_price']]
cross_expiry = timedelta(seconds=20)
participation = 0    
b_s = 'buy' if 'buy' in raw_AT.event_text[0] else 'sell'
ord_vol = float(raw_AT.event_text[0].split()[3])
ord_lim = np.nan
progress_pct = pd.Series(index=raw_AT.index, name='progress_pct')
ord_lim_series = pd.Series(index=raw_AT.index, name='ord_lim')
bid_series = pd.Series(index=raw_AT.index, name='bid')
offer_series = pd.Series(index=raw_AT.index, name='offer')
fills_series = pd.DataFrame(index=raw_AT.index, columns=('price', 'size'))
.
.
.
current_t = str()
current_l = 0

fig = plt.figure(figsize=(20, 10))
fig.suptitle('%s %s %s %s %s %s %s' % (filename[0:len(filename)-16], filename[len(filename)-16:len(filename)], raw_AT.event_text[0].split()[2], raw_AT.event_text[0].split()[3], raw_AT.event_text[0].split()[4],
                                       raw_AT.event_text[0].split()[5], raw_AT.event_text[0].split()[6]), fontsize = 30)
ax1 = plt.subplot2grid((4, 7), (0, 0), colspan=3, rowspan=2)
ax2 = plt.subplot2grid((4, 7), (2, 0), colspan=3, rowspan=2)
ax3 = plt.subplot2grid((4, 7), (2, 3), colspan=2, rowspan=2)
ax4 = plt.subplot2grid((4, 7), (2, 5), colspan=2, rowspan=2)
#ax5
ax6 = plt.subplot2grid((4, 7), (0,3), colspan=2, rowspan=2)


def plot_func(i):
    ax1.clear()
    ax2.clear()
    ax3.clear()
    ax4.clear()
    ax6.clear()
    pd.concat([market_volume, target_volume, actual_volume], axis=1, sort=False).plot(ax=ax1)
    ax1.set_title('Volume Tracking updated at %s' % vt_timestamp)
    ax1.xaxis.set_major_locator(mdates.HourLocator())
    ax1.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M"))
    ax1.xaxis.set_tick_params(rotation=0)
    ax1.set_xlim((raw_AT.index[0], end_time))
    ax1.set_xlabel('')
    ax1.legend(loc='upper left', framealpha=0)
    ax1.annotate(r'$%.f$' % market_volume[i], xy=(market_volume.index[i], market_volume[i]), xycoords='data',
                 xytext=(0, -30), textcoords='offset pixels', fontsize=10, arrowprops=dict(arrowstyle='->'))
    ax1.annotate(' Target:%.f\n Filled:%.f\n Target %%: %.2f%%\n Actual %%: %.2f%%' % (
    target_volume[i], actual_volume[i], participation,
    (actual_volume[i] / market_volume[i] * 100) if market_volume[i] != 0 else 0),
                 xy=(target_volume.index[i], target_volume[i]), xycoords='data',
                 xytext=(10, 0), textcoords='offset pixels', fontsize=10)
    pd.concat([ord_lim_series, bid_series, offer_series], axis=1, sort=False).plot(ax=ax2)
    ax2.scatter(fills_series.index, fills_series.price, s=20, alpha=0.3, c='b')
    ax2.xaxis.set_major_locator(mdates.HourLocator())
    ax2.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M"))
    ax2.xaxis.set_tick_params(rotation=0)
    ax2.set_xlim([raw_AT.index[0], end_time])  # till schedule end time
    ax2.set_xlabel('Price & Fills by Time')
    ax2.annotate('Offer: %.2f\n  Bid: %.2f' % (offer_series[i], bid_series[i]),
                 xy=(offer_series.index[i], offer_series[i]), xycoords='data',
                 xytext=(10, 0), textcoords='offset pixels', fontsize=10)
    ax2.annotate('Limit: %.2f' % ord_lim_series[i], xy=(ord_lim_series.index[i], ord_lim_series[i]), xycoords='data',
                 xytext=(10, 0), textcoords='offset pixels', fontsize=10)
    try:
        slices_df.plot(kind='barh', ax=ax3, width=0.4)
    except TypeError:
        pass
    for i, j in product(range(len(slices_df.index)), range(len(slices_df.columns))):
        if not np.isnan(slices_df.iloc[i, j]):
            x = slices_df.iloc[i, j] / 2
            y = i - 0.4 / 2 + (j + 0.5) * 0.4 / len(slices_df.columns)
            ax3.text(x, y,
                     r'$%.f$' % (x * 2),
                     fontdict={'size': 10, 'color': 'k'}, ha='left', va='center')
    ax3.set_title('Current Outstanding Slices')
    ax3.xaxis.set_visible(False)
    ax3.yaxis.set_tick_params(rotation=90)
    try:
        all_fills.plot(kind='barh', ax=ax4, width=0.6)
    except TypeError:
        pass
    try:
        ax4.text(ax4.get_xlim()[0], ax4.get_ylim()[1],
                 ' Current time: %s \n Current line: %d \n Last amended time: %s \n Last amended line: %d \n VolDone%%=VolDone/OrdVol:\n %.2f%%=%.f/%.f \n Average Price: %.4f \n \n'
                 % (current_t, current_l, amend_at_time, amend_at_line, (sum(all_fills.iloc[:, 0]) / ord_vol * 100),
                    sum(all_fills.iloc[:, 0]), ord_vol,
                    sum(all_fills.iloc[:, 0] * all_fills.iloc[:, 0].index) / sum(all_fills.iloc[:, 0])),
                 fontdict={'size': 15, 'color': 'k'})
    except IndexError:
        pass
    for i in range(len(all_fills.index)):
        x = all_fills.iloc[i, 0] / 2
        y = i
        ax4.text(x, y,
                 r'$%.f@%.2f$' % (all_fills.iloc[i, 0], all_fills.index[i]), fontdict={'size': 8, 'color': 'k'},
                 ha='left', va='center')
    ax4.set_title('Fills Summary')
    ax4.xaxis.set_visible(False)
    ax4.yaxis.set_tick_params(rotation=90)
    temp = list(all_fills.index)
    for i in range(len(temp)):
        if i % 3 != 0:
            temp[i] = ''
    ax4.yaxis.set_ticklabels(temp)
    ax6.axis('off')
    bar = matplotlib.patches.Rectangle((0, 0.49), 1, 0.06, color='yellow')
    ax6.add_artist(bar)
    ax6.text(0, 0.6, 'Market quote timestamp: %s' % mq_timestamp, fontsize=15)
    ax6.text(0.25 - 0.25 / 10 * len(mq_bv), 0.5, mq_bv, fontsize=10, fontweight='bold',
             color='blue')
    ax6.text(0.49 - 0.24 / 9 * len(mq_b), 0.5, mq_b, fontsize=11, fontweight='bold', color='blue')
    ax6.text(0.49, 0.5, '-', fontsize=10, fontweight='bold', color='blue')
    ax6.text(0.51, 0.5, mq_a, fontsize=11, fontweight='bold', color='blue')
    ax6.text(0.75, 0.5, mq_av, fontsize=10, fontweight='bold', color='blue')

def povanimationupdate(i):
    globals()['current_t'] = str(raw_AT.index[i].time())
    globals()['current_l'] = i + 2
    actual_volume[i] = raw_AT.quantity_filled[i]
    bid_series[i] = raw_AT.bid_price[i]
    offer_series[i] = raw_AT.offer_price[i]
    if isinstance(raw_AT.reason_text[i], str):
        if 'Volume Tracking:' in raw_AT.reason_text[i]:
            temp = float(raw_AT.reason_text[i].split()[4][1:-1])
            globals()['participation'] = float(raw_AT.reason_text[i].split()[20][:-1])
            market_volume[i] = temp
            target_volume[i] = temp * participation / 100
            globals()['vt_timestamp'] = str(raw_AT.index[i].time())
        else:
            market_volume[i] = market_volume[i-1] if i > 0 else 0
            target_volume[i] = target_volume[i-1] if i > 0 else 0
            if 'acknowledged:' in raw_AT.reason_text[i]:
                name = str(raw_AT.reason_text[i].split("acknowledged:")[0][:-1])
.
.
.
.
.
.
    try:
        for cs in cross_slices.loc['created_time', cross_slices.loc['created_time', :] < (raw_AT.index[i]-cross_expiry)].index:
            if cs in slices_df.columns:
                slices_df.drop(cs, axis=1, inplace=True)
                cross_slices.drop(cs, axis=1, inplace=True)
        for m in slices_df.index:
            if False not in list(np.isnan(slices_df.loc[m, :])):
                slices_df.drop(m, axis=0, inplace=True)
    except TypeError:
        pass
    plot_func(i)


Writer = animation.writers['ffmpeg']
writer = Writer(fps=40, metadata=dict(artist='Patrick'), bitrate=1800)


ani = animation.FuncAnimation(fig=fig, func=povanimationupdate, frames=tqdm(range(len(raw_AT.index)), initial =1, position = 0), interval=25, blit=False, repeat=False)
ani.save(filename + '.mp4', writer=writer)

print('Video generated!')

Работает нормально, за исключением медлительности.

Протестировав, я обнаружил, что графики matplotlib очень медленно сравниваются со скоростью извлечения информации и обновления DataFrames. Поскольку все 5 графиков используют разные DataFrames в качестве материала, что означает, что они независимы и могут выполняться параллельно, я думаю, что это должно быть достаточно безопасно и легко использовать многопроцессорную обработку только на этапе построения.

(Что касается blit, я попытался определить некоторые фиктивные объекты, а затем обновить их в функции, чтобы я мог использовать blit = True, но это вовсе не улучшает скорость.)

Однако после попытки всех Возможные способы, которые я получил от inte rnet, я все еще не могу заставить многопроцессорную работу работать.

Но есть ввод в самом начале. Я не хочу этого быть запущенным четыре раза, так что поставьте их под условие. Поскольку инициация всех переменных основана на чтении необработанной информации, я должен поместить всю часть инициации переменной в условие if.

if __name__ == '__main__':
.
.
.
.
.

Но это вызвало то, что эти переменные не были доступны другим процессорам. Позже, когда он запускает функцию plot, он дает мне неопределенную ошибку переменных для ax1, ax2, ax3 ... ( NameError: имя 'ax1' не определено ) Я переставил plot_fun c, чтобы он мог обрабатываться несколькими процессорами одновременно за одну итерацию:

def plot_func(i, n):
    if n == 1:
        ax1.clear()
        pd.concat([market_volume, target_volume, actual_volume], axis=1, sort=False).plot(ax=ax1)
        ax1.set_title('Volume Tracking updated at %s' % vt_timestamp)
        ax1.xaxis.set_major_locator(mdates.HourLocator())  # only tick at hour
        ax1.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M"))  # only show hour and minute
        ax1.xaxis.set_tick_params(rotation=0)
        ax1.set_xlim((raw_AT.index[0], end_time))
        ax1.set_xlabel('')
        ax1.legend(loc='upper left', framealpha=0)
        ax1.annotate(r'$%.f$' % market_volume[i], xy=(market_volume.index[i], market_volume[i]), xycoords='data',
                     xytext=(0, -30), textcoords='offset pixels', fontsize=10, arrowprops=dict(arrowstyle='->'))
        ax1.annotate(' Target:%.f\n Filled:%.f\n Target %%: %.2f%%\n Actual %%: %.2f%%' % (
            target_volume[i], actual_volume[i], participation,
            (actual_volume[i] / market_volume[i] * 100) if market_volume[i] != 0 else 0),
                     xy=(target_volume.index[i], target_volume[i]), xycoords='data',
                     xytext=(10, 0), textcoords='offset pixels', fontsize=10)
        ax6.clear()
        ax6.axis('off')
        bar = matplotlib.patches.Rectangle((0, 0.49), 1, 0.06,
                                           color='yellow')  # https://matplotlib.org/3.2.1/api/_as_gen/matplotlib.patches.Rectangle.html#matplotlib.patches.Rectangle
        ax6.add_artist(bar)
        ax6.text(0, 0.6, 'Market quote timestamp: %s' % mq_timestamp, fontsize=15)
        ax6.text(0.25 - 0.25 / 10 * len(mq_bv), 0.5, mq_bv, fontsize=10, fontweight='bold',
                 color='blue')  # https://matplotlib.org/3.1.1/api/_as_gen/matplotlib.axes.Axes.text.html#matplotlib.axes.Axes.text
        ax6.text(0.49 - 0.24 / 9 * len(mq_b), 0.5, mq_b, fontsize=11, fontweight='bold', color='blue')
        ax6.text(0.49, 0.5, '-', fontsize=10, fontweight='bold', color='blue')
        ax6.text(0.51, 0.5, mq_a, fontsize=11, fontweight='bold', color='blue')
        ax6.text(0.75, 0.5, mq_av, fontsize=10, fontweight='bold', color='blue')
        return ax1, ax6
    elif n == 2:
        ax2.clear()
        pd.concat([ord_lim_series, bid_series, offer_series], axis=1, sort=False).plot(ax=ax2)
        ax2.scatter(fills_series.index, fills_series.price, s=20, alpha=0.3,
                    c='b')  
        ax2.xaxis.set_major_locator(mdates.HourLocator())
        ax2.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M"))
        ax2.xaxis.set_tick_params(rotation=0)
        ax2.set_xlim([raw_AT.index[0], end_time])  # till schedule end time
        ax2.set_xlabel('Price & Fills by Time')
        ax2.annotate('Offer: %.2f\n  Bid: %.2f' % (offer_series[i], bid_series[i]),
                     xy=(offer_series.index[i], offer_series[i]), xycoords='data',
                     xytext=(10, 0), textcoords='offset pixels', fontsize=10)
        ax2.annotate('Limit: %.2f' % ord_lim_series[i], xy=(ord_lim_series.index[i], ord_lim_series[i]),
                     xycoords='data',
                     xytext=(10, 0), textcoords='offset pixels', fontsize=10)
        return ax2
    elif n == 3:
        ax3.clear()
        try:
            slices_df.plot(kind='barh', ax=ax3, width=0.4)
        except TypeError:
            pass
        for i, j in product(range(len(slices_df.index)), range(len(slices_df.columns))):
            if not np.isnan(slices_df.iloc[i, j]):  
                x = slices_df.iloc[i, j] / 2  # in the middle of the bar
                y = i - 0.4 / 2 + (j + 0.5) * 0.4 / len(
                    slices_df.columns)  # y adjusted by the current column among all columns,
                # sum of width of all bars (including empty) at one tick is equal to width defined in plot
                ax3.text(x, y,
                         r'$%.f$' % (x * 2),
                         fontdict={'size': 10, 'color': 'k'}, ha='left',
                         va='center')  
        ax3.set_title('Current Outstanding Slices')
        ax3.xaxis.set_visible(False)
        ax3.yaxis.set_tick_params(rotation=90)
        return ax3
    elif n == 4:
        ax4.clear()
        try:
            all_fills.plot(kind='barh', ax=ax4, width=0.6)
        except TypeError:
            pass
        try:
            ax4.text(ax4.get_xlim()[0], ax4.get_ylim()[1],
                     ' Current time: %s \n Current line: %d \n Last amended time: %s \n Last amended line: %d \n VolDone%%=VolDone/OrdVol:\n %.2f%%=%.f/%.f \n Average Price: %.4f \n \n'
                     % (current_t, current_l, amend_at_time, amend_at_line,
                        (sum(all_fills.iloc[:, 0]) / ord_vol * 100), sum(all_fills.iloc[:, 0]), ord_vol,
                        sum(all_fills.iloc[:, 0] * all_fills.iloc[:, 0].index) / sum(all_fills.iloc[:, 0])),
                     fontdict={'size': 15, 'color': 'k'})
        except IndexError:
            pass
        for i in range(len(all_fills.index)):  # only one bar at each tick
            x = all_fills.iloc[i, 0] / 2
            y = i
            ax4.text(x, y,
                     r'$%.f@%.2f$' % (all_fills.iloc[i, 0], all_fills.index[i]), fontdict={'size': 8, 'color': 'k'},
                     ha='left', va='center')
        ax4.set_title('Fills Summary')
        ax4.xaxis.set_visible(False)
        ax4.yaxis.set_tick_params(rotation=90)
        temp = list(all_fills.index)
        for i in range(len(temp)):
            if i % 3 != 0:
                temp[i] = ''
        ax4.yaxis.set_ticklabels(temp)
        return ax4

I добавить эту часть в povanimationupdate для вызова plot_fun c.

mp.freeze_support()
pool = mp.Pool()
pool.starmap(plot_func, [(i,1), (i,2), (i,3), (i,4)])

Чтобы устранить ошибку, я попробовал два способа, но не смог. Может кто-нибудь, пожалуйста, сообщите мне, что я сделал неправильно или какие-либо другие решения?

  1. Поместите всю часть инициации переменной за пределы условия if, за исключением ввода. Прочитайте файл и сохраните его как строку в многопроцессорной обработке. Значение в случае, если name == ' main ':

Затем используйте string.value вне if name == ' main ': генерировать переменные, чтобы все процессоры запускали переменные. Но строка попала в неопределенную ошибку переменных. (NameError: имя 'atstring' не определено) ???

Я пытался разделить те переменные, которые необходимы в моей функции печати, между процессорами, даже если они находятся под name == ' main ' :. Я вижу, что Manager () может помочь. Но у меня есть все виды объектов - разные DataFrames, объекты осей matplotlib. Похоже, память акций может использоваться только для значений? Как мне использовать Manager () здесь?

И последнее, но не менее важное: у меня нет возможности проверить его, так как мне это никогда не удастся, но - будут ли 5 ​​осей естественным образом go превращаться в одну фигуру, даже если они генерируются с разных процессоров, если все работает нормально? Нужно ли дополнительно указывать что-то, чтобы они отображались на одной фигуре?

Я знаю, что это большой вопрос. А мой код длинный и его нелегко воспроизвести. Так что любая идея или намек будут оценены. Я думаю, что это больше похоже на изменение структуры / мышления / методологии.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...