Вставка данных из фрейма данных pyspark в другую таблицу cassandra с помощью pyspark - PullRequest
2 голосов
/ 21 апреля 2020

У меня есть Кассандра таблица - test :

+----+---------+---------+
| id | country | counter |
+====+=========+=========+
|  A |      RU |       1 |
+----+---------+---------+
|  B |      EN |       2 |
+----+---------+---------+
|  C |      IQ |       1 |
+----+---------+---------+
|  D |      RU |       3 |
+----+---------+---------+

Также у меня есть таблица main в том же пространстве с колонкой "country_main" и "main_id". В столбце main_id у меня есть те же идентификаторы, что и в тестовой таблице, а также у меня есть несколько уникальных идентификаторов. country_main имеет пустые значения и такие же, как в тесте. Например:

+---------+--------------+---------+
| main_id | country_main |      ...|
+=========+==============+=========+
|  A      |              |      ...|
+---------+--------------+---------+
|  B      |      EN      |      ...|
+---------+--------------+---------+
|  Y      |      IQ      |      ...|
+---------+--------------+---------+
|  Z      |      RU      |      ...|
+---------+--------------+---------+

Как вставить данные из тестовой таблицы в основную с помощью pyspark для заполнения пустых значений в country_main в соответствии с идентификаторами?

1 Ответ

1 голос
/ 21 апреля 2020

Имея следующую схему и данные:

create table test.ct1 (
  id text primary key,
  country text,
  cnt int);

insert into test.ct1(id, country, cnt) values('A', 'RU', 1);
insert into test.ct1(id, country, cnt) values('B', 'EN', 2);
insert into test.ct1(id, country, cnt) values('C', 'IQ', 1);
insert into test.ct1(id, country, cnt) values('D', 'RU', 3);


create table test.ct2 (
  main_id text primary key,
  country_main text,
  cnt int);

insert into test.ct2(main_id, cnt) values('A', 1);
insert into test.ct2(main_id, country_main, cnt) values('B', 'EN', 2);
insert into test.ct2(main_id, country_main, cnt) values('C', 'IQ', 1);
insert into test.ct2(main_id, country_main, cnt) values('D', 'RU', 3);

Это должно быть примерно так:

from pyspark.sql.functions import *

ct1 = spark.read.format("org.apache.spark.sql.cassandra")\
   .option("table", "ct1").option("keyspace", "test").load()

ct2 = spark.read.format("org.apache.spark.sql.cassandra")\
  .option("table", "ct2").option("keyspace", "test").load()\
  .where(col("country_main").isNull())

res = ct1.join(ct2, ct1.id == ct2.main_id).select(col("main_id"), 
  col("country").alias("country_main"))
res.write.format("org.apache.spark.sql.cassandra")\
   .option("table", "ct2").option("keyspace", "test")\
   .mode("append").save()

Что делает код:

  1. выбирает все строки из ct2 (соответствует вашей таблице main), где country_main равно null;
  2. выполняет соединение с ct1 (соответствует вашей таблице test), чтобы получить из нее значение страны (оптимизация может заключаться в том, чтобы выбрать только необходимые столбцы из обеих таблиц). Также обратите внимание, что объединение выполняется Spark, а не на уровне Cassandra - объединения на уровне Cassandra будут поддерживаться только в следующей версии Spark Cassandra Connector (3.0, но альфа-версии уже опубликованы);
  3. переименовывает столбцы в структура соответствия таблицы ct2;
  4. запись данных обратно.

Результат:

cqlsh> select * from test.ct2;

 main_id | cnt | country_main
---------+-----+--------------
       C |   1 |           IQ
       B |   2 |           EN
       A |   1 |           RU
       D |   3 |           RU

для исходных данных:

cqlsh> select * from test.ct2;
main_id | cnt | country_main
---------+-----+--------------                                       
       C |   1 |           IQ                                  
       B |   2 |           EN                                                                                         
       A |   1 |         null                                      
       D |   3 |           RU
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...