Вот мой ответ:
// Write to cassandra
CassandraSink.addSink(metricPredictions)
.setClusterBuilder(new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder b) {
return b.addContactPoint((String) CASSANDRA_MAP.get(PropertiesEnum.HOST.getKey()))
.withPort((int) CASSANDRA_MAP.get(PropertiesEnum.PORT.getKey()))
.withAuthProvider(new PlainTextAuthProvider((String) CASSANDRA_MAP.get(PropertiesEnum.USERNAME.getKey()),
(String) CASSANDRA_MAP.get(PropertiesEnum.PASSWORD.getKey())))
.withReconnectionPolicy(new ConstantReconnectionPolicy((Integer) CASSANDRA_MAP.get(PropertiesEnum.RECONNECT_DELAY_IN_MS.getKey())))
.build();
}
})
.setMapperOptions(() -> new Mapper.Option[]{Mapper.Option.saveNullFields(true), Mapper.Option.ttl(60)})
.build()
.setParallelism((int) CASSANDRA_MAP.get(PropertiesEnum.PARALLELISM.getKey()));
Надеюсь, это поможет:)
Кстати, TTL в секундах.
С уважением, Али