Как использовать Dask Delayed с rpy2? - PullRequest
0 голосов
/ 26 апреля 2018

Я пытаюсь использовать Dask, в частности, dask с задержкой, чтобы генерировать прогноз временных рядов параллельно с использованием rpy2 и пакета прогноза в R. Мой процесс работает, когда используется только 1 ядро, но я получаю

NotImplementedError: Conversion 'py2ri' not defined for objects of type '<class 'pandas.core.series.Series'>'

при использовании dask задерживается более чем с 1 ядром. Код, использованный для воспроизведения этой проблемы, показан ниже:

from rpy2.robjects.packages import importr
from rpy2.robjects import pandas2ri
import rpy2.robjects as robjects
#get ts object as python object
ts=robjects.r('ts')
pandas2ri.activate()

import pandas as pd
import numpy as np
from dask.distributed import Client, LocalCluster
import dask

#start cluster:

cluster = LocalCluster()
client = Client(cluster)

#define R function to generate time series in R from python series
def r_vecs(time_series):

    rdata=ts(time_series,frequency=12)

    return rdata

#Generate DataFrame of time series
rows = 24
ncolumns = 5
column_names = ['ts1','ts2','ts3','ts4','ts5']
df = pd.DataFrame(np.random.randint(0,10000,size=(rows, ncolumns)), columns=column_names)
df_date_index = pd.date_range(end='2018-04-01', periods=rows, freq='MS')
df.index = df_date_index 

Используйте задержку dask для циклического прохождения каждого временного ряда в кадре данных и превращения во временной ряд

Работает

output_fc_R = []
for i in df:
    forecasted_series = r_vecs(df[i])
    output_fc_R.append(forecasted_series)

output_fc_R

Не работает :

#Try to forecast in parallel with Dask
output_fc_R = []
for i in df:
    forecasted_series = dask.delayed(r_vecs)(df[i])
    output_fc_R.append(forecasted_series)

total = dask.delayed(output_fc_R).compute()

1 Ответ

0 голосов
/ 30 апреля 2018

Я до сих пор не уверен, что именно вызывает проблему, но когда я впервые явно преобразую временной ряд в объект R Intvector, кажется, что все работает правильно.

def r_vecs(time_series):

    time_series = robjects.IntVector(time_series)
    rdata=ts(time_series,frequency=12)

    return rdata

В моем исходном сообщении также были различные проблемы, связанные с подгонкой модели R в пакете прогноза путем оценки строки python. Если вы хотите следовать полной теме см .: https://github.com/dask/distributed/issues/1939

...