Я выполняю некоторые упражнения на виртуальной машине DataStax.Дается таблица CassandraTable, и я буду выполнять некоторую фильтрацию и повторное удаление 5 верхних элементов, используя функции Spark API, а не функции cassandra-querie.
Там я делаю следующее:
val cassRdd = sc.cassandraTable("killr_video", "videos_by_year_title")
val cassRdd2 = cassRdd.filter(r=>r.getString("title") >= "T")
println("1" : + cassRdd2)
println("2" : + cassRdd2.count)
println("3" : + cassRdd2.take(5))
println("4" : + cassRdd2.take(5).count)
Результат:
- 1: MapPartitionsRDD [185] при фильтре в: 19
- 2: 2250
- 3: [Lcom.datastax.spark.connector.CassandraRow; @ 56fd2e09
- 4: ошибка компиляции (отсутствуют аргументы для счетчика методов в traitable TranceableOnce
Что я ожидал:
- 1: и 2:работать как ожидалось
- 3: возвращает только одну строку? Я бы ожидал СДР из 5 строк Кассандры
- 4: это не число rdd после 3:, следовательно, я не ожидал, что это сработает,похоже, это своего рода метод cassandraRow-count, который я не собирался вызывать
Решение, данное Datastax, использует RDD и выполняет преобразование карты для него, чтобы взять только заголовок иэтот новый title-rdd выполняет фильтрацию и команду take.
Хорошо, работает, но я не понимаю, почему take не работает на RDD-of CassandraRow или каков может быть результат.
val cassRdd2 = cassRdd.map(r=>r.getString("title")).filter(t >= "T")
Я думал, что команда take на любом RDD (независимо от его содержимого) будет всегда делать то же самое, беря первые элементы x, что приводит к созданию нового СДР того же типа с размером элементов x.