Наличие данных, организованных в ежедневных разделах, не очень хороший дизайн - в этом случае только радиочастотные узлы будут активны в течение дня записи данных, а затем во время генерации отчета.
Поскольку вы будете получать доступ к этим данным только из Spark, вы можете использовать следующий подход - иметь некоторое поле сегмента в качестве ключа раздела, например, с равномерно сгенерированным случайным числом, и метку времени в качестве столбца кластеризации, и, возможно, другой uuid
столбец для гарантии уникальности записей, примерно так:
create table test.sdtest (
b int,
ts timestamp,
uid uuid,
v1 int,
primary key(b, ts, uid));
Где максимальное значение для генерации b
должно быть выбрано так, чтобы иметь не слишком большие и не очень маленькие разделы, чтобы мы могли эффективно их прочитать.
И тогда мы можем запустить код Spark следующим образом:
import org.apache.spark.sql.cassandra._
val data = spark.read.cassandraFormat("sdtest", "test").load()
val filtered = data.filter("ts >= cast('2019-03-10T00:00:00+0000' as timestamp) AND ts < cast('2019-03-11T00:00:00+0000' as timestamp)")
Хитрость в том, что мы распределяем данные по узлам с помощью случайного ключа разделения, поэтому все узлы будут обрабатывать нагрузку во время записи данных и во время генерации отчета.
Если мы посмотрим на физический план для этого кода Spark (отформатированный для удобства чтения):
== Physical Plan ==
*Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [b#23,ts#24,v1#25]
PushedFilters: [*GreaterThanOrEqual(ts,2019-03-10 00:00:00.0),
*LessThan(ts,2019-03-11 00:00:00.0)], ReadSchema: struct<b:int,ts:timestamp,v1:int>
Мы видим, что оба условия будут перенесены в DSE на уровне CQL - это означает, что Spark не загрузит все данные в память и не отфильтрует их, а вместо этого вся фильтрация произойдет в Cassandra, и только необходимые данные будут быть возвращенным назад. И поскольку мы распределяем запросы между несколькими узлами, чтение может быть быстрее (необходимо проверить), чем чтение одного гигантского раздела. Еще одним преимуществом этого дизайна является то, что будет легко выполнить удаление старых данных с помощью Spark, примерно так:
val toDel = sc.cassandraTable("test", "sdtest").where("ts < '2019-08-10T00:00:00+0000'")
toDel.deleteFromCassandra("test", "sdtest", keyColumns = SomeColumns("b", "ts"))
В этом случае Spark выполнит очень эффективное удаление диапазона / строки, что приведет к уменьшению количества надгробий.
P.S. Рекомендуется использовать версию DSE коннектора Spark, так как он может иметь больше оптимизаций.
P.P.S. теоретически мы можем объединить ts
и uid
в один столбец timeuuid
, но я не уверен, что он будет работать с Dataframes.