Как загрузить CSV пару строк на пару строк - PullRequest
0 голосов
/ 08 апреля 2020

Я подключаю Spark к Cassandra, и мне удалось напечатать строки моего CSV, используя обычный метод COPY. Однако, если CSV был очень большим, как это обычно бывает в больших данных, как можно загрузить пару строк файла CSV на пару строк, чтобы избежать проблем, связанных с зависанием и т. Д. c. ?

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import com.datastax.spark.connector._

object SparkCassandra {

  def main(args: Array[String]): Unit = {

      val conf = new SparkConf().setAppName("SparkCassandra").setMaster("local").set("spark.cassandra.connection.host", "localhost")
      val sc = new SparkContext(conf)
      val my_rdd = sc.cassandraTable("my_keyspace", "my_csv")
      my_rdd.take(20).foreach(println)
      sc.stop()
  }
}

Следует ли использовать переменную времени или что-то в этом роде?

1 Ответ

0 голосов
/ 08 апреля 2020

Если вы хотите просто загрузить данные в Cassandra или выгрузить данные из Cassandra с помощью командной строки, я бы порекомендовал посмотреть на DataStax Bulk Loader (DSBulk) - он сильно оптимизирован для загрузки данных в / из Кассандры / DSE. Он работает как с открытым исходным кодом Cassandra и DSE.

В простейшем случае загрузка и выгрузка из таблицы будет выглядеть (формат по умолчанию - CSV):

dsbulk load -k keyspace -t table -url my_file.csv
dsbulk unload -k keyspace -t table -url my_file.csv

В более сложных случаях может потребоваться предоставить дополнительные параметры. Вы можете найти больше информации в следующих сериях постов в блоге .

Если вы хотите сделать это с помощью Spark, то я рекомендую использовать Dataframe API вместо RDD. В этом случае вы просто будете использовать стандартные функции read & write.

для экспорта данных из Cassandra в CSV:

import org.apache.spark.sql.cassandra._
val data = spark.read.cassandraFormat("tbl", "ks").load()
data.write.format("csv").save("my_file.csv")

или для чтения из CSV и сохранения в Cassandra:

import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.SaveMode
val data = spark.read.format("csv").save("my_file.csv")
data.cassandraFormat("tbl", "ks").mode(SaveMode.Append).save()
...