Как оптимизировать разность декартовых косинусов между векторами в PySpark? - PullRequest
0 голосов
/ 21 мая 2019


У меня есть PyDpark RDD, который содержит идентификаторы предложений и векторы:

Row(review_id='R1EGUNQON2J277_id_What', vector=DenseVector([0.033, 0.1455, -0.1428])), 
Row(review_id='R1FIU59Z4UXOIT_id_I wanted the gift car4 for MUSIC not movies', vector=DenseVector([-0.0121, 0.1022, -0.0883])), 
Row(review_id='R359VY6VMS5CKK_id_I had no idea that the amount is listed in US dollars', vector=DenseVector([0.0597, 0.0795, 0.1087])), 
Row(review_id='R359VY6VMS5CKK_id_I ended purchasing US $100 instead of CAD $100, which meant extra $30CAD', vector=DenseVector([0.1173, 0.1267, -0.0521]))]


Я хочу вычислить центральный вектор списка и первые 10 ближайших векторов.
Я использую UDF для разности косинусов и функции OOTB agg, min и т. Д. Следующим образом:

# Function for cosine similarity
def cos_sim(a,b):
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

# UDF to get cosine difference
def diff(row):
  return row[0][0],row[1][0],1- cos_sim(row[0][1],row[1][1])

# Function to get the Center of Vectors
def get_center(pca_embeddings) :

  # Get the cartesian products and calculate cosine difference between each pairs
  cartesian_product = pca_embeddings.cartesian(pca_embeddings)
  cosine_rdd = cartesian_product.map(diff)
  new_df = cosine_rdd.map(lambda x: (x[0], x[1], x[2])).toDF(["A", "B" , "CosineDiff"])

  # Get the Average cosine differences fro each Review A
  w = Window().partitionBy(new_df["ReviewA"])
  mean_df = (new_df.withColumn("mean", avg("CosineDiff").over(w)))

  # Get the ranks of Reviews B for each Review A
  window = Window.partitionBy(mean_df['A']).orderBy(mean_df['CosineDiff'])
  rank_df = mean_df.select('*', rank().over(window).alias('rank'))

  # Collect top 11 Review Bs for each Review A including itself
  final_df = rank_df.filter(rank_df['rank']<12).groupBy('ReviewA').agg(F.collect_list("B").alias("Nearest Neighbours"),F.max("mean").alias('Mean Cosine'))

  # Get the Review A with smallest Average Cosine Difference
  final_row = final_df.select(F.min(
    F.struct("Mean Cosine", *(x for x in final_df.columns if x != "Mean Cosine")))).first()

  return final_row.asDict()['min(named_struct(NamePlaceholder(), Mean Cosine, NamePlaceholder(), ReviewA, NamePlaceholder(), Nearest Neighbours))'].asDict()



Это займет вечность, чтобы запустить в списке 120000 векторов.
Как я могу оптимизировать код, чтобы дать вывод?
Кроме того, не могли бы вы дать мне несколько советов, чтобы ускорить выполнение кода на EMR?

Спасибо.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...