Я новичок ie для Cassandra и хочу реализовать SCD Type-1 в Cassandra DB.
Это задание SCD Type1 будет выполнено из Spark.
Данные будет храниться как данные, разделенные по временным рядам. а именно: год / месяц / день
Пример : у меня есть записи за последние 300 дней, и в моих новых записях могут быть как новые, так и обновленные записи. Я хочу сравнить обновленные записи за последние 100 дней, и если записи новые, то он должен выполнить операцию вставки, иначе обновить.
Я не получаю никаких подсказок для выполнения этой операции, поэтому не делюсь никаким CQL: (
Пример структуры таблицы:
CREATE TABLE crossfit_gyms_by_city_New (
country_code text,
state_province text,
city text,
gym_name text,
PRIMARY KEY ((country_code, state_province), gym_name)
) WITH CLUSTERING ORDER BY (gym_name ASC );
Мой пример кода Spark:
object SparkUpdateCassandra {
System.setProperty("hadoop.home.dir", "C:\\hadoop\\")
def main(args: Array[String]): Unit = {
val spark = org.apache.spark.sql.SparkSession
.builder()
.master("local[*]")
.config("spark.cassandra.connection.host", "localhost")
.appName("Spark Cassandra Connector Example")
.getOrCreate()
import spark.implicits._
//Read Cassandra data using DataFrame
val FirstDF = Seq(("India", "WB", "Kolkata", "Cult Fit"),("India", "KA", "Bengaluru", "Cult Fit")).toDF("country_code", "state_province","city","gym_name")
FirstDF.show(10)
FirstDF.write
.format("org.apache.spark.sql.cassandra")
.mode("append")
.option("confirm.truncate", "true")
.option("spark.cassandra.connection.host", "localhost")
.option("spark.cassandra.connection.port", "9042")
.option("keyspace", "emc_test")
.option("table", "crossfit_gyms_by_city_new")
.save()
val loaddf1 = spark.read
.format("org.apache.spark.sql.cassandra")
.option("spark.cassandra.connection.host", "localhost")
.option("spark.cassandra.connection.port", "9042")
.options(Map( "table" -> "crossfit_gyms_by_city_new", "keyspace" -> "emc_test"))
.load()
loaddf1.show(10)
// spark.implicits.wait(5000)
val SecondDF = Seq(("India", "WB", "Siliguri", "CultFit"),("India", "KA", "Bengaluru", "CultFit")).toDF("country_code", "state_province","city","gym_name")
SecondDF.show(10)
SecondDF.write
.format("org.apache.spark.sql.cassandra")
.mode("append")
.option("confirm.truncate", "true")
.option("spark.cassandra.connection.host", "localhost")
.option("spark.cassandra.connection.port", "9042")
.option("keyspace", "emc_test")
.option("table", "crossfit_gyms_by_city_new")
.save()
val loaddf2 = spark.read
.format("org.apache.spark.sql.cassandra")
.option("spark.cassandra.connection.host", "localhost")
.option("spark.cassandra.connection.port", "9042")
.options(Map( "table" -> "crossfit_gyms_by_city_new", "keyspace" -> "emc_test"))
.load()
loaddf2.show(10)
}
}
Примечание. Я использую Scala для платформы Spark.