Как я могу удалить строку в Cassandra, используя Apache Flink? - PullRequest
0 голосов
/ 09 ноября 2019

В Apache Flink с помощью CassandraSink легко вставить строку в Cassandra. Но я не смог найти способ удалить строку.

Я также пытался написать собственный приемник, но получил NotSerializableException. Как я могу построить код для операции удаления?

public class MyCassandraSink implements SinkFunction<String> {

    private Cluster cluster = Cluster.builder()
            .addContactPoint("127.0.0.1")
            .build();

    private Session cassandra = cluster.connect("mykeyspace");

    @Override
    public void invoke(String value, Context context) throws Exception {
        cassandra.execute("SOME DELETE QUERY");
    }
}
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: [com.datastax.driver.core.SessionManager@3b0fe47a] is not serializable. The object probably contains or references non serializable fields.
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
    at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
    at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1227)
    at com.meshkan.streaming.entry.EventListener.main(EventListener.java:42)
Caused by: java.io.NotSerializableException: com.datastax.driver.core.SessionManager
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at java.util.concurrent.CopyOnWriteArrayList.writeObject(CopyOnWriteArrayList.java:973)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)
    ... 9 more

1 Ответ

0 голосов
/ 10 ноября 2019

Чтобы реализовать собственную логику insert-vs-delete, создайте приемник, расширяющий CassandraSinkBase, и реализуйте метод send(). Смотрите AbstractCassandraTupleSink как пример этого. Обратите внимание, как CassandraSinkBase позволяет избежать проблем сериализации с клиентом Cassandra, сделав его временным и создав его в вызове open().

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...