Письмо искры Кассандре с переменным TTL - PullRequest
0 голосов
/ 02 июля 2018

В Java Spark у меня есть фрейм данных со столбцом bucket_timestamp, который представляет время сегмента, к которому относится строка.

Я хочу записать кадр данных в БД Cassandra. Данные должны быть записаны в БД с TTL. TTL должен зависеть от временной метки сегмента - где TTL каждой строки должен быть рассчитан как ROW_TTL = CONST_TTL - (CurrentTime - bucket_timestamp), где CONST_TTL - это постоянный TTL, который я настроил.

В настоящее время я пишу Кассандре с искрой, используя постоянный TTL, со следующим кодом:

df.write().format("org.apache.spark.sql.cassandra")
            .options(new HashMap<String, String>() {
                {
                    put("keyspace", "key_space_name");
                    put("table, "table_name");
                    put("spark.cassandra.output.ttl, Long.toString(CONST_TTL)); // Should be depended on bucket_timestamp column
                }
            }).mode(SaveMode.Overwrite).save();

Один из возможных способов, о которых я думал, - для каждого возможного bucket_timestamp - фильтровать данные в соответствии с временной меткой, вычислять TTL и записывать отфильтрованные данные в Cassandra. но это кажется очень неэффективным и не искорки. Есть ли способ в Java Spark предоставить столбец спарк в качестве параметра TTL, так что TTL будет отличаться для каждой строки?

Решение должно работать с Java и набором данных : я столкнулся с некоторыми решениями для выполнения этого с RDD в scala, но не нашел решения для использования Java и dataframe.

Спасибо!

Ответы [ 2 ]

0 голосов
/ 02 июля 2018

Для API DataFrame такая функциональность не поддерживается, но ... Для него есть JIRA - https://datastax -oss.atlassian.net / browse / SPARKC-416 , вы можете посмотреть его получить уведомление, когда оно будет реализовано ...

Таким образом, у вас есть только один выбор - использовать RDD API, как описано в ответе @ bartosz25 ...

0 голосов
/ 02 июля 2018

Из опций разъема Spark-Cassandra (https://github.com/datastax/spark-cassandra-connector/blob/v2.3.0/spark-cassandra-connector/src/main/java/com/datastax/spark/connector/japi/RDDAndDStreamCommonJavaFunctions.java) вы можете установить TTL как:

  • постоянное значение (withConstantTTL)
  • автоматически разрешенное значение (withAutoTTL)
  • значение на основе столбца (withPerRowTTL)

В вашем случае вы можете попробовать последний вариант и вычислить TTL как новый столбец начального Dataset с правилом, которое вы указали в вопросе.

Для случая использования вы можете увидеть тест здесь: https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/writer/TableWriterSpec.scala#L612

...