Вычисление и вывод многих оценок плотности искрового ядра параллельно - PullRequest
0 голосов
/ 04 апреля 2019

Я бы хотел сделать

  1. оценка плотности ядра сегментированного или группового pypark
  2. Соедините полученные оценки плотности с другим фреймом данных и сделайте вывод

Например, представьте, что у меня есть кадр данных, который выглядит следующим образом:

data=[{'id':1, 'samples':[3,56,40]},
      {'id':2, 'samples':[-3,80,45,45,2]}]

Эти данные получены из чего-то вроде

df.groupBy('id').agg(F.collect_list('sample').alias('samples'))

где df большой. Тогда представьте, что у меня есть еще один большой массив данных, например:

data2 = [{'id':1, 'val': 10},
         {'id':1, 'val': 39},
         {'id':2, 'val': 5}]

Я хотел бы получить вероятности для этих трех значений, 10, 39, 5 по отношению к двум оценкам плотности, которые я получил выше.

Например, программа на Python, которая будет делать это, будет

import scipy.stats

data_to_define_pdfs=[{'id':1, 'samples':[3,56,40]},
                     {'id':2, 'samples':[-3,80,45,45,2]}]
kdes = {}
for row in data_to_define_pdfs:
    kdes[row['id']] = scipy.stats.gaussian_kde(row['samples'])

inferrence_data = [
    {'id': 1, 'val': 10},
    {'id': 1, 'val': 39},
    {'id': 2, 'val': 5}]

for row in inferrence_data:
    kde = kdes[row['id']]
    row['prob'] = kde.pdf(x=row['val'])[0]

import pprint
pprint.pprint(inferrence_data)

что бы вывести

[{'id': 1, 'prob': 0.008817584549791962, 'val': 10},
 {'id': 1, 'prob': 0.012149240532443975, 'val': 39},
 {'id': 2, 'prob': 0.008013522166302479, 'val': 5}]

Ответы [ 2 ]

0 голосов
/ 05 апреля 2019

Если вы можете хранить все сэмплы для KDE локально, у меня есть решение для pandas dataframe - по крайней мере, с этим игрушечным примером. Кажется, всегда трудно заставить работать с данными pandas масштабно из-за того, как он использует память.

В этом решении все kde формируются на главном узле и отправляются на все узлы задачи - каждый kde является функцией всех образцов данных для этого идентификатора, поэтому мне придется отбирать данные, которые делают df_samples * * 1004

def do_inference_pd(df_infer, df_samples):
    rows = df_samples.collect()
    kdes = {}
    for row in rows:
        row = row.asDict(True)
        kdes[row['id']] = scipy.stats.gaussian_kde(np.array(row['samples']))

    def kde_prob(pdf):
        kde = kdes[pdf.id[0]]
        x = pdf.val
        return pdf.assign(prob=kde(x))

    df_infer_prob = df_infer.withColumn('prob', F.lit(0.0))
    sch_str = df_infer2.schema.simpleString()
    f = F.pandas_udf(f=kde_prob, returnType=sch_str, functionType=F.PandasUDFType.GROUPED_MAP)
    df_infer_prob = df_infer_prob.groupBy('id').apply(f)
    return df_infer_prob


df_infer_prob = do_inference_pd(df_infer=df_infer, df_samples=df_samples)
0 голосов
/ 04 апреля 2019

У меня есть решение, в котором я объединяю все выборки с данными для вывода, это неоптимально, так как я могу реплицировать множество выборок, и я регенерирую объект python scipy kde для каждой строки в данных, к которым я применяюkde - но это начало, я могу представить, что делаю что-то умнее:

data_to_define_pdfs_flat = []
for row in data:
    for sample in row['samples']:
        data_to_define_pdfs_flat.append({'id':row['id'], 'sample': sample})

df_sample = spark.createDataFrame(data=data_to_define_pdfs_flat,
                                  schema=T.StructType([T.StructField('id', T.IntegerType(), False),
                                                       T.StructField('sample', T.FloatType(), False)]))
df_samples = df_sample.groupBy('id').agg(F.collect_list('sample').alias('samples'))

df_infer = spark.createDataFrame(data=data2,
                                 schema=T.StructType([T.StructField('id', T.IntegerType(), False),
                                                      T.StructField('val', T.FloatType(), False)]))

df_infer2 = df_infer.join(df_samples, on='id')


def do_inference(df):
    def f(samples, val):
        kde = scipy.stats.gaussian_kde(samples)
        return float(kde.pdf(val)[0])

    udf_f = F.udf(f, T.FloatType())
    return df.withColumn('prob', udf_f(F.col('samples'), F.col('val')))


df_infer2 = do_inference(df=df_infer2)
df_samples.show()
df_infer2.show()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...