запись данных в общую таблицу mysql - PullRequest
0 голосов
/ 07 апреля 2020

У меня есть задача periodi c spark- scala, которая предназначена для передачи данных из Hive в MySQL.

Структуру таблицы можно просто рассматривать как:

+------+------+
| id   | name |
+------+------+

Тогда из-за того, что таблица улья слишком велика, я должен поделиться MySQL таблицами.

Итак, вот мое текущее решение:

  1. Подготовка MySQL таблиц:
mysql> show tables;
+-------------------+
| Tables_in_test_db |
+-------------------+
| shared_0          |
| shared_1          |
| shared_2          |
| shared_3          |
| shared_4          |
| shared_5          |
+-------------------+
Загрузка данных из Hive и выполнение некоторых операций преобразования, а затем создание нужного кадра данных
val data = List((0, "a"), (11, "b"), (22, "c"), (33, "d"), (44, "e"))
val total = spark.sparkContext.parallelize(data)
  .toDF("id", "name")
  .withColumn("hashCode", hash($"id")%5)
Сохранение данных в MySQL таблице в соответствии с hashCode столбец
(0 to 5).foreach(hashCode => {
  val df = total.where($"hashCode" === hashCode).select("id", "name")
  df.write
    .mode(SaveMode.Append)
    .jdbc(jdbcUrl, s"shared_$hashCode", connectionProperties)
})

И это прекрасно работает, но я новичок ie в искре, поэтому я хотел бы знаете, есть ли лучший способ реализовать то, что я хочу ??


ОБНОВЛЕНИЕ:

Вот мой полный код:

val jdbcHostname = "localhost"
val jdbcPort = 3306
val jdbcDatabase = "test_db"
val jdbcUrl = s"jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}"
val connectionProperties = new Properties()
connectionProperties.put("user", "user")
connectionProperties.put("password", "password")

val spark = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
import spark.implicits._

val data = List((0, "a"), (11, "b"), (22, "c"), (33, "d"), (44, "e"))
val total = spark.sparkContext.parallelize(data)
  .toDF("id", "name")
  .withColumn("hashCode", hash($"id")%5)

(0 to 5).foreach(hashCode => {
  val df = total.where($"hashCode" === hashCode).select("id", "name")
  df.write
    .mode(SaveMode.Append)
    .jdbc(jdbcUrl, s"shared_$hashCode", connectionProperties)
})

1 Ответ

0 голосов
/ 07 апреля 2020

Поскольку я не вижу аспектов ошибок сериализации, вы просто делаете 5 последовательных вызовов DF через al oop, но foreach не привязан к базовым RDD.

Я могу сделать следующие замечания: улучшение .cache или .persist может быть улучшением. Кроме того, перераспределение по столбцам для общего значения df позволило бы ускорить доступ.

...