Имея следующую схему и данные:
create table test.ct1 (
id text primary key,
country text,
cnt int);
insert into test.ct1(id, country, cnt) values('A', 'RU', 1);
insert into test.ct1(id, country, cnt) values('B', 'EN', 2);
insert into test.ct1(id, country, cnt) values('C', 'IQ', 1);
insert into test.ct1(id, country, cnt) values('D', 'RU', 3);
create table test.ct2 (
main_id text primary key,
country_main text,
cnt int);
insert into test.ct2(main_id, cnt) values('A', 1);
insert into test.ct2(main_id, country_main, cnt) values('B', 'EN', 2);
insert into test.ct2(main_id, country_main, cnt) values('C', 'IQ', 1);
insert into test.ct2(main_id, country_main, cnt) values('D', 'RU', 3);
Это должно быть примерно так:
from pyspark.sql.functions import *
ct1 = spark.read.format("org.apache.spark.sql.cassandra")\
.option("table", "ct1").option("keyspace", "test").load()
ct2 = spark.read.format("org.apache.spark.sql.cassandra")\
.option("table", "ct2").option("keyspace", "test").load()\
.where(col("country_main").isNull())
res = ct1.join(ct2, ct1.id == ct2.main_id).select(col("main_id"),
col("country").alias("country_main"))
res.write.format("org.apache.spark.sql.cassandra")\
.option("table", "ct2").option("keyspace", "test")\
.mode("append").save()
Что делает код:
- выбирает все строки из
ct2
(соответствует вашей таблице main
), где country_main
равно null
; - выполняет соединение с
ct1
(соответствует вашей таблице test
), чтобы получить из нее значение страны (оптимизация может заключаться в том, чтобы выбрать только необходимые столбцы из обеих таблиц). Также обратите внимание, что объединение выполняется Spark, а не на уровне Cassandra - объединения на уровне Cassandra будут поддерживаться только в следующей версии Spark Cassandra Connector (3.0, но альфа-версии уже опубликованы); - переименовывает столбцы в структура соответствия таблицы
ct2
; - запись данных обратно.
Результат:
cqlsh> select * from test.ct2;
main_id | cnt | country_main
---------+-----+--------------
C | 1 | IQ
B | 2 | EN
A | 1 | RU
D | 3 | RU
для исходных данных:
cqlsh> select * from test.ct2;
main_id | cnt | country_main
---------+-----+--------------
C | 1 | IQ
B | 2 | EN
A | 1 | null
D | 3 | RU