Разъем Spark Cassandra: агрегат SCD типа 1 - PullRequest
1 голос
/ 22 января 2020

Я новичок ie для Cassandra и хочу реализовать SCD Type-1 в Cassandra DB.
Это задание SCD Type1 будет выполнено из Spark.
Данные будет храниться как данные, разделенные по временным рядам. а именно: год / месяц / день

Пример : у меня есть записи за последние 300 дней, и в моих новых записях могут быть как новые, так и обновленные записи. Я хочу сравнить обновленные записи за последние 100 дней, и если записи новые, то он должен выполнить операцию вставки, иначе обновить.

Я не получаю никаких подсказок для выполнения этой операции, поэтому не делюсь никаким CQL: (

Пример структуры таблицы:

CREATE TABLE crossfit_gyms_by_city_New (  
 country_code text,  
 state_province text,  
 city text,  
 gym_name text,  
 PRIMARY KEY ((country_code, state_province), gym_name)  
) WITH CLUSTERING ORDER BY (gym_name ASC );

Мой пример кода Spark:


object SparkUpdateCassandra {
  System.setProperty("hadoop.home.dir", "C:\\hadoop\\")

  def main(args: Array[String]): Unit = {
    val spark = org.apache.spark.sql.SparkSession
      .builder()
      .master("local[*]")
      .config("spark.cassandra.connection.host", "localhost")
      .appName("Spark Cassandra Connector Example")
      .getOrCreate()

    import spark.implicits._

    //Read Cassandra data using DataFrame
    val FirstDF = Seq(("India", "WB", "Kolkata", "Cult Fit"),("India", "KA", "Bengaluru", "Cult Fit")).toDF("country_code", "state_province","city","gym_name")
    FirstDF.show(10)
    FirstDF.write
          .format("org.apache.spark.sql.cassandra")
          .mode("append")
          .option("confirm.truncate", "true")
          .option("spark.cassandra.connection.host", "localhost")
          .option("spark.cassandra.connection.port", "9042")
          .option("keyspace", "emc_test")
          .option("table", "crossfit_gyms_by_city_new")
          .save()
    val loaddf1 = spark.read
      .format("org.apache.spark.sql.cassandra")
      .option("spark.cassandra.connection.host", "localhost")
      .option("spark.cassandra.connection.port", "9042")
      .options(Map( "table" -> "crossfit_gyms_by_city_new", "keyspace" -> "emc_test"))
      .load()
    loaddf1.show(10)

//    spark.implicits.wait(5000)

    val SecondDF = Seq(("India", "WB", "Siliguri", "CultFit"),("India", "KA", "Bengaluru", "CultFit")).toDF("country_code", "state_province","city","gym_name")
    SecondDF.show(10)

    SecondDF.write
      .format("org.apache.spark.sql.cassandra")
      .mode("append")
      .option("confirm.truncate", "true")
      .option("spark.cassandra.connection.host", "localhost")
      .option("spark.cassandra.connection.port", "9042")
      .option("keyspace", "emc_test")
      .option("table", "crossfit_gyms_by_city_new")
      .save()

    val loaddf2 = spark.read
      .format("org.apache.spark.sql.cassandra")
      .option("spark.cassandra.connection.host", "localhost")
      .option("spark.cassandra.connection.port", "9042")
      .options(Map( "table" -> "crossfit_gyms_by_city_new", "keyspace" -> "emc_test"))
      .load()
    loaddf2.show(10)


  }
}

Примечание. Я использую Scala для платформы Spark.

Ответы [ 2 ]

0 голосов
/ 22 января 2020

Для этого есть некоторые факты, которые помогут вам ориентироваться в примерах кода, с которыми вы столкнетесь

В предыдущем коде Spark 1 мы использовали
1 A SparkContext см. Документы
2 Для подключения к Cassandra используйте CassandraSQLContext, созданный с помощью SparkContext

Для Spark 2 это в основном изменилось
Создайте сеанс Spark и CassandraConnector [1]

Затем вы запустите свой собственный SQL с сеансом, как показано в [1]

После того, как вы настроите и работаете, вы можете просто выполнить соответствующий sql для операций SCD типа 1, хороших примеров участвующих sql можно найти.

0 голосов
/ 22 января 2020

В Cassandra все в порядке - если строка не существует, она будет вставлена, если она существует, то она будет обновлена, поэтому вам просто нужно перенести ваши данные в RDD или DataFrame и использовать соответствующую функцию Spark Cassandra Connector :

saveToCassandra для RDD API :

rdd.saveToCassandra("keyspace", "table")

Или просто write inDataFrame API :

df.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "table_name", "keyspace" -> "keyspace_name"))
  .mode(SaveMode.Append)
  .save()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...