Cassandra joinWithCassandraTable не работает в кластере cassandra, но работает в автономном режиме - PullRequest
0 голосов
/ 02 августа 2020

У меня есть 2 экземпляра cassandra (локальный автономный), кластер с 3 узлами Я создал таблицу

CREATE TABLE dev.test (
a text,
b int,
c text,
PRIMARY KEY ((a, b), c)) WITH CLUSTERING ORDER BY (c ASC)

Я ввел данные

a | b | c
---+---+---
A | 1 | C
B | 2 | D

Я подключаюсь к искровой оболочке, используя ниже команда. Для кластера это i / p одной из машин в кластере. то ниже приведены отдельные команды

./spark-shell  --conf spark.cassandra.connection.host=127.0.0.1 --packages com.datastax.spark:spark-cassandra-connector_2.12:3.0.0-beta 

scala> import com.datastax.spark.connector._
import com.datastax.spark.connector._

scala> case class PrimaryKey(a: String,b: Int)
defined class PrimaryKey

scala> val idsOfInterest = sc.sparkContext.parallelize(Seq(PrimaryKey("B",2)))
idsOfInterest: org.apache.spark.rdd.RDD[PrimaryKey] = ParallelCollectionRDD[0] at parallelize at <console>:29

scala> val repartitioned =  idsOfInterest.repartitionByCassandraReplica("dev", "test" )
repartitioned: com.datastax.spark.connector.rdd.partitioner.CassandraPartitionedRDD[PrimaryKey] = CassandraPartitionedRDD[6] at RDD at CassandraPartitionedRDD.scala:18

Теперь я вижу разницу на этом шаге ** На моем локальном **

scala> repartitioned.partitions
res1: Array[org.apache.spark.Partition] = Array(ReplicaPartition(0,[Ljava.lang.String;@6d4693ee), ReplicaPartition(1,[Ljava.lang.String;@75038f62), ReplicaPartition(2,[Ljava.lang.String;@31bb5652), ReplicaPartition(3,[Ljava.lang.String;@77b46743), ReplicaPartition(4,[Ljava.lang.String;@2eb453b8), ReplicaPartition(5,[Ljava.lang.String;@4725a193), ReplicaPartition(6,[Ljava.lang.String;@2c4766e3), ReplicaPartition(7,[Ljava.lang.String;@781d9257), ReplicaPartition(8,[Ljava.lang.String;@7438377f), ReplicaPartition(9,[Ljava.lang.String;@1e7b3952))

Те же шаги при подключении к кластеру (одна из точек контакта) 1013

scala> repartitioned.partitions
res0: Array[org.apache.spark.Partition] = Array()
Обновление То же самое работает, если я вхожу в любую машину cassandra и выполняю эти шаги. Однако, когда я убегаю от своего мастера искрового кластера, он не работает
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...