У меня есть такая таблица:
+----------------+--------------+------+-----------------------+-----------------------+
|cid |email_address |status|last_updated_dt |created_at |
+----------------+--------------+------------------------------+-----------------------+
|12345-ddd-ddd |abc@test.com |New |2019-12-17 22:52:02.289|2019-11-17 22:52:02.9 |
+----------------+--------------+------+-----------------------+-----------------------+
|12345-eft-xdrg |abc@test.com |New |2019-12-17 22:52:02.289|2019-10-17 22:52:02.395|
+----------------+--------------+------+-----------------------+-----------------------+
|23456-efes-xeg |bdc@test.com |New |2019-12-17 22:52:02.289|2019-12-10 22:52:02.210|
+----------------+--------------+------+-----------------------+-----------------------+
Ключ разделения - 'email_address', ключ кластеризации - 'create_at'.
Мне нужно извлечь все записи для электронной почты abc@test.com и обновить «status» на «Inactive» и «last_updated_dt» до текущей метки времени, и вот мой код:
val emails=List("abc@test.com")
sc.cassandraTable(my_keyspace, my_table)
.select("email_address", "last_updated_dt", "status")
.where("email_address IN ?", emails)
.map(r => (r.getString("email_address"), Instant.now().toString(), "Inactive"))
.saveToCassandra(my_keyspace, my_table)
, и я получил эту ошибку: key not found: cid
сделать я Нужно ли включать все столбцы в '.select()'
, чтобы обновить некоторые столбцы? это правильный способ обновить существующий столбец для строк в Кассандре?
Я использую Spark, Cassandra, Spark-Cassandra-Connector, scala.
Редактировать: выяснить, как это сделать:
val selectedRows = sc.cassandraTable(my_keyspace, my_table)
.select("email_address", "last_updated_dt", "status")
.where("email_address IN ?", emails)
val transformedRows = selectedRows.map{item =>
val email=item.getString("email_address")
val lastUpdatedDt=Instant.now().toString()
val updatedStatus="Inactive"
(email, lastUpdatedDt, updatedStatus)
}
transformedRows.saveToCassandra(my_keyspace, my_table, SomeColumns("email_address", "last_updated_dt", "status"))