Spark To Cassandra: Написание разреженных строк без нулевых значений для Cassandra - PullRequest
0 голосов
/ 05 ноября 2018

В: Как мне записать в Cassanrda только столбцы со значениями из Spark DataFrame и сделать это эффективно? (эффективно, как в минимальных строках кода Scala, и не создавать кучу надгробий в Кассандре, быстро запускать их и т. д.)

У меня есть таблица Кассандры с двумя ключевыми столбцами и 300 потенциальными значениями дескриптора.

create table sample {
    key1   text,
    key2   text,
    0      text,
    ............
    299    text,
    PRIMARY KEY (key1, key2)
}

У меня есть фрейм данных Spark, соответствующий базовой таблице, но каждая строка в кадре данных очень разрежена - кроме двух ключевых значений, конкретная строка может иметь только 4-5 «дескрипторов» (столбцы 0-> 299) со значением.

В настоящее время я преобразовываю фрейм данных Spark в RDD и использую saveRdd для записи данных.

Это работает, но «ноль» сохраняется в столбцах, когда значение отсутствует.

Например:

  val saveRdd = sample.rdd

  saveRdd.map(line => (
    line(0), line(1), line(2),
    line(3), line(4), line(5),
    line(6), line(7), line(8),
    line(9), line(10), line(11),
    line(12), line(13), line(14),
    line(15), line(16), line(17),
    line(18), line(19), line(20))).saveToCassandra..........

Создает это в Кассандре:

XYZ | 10 | 49849 | F | | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | TO11142017_Import | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | 20 | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | Скотт Дик-Педди | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | 13.07.2014 0:00 | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | 0 | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | 8 | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | МЕСТО | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | МЕСТО | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | ноль | нуль

Настройка SparkSession для spark.cassandra.output.ignoreNulls не работает:

spark.conf.set("spark.cassandra.output.ignoreNulls", "true")
spark.conf.get("spark.cassandra.output.ignoreNulls")

Это тоже не работает:

spark-shell  --conf spark.cassandra.output.ignoreNulls=true

(пробовал разные способы установить это, и, кажется, это не работает так, как я его установил)

withColumn и фильтр не являются подходящими решениями. Неустановленное понятие может быть правильным, но не уверен, как использовать это в этом случае.

cassandra.3.11.2

искровая Cassandra-разъем: 2.3.0-s_2.11 * * тысяча тридцать-две

искра 2.2.0.2.6.3.0-235

Спасибо!

1 Ответ

0 голосов
/ 06 ноября 2018

Вы уверены, что ignoreNulls не работает для вас? Кассандра выводит null, когда в данной ячейке нет значения. Вы можете проверить, действительно ли данные записаны в SSTable, используя инструмент sstabledump - вы обязательно увидите ячейки с прикрепленной информацией об удалении (именно так хранятся нули).

Вот пример запуска Spark без ignoreNulls (по умолчанию), а ignoreNulls имеет значение true. Тестирование проводилось на DSE 5.1.11, который имеет более старую версию коннектора, но соответствует Cassandra 3.11.

Давайте создадим тестовую таблицу следующим образом:

create table test.t3 (id int primary key, t1 text, t2 text, t3 text);

без ignoreNulls - нам нужен следующий код для тестирования:

case class T3(id: Int, t1: Option[String], t2: Option[String], t3: Option[String])
val rdd = sc.parallelize(Seq(new T3(1, None, Some("t2"), None)))
rdd.saveToCassandra("test", "t3")

Если мы посмотрим на данные, используя cqlsh, мы увидим следующее:

cqlsh:test> SELECT * from test.t3;

 id | t1   | t2 | t3
----+------+----+------
  1 | null | t2 | null

(1 rows)

После выполнения nodetool flush мы можем заглянуть в SSTables. Вот что мы увидим здесь:

>sstabledump mc-1-big-Data.db
[
  {
    "partition" : {
      "key" : [ "1" ],
      "position" : 0
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 30,
        "liveness_info" : { "tstamp" : "2018-11-06T07:53:38.418171Z" },
        "cells" : [
          { "name" : "t1", "deletion_info" : { "local_delete_time" : "2018-11-06T07:53:38Z" }
          },
          { "name" : "t2", "value" : "t2" },
          { "name" : "t3", "deletion_info" : { "local_delete_time" : "2018-11-06T07:53:38Z" }
          }
        ]
      }
    ]
  }
]

Вы можете видеть, что для столбцов t1 & t3, которые были нулевыми, есть поле deletion_info.

Теперь удалите данные с помощью TRUNCATE test.t3 и снова запустите spark-shell, установив для ignoreNulls значение true:

dse spark --conf spark.cassandra.output.ignoreNulls=true

После выполнения того же кода Spark мы увидим те же результаты в cqlsh:

cqlsh:test> SELECT * from test.t3;

 id | t1   | t2 | t3
----+------+----+------
  1 | null | t2 | null

Но после выполнения сброса sstabledump показывает совершенно другую картину:

>sstabledump mc-3-big-Data.db
[
  {
    "partition" : {
      "key" : [ "1" ],
      "position" : 0
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 27,
        "liveness_info" : { "tstamp" : "2018-11-06T07:56:27.035600Z" },
        "cells" : [
          { "name" : "t2", "value" : "t2" }
        ]
      }
    ]
  }
]

Как видите, у нас есть только данные для столбца t2, и нет упоминаний о столбцах t3 & t1, которые были бы нулевыми.

...