У меня есть задача periodi c spark- scala, которая предназначена для передачи данных из Hive в MySQL.
Структуру таблицы можно просто рассматривать как:
+------+------+
| id | name |
+------+------+
Тогда из-за того, что таблица улья слишком велика, я должен поделиться MySQL таблицами.
Итак, вот мое текущее решение:
- Подготовка MySQL таблиц:
mysql> show tables;
+-------------------+
| Tables_in_test_db |
+-------------------+
| shared_0 |
| shared_1 |
| shared_2 |
| shared_3 |
| shared_4 |
| shared_5 |
+-------------------+
Загрузка данных из Hive и выполнение некоторых операций преобразования, а затем создание нужного кадра данных
val data = List((0, "a"), (11, "b"), (22, "c"), (33, "d"), (44, "e"))
val total = spark.sparkContext.parallelize(data)
.toDF("id", "name")
.withColumn("hashCode", hash($"id")%5)
Сохранение данных в MySQL таблице в соответствии с
hashCode
столбец
(0 to 5).foreach(hashCode => {
val df = total.where($"hashCode" === hashCode).select("id", "name")
df.write
.mode(SaveMode.Append)
.jdbc(jdbcUrl, s"shared_$hashCode", connectionProperties)
})
И это прекрасно работает, но я новичок ie в искре, поэтому я хотел бы знаете, есть ли лучший способ реализовать то, что я хочу ??
ОБНОВЛЕНИЕ:
Вот мой полный код:
val jdbcHostname = "localhost"
val jdbcPort = 3306
val jdbcDatabase = "test_db"
val jdbcUrl = s"jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}"
val connectionProperties = new Properties()
connectionProperties.put("user", "user")
connectionProperties.put("password", "password")
val spark = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
import spark.implicits._
val data = List((0, "a"), (11, "b"), (22, "c"), (33, "d"), (44, "e"))
val total = spark.sparkContext.parallelize(data)
.toDF("id", "name")
.withColumn("hashCode", hash($"id")%5)
(0 to 5).foreach(hashCode => {
val df = total.where($"hashCode" === hashCode).select("id", "name")
df.write
.mode(SaveMode.Append)
.jdbc(jdbcUrl, s"shared_$hashCode", connectionProperties)
})