Сохранение Dataframe в цикле Foreachpartition на Cassandra - PullRequest
0 голосов
/ 04 февраля 2019

Я ищу, чтобы сохранить весь Dataframe в цикле foreachpartition на Cassandra.

Я знаю, что могу получить Cassandra Connector в пределах foreachpartition и выполнить операторы CRUD на cassandra, используя следующий код:

val conf: SparkConf = new SparkConf(true)
.set("spark.cassandra.connection.host", "IP")
.set("spark.cassandra.auth.username", "username")
.set("spark.cassandra.auth.password", "pwd")
val cdbConnector = CassandraConnector(conf)

cdbConnector.withSessionDo(session =>
session.execute(//Insert statement)
)

Но меня больше интересует хранение всего Dataframe в Cassandraтаблица за один раз в пределах foreachpartition.

Кроме того, когда я выполняю приведенную ниже инструкцию для создания фрейма данных и записываю его в Cassandra в пределах foreachpartition, мое приложение зависает, и все потоки находятся в состоянии ожидания.

df.write.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "***", "keyspace" -> "***")).save()

1 Ответ

0 голосов
/ 04 февраля 2019

Вы должны иметь возможность звонить с использованием Cassandra API напрямую, вы можете попробовать и посмотреть, имеет ли это значение

    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.11</artifactId>
        <version>${spark-cassandra-connector.version}</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.cassandra</groupId>
        <artifactId>cassandra-driver-core</artifactId>
        <version>${cassandra-driver-core.version}</version>
    </dependency>

, и вы сможете сохранять данные, и вам не нужноявно вызвать ForEachPartition

 .saveToCassandra("schema", "tableName")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...