спарк-кассандра-коннектор scala обновление колонок - PullRequest
0 голосов
/ 08 января 2020

У меня есть такая таблица:

+----------------+--------------+------+-----------------------+-----------------------+
|cid             |email_address |status|last_updated_dt        |created_at             |    
+----------------+--------------+------------------------------+-----------------------+
|12345-ddd-ddd   |abc@test.com  |New   |2019-12-17 22:52:02.289|2019-11-17 22:52:02.9  |
+----------------+--------------+------+-----------------------+-----------------------+
|12345-eft-xdrg  |abc@test.com  |New   |2019-12-17 22:52:02.289|2019-10-17 22:52:02.395|
+----------------+--------------+------+-----------------------+-----------------------+
|23456-efes-xeg  |bdc@test.com  |New   |2019-12-17 22:52:02.289|2019-12-10 22:52:02.210|
+----------------+--------------+------+-----------------------+-----------------------+

Ключ разделения - 'email_address', ключ кластеризации - 'create_at'.

Мне нужно извлечь все записи для электронной почты abc@test.com и обновить «status» на «Inactive» и «last_updated_dt» до текущей метки времени, и вот мой код:

val emails=List("abc@test.com")
sc.cassandraTable(my_keyspace, my_table)
  .select("email_address", "last_updated_dt", "status")
  .where("email_address IN ?", emails)
  .map(r => (r.getString("email_address"), Instant.now().toString(), "Inactive"))
  .saveToCassandra(my_keyspace, my_table)

, и я получил эту ошибку: key not found: cid

сделать я Нужно ли включать все столбцы в '.select()', чтобы обновить некоторые столбцы? это правильный способ обновить существующий столбец для строк в Кассандре?

Я использую Spark, Cassandra, Spark-Cassandra-Connector, scala.

Редактировать: выяснить, как это сделать:

val selectedRows = sc.cassandraTable(my_keyspace, my_table)
  .select("email_address", "last_updated_dt", "status")
  .where("email_address IN ?", emails)

val transformedRows = selectedRows.map{item => 
  val email=item.getString("email_address")
  val lastUpdatedDt=Instant.now().toString()
  val updatedStatus="Inactive"
  (email, lastUpdatedDt, updatedStatus)
  }
transformedRows.saveToCassandra(my_keyspace, my_table, SomeColumns("email_address", "last_updated_dt", "status"))
...