Как выполнить накопленное среднее значение для нескольких компаний, использующих искру, на основе результатов, сохраненных в Cassandra? - PullRequest
0 голосов
/ 02 апреля 2019

Мне нужно получить среднее значение и количество для данного кадра данных, а также получить ранее сохраненное среднее значение и количество из значений таблицы Cassandra для каждой компании.

Затем нужно вычислить среднее значение, рассчитать и сохранить обратно в таблицу Кассандры.

Как я могу сделать это для каждой компании?

У меня есть две схемы данных, как показано ниже

ingested_df
 |-- company_id: string (nullable = true)
 |-- max_dd: date (nullable = true)
 |-- min_dd: date (nullable = true)
 |-- mean: double (nullable = true)
 |-- count: long (nullable = false)

cassandra_df 
 |-- company_id: string (nullable = true)
 |-- max_dd: date (nullable = true)
 |-- mean: double (nullable = true)
 |-- count: long (nullable = false)

Для каждой company_id мне нужно сохранить «среднее» и «количество» и вычислить "new_mean" и "new_count" и сохранить обратно в Кассандре ...

т.е.

    new_mean = ( ingested_df.mean  + cassandra_df.mean) / (ingested_df.count + cassandra_df.count)

   new_count  = (ingested_df.count + cassandra_df.count)

Как это можно сделать для каждой компании?

Второй раз:

Когда я попробовал ниже присоединиться к той же логике, упомянутой выше

 val resultDf = cassandra_df.join(ingested_df , 
                            ( cassandra_df("company_id") === ingested_df ("company_id") )
                            ( ingested_df ("min_dd") > cassandra_df("max_dd") )
                        , "left")

Это ошибка, как показано ниже: org.apache.spark.sql.AnalysisException: ссылка 'cassandra_df' является неоднозначной, может быть: company_id, company_id .; в org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve (LogicalPlan.scala: 213)

что здесь не так?

1 Ответ

1 голос
/ 02 апреля 2019

Пожалуйста, попробуйте следующий подход:

import spark.implicits._

val ingested_df = Seq(("1", "10", "3")).toDF("company_id", "mean", "count")
val cassandra_df = Seq(("1", "123123", "20", "10")).toDF("company_id", "max_dd", "mean", "count")

val preparedIngestedDf = ingested_df.select("company_id", "mean", "count")

val resultDf = cassandra_df.join(preparedIngestedDf, Seq("company_id"), "left")
  .withColumn("new_mean", (ingested_df("mean") + cassandra_df("mean")) / (ingested_df("count") + cassandra_df("count")))
  .withColumn("new_count", ingested_df("count") + cassandra_df("count"))
  .select(
    col("company_id"),
    col("max_dd"),
    col("new_mean").as("mean"),
    col("new_count").as("new_count")
  )
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...