Как переместить столбцы из одного пространства ключей в другое в Кассандре - PullRequest
2 голосов
/ 26 марта 2019

Могу ли я переместить некоторые столбцы (cc_payment, keyid), как указано ниже, из пространства клавиш Cassandra billing в другое пространство клавиш Cassandra payments?payment_info будет новым столом.

Есть ли способ, которым я могу двигаться?Или мне нужно скопировать файл csv и импортировать с опциями COPY FROM?Поскольку данные огромны, я ищу варианты для непосредственного перемещения из одного пространства ключей в другое.Мы используем dasastax cassandra.

Благодарим Вас за помощь.

    FROM
========

keyspace:  billing
create table if not exists billing_info (
      user_id text,
      billing_id timeuuid,
      cc_payment frozen<cc_payment>,
      keyid text;
      PRIMARY KEY((user_id), billing_id)
) WITH CLUSTERING ORDER BY (billing_id DESC);

    TO
======
keyspace:  payments
create table if not exists payment_info (
      user_id text,
      payment_id timeuuid,
      cc_payment frozen<cc_payment>,
      keyid text;
      PRIMARY KEY((user_id), payment_id)
) WITH CLUSTERING ORDER BY (payment_id DESC);

Ответы [ 2 ]

2 голосов
/ 26 марта 2019

Есть несколько способов сделать это:

Скопировать файлы напрямую и затем изменить структуру таблицы

Поскольку таблицы отличаются только одним именем столбца, это может быть многобыстрее копировать файлы, как показано ниже:

  • Создать таблицу payments.payment_info с точно той же структурой, что и billing.billing_info
  • прекратить запись в billing.billing_info

Затем на каждом узле кластера выполните следующие действия:

  • выполните его очистку: nodetool flush billing billing_info
  • перейдите в каталог данных Cassandra
  • под тем же пользователем, который запускает Cassandra копирование файлов billing/billing_info-<ID_of_the_table>/* в payments/payment_info-<ID_of_the_table>/
  • выполнение nodetool refresh payment.payment_info`
  • проверка в cqlshэти данные доступны
  • выполнить переименование столбца с помощью: ALTER TABLE payments.payment_info RENAME billing_id TO payment_id;

Перенос данных путем копирования, используя, например, DSBulk или Spark.

Если вы используете DSE, то вы можете использовать DSBulk (лучше взять последнюю версию) для выгрузки данных изодин стол и загрузить в другой.Эта команда может работать без создания промежуточной копии, записывая данные в стандартный вывод и считывая их из стандартного ввода через канал Unix, хотя в этом случае она будет медленнее, поскольку не может достичь необходимого параллелизма.

В простейшем случае он будет вызываться следующим образом, обеспечивая соответствие между измененными именами полей (подробности см. В документации:

dsbulk unload -k ks1 -t table1 -c json | dsbulk load -k ks2 -t table2 -c json -m "mapping_to_accomodate_changes_in_field_names"

Но задача будет более сложной, если вынужно копировать не только данные, но и другие вещи, такие как TTL и WriteTime - в этом случае вам необходимо явно экспортировать их, а затем загрузить данные в несколько пакетов, для каждого столбца в отдельности.

1 голос
/ 27 марта 2019

Spark, вы можете использовать этот маленький фрагмент. Вы можете делать то, что вам нужно, в updateColumns

val myKeyspace = "oldkeyspace" 
val myTable = "oldtable"
val newKeyspace = "newkeyspace" 
val newTable = "newtabl"

def updateColumns(row: CassandraRow): CassandraRow = { 
     val inputMap = row.toMap val newData = Map( "newColumn" -> "somevalue" ) 
     var outputMap = inputMap ++ newData CassandraRow.fromMap(outputMap) 
}

val result = sc.cassandraTable(myKeyspace, myTable) .map(updateColumns(_)) 
  .saveToCassandra(newKeyspace, newTable)
...