Время ожидания Spark JDBC-соединения с SQL Server истекает - PullRequest
0 голосов
/ 12 июня 2018

Я использую Spark v2.2.1 через sparklyr v0.6.2 и извлекаю данные из SQL Server через jdbc.Кажется, у меня возникают проблемы с сетью, потому что много раз (не каждый раз) мой исполнитель, выполняющий запись в SQL Server, завершается ошибкой:

Prelogin error: host <my server> port 1433 Error reading prelogin response: Connection timed out (Read failed) ClientConnectionId:...

Я запускаю сеанс sparklyr сследующие конфигурации:

spark_conf = spark_config()
spark_conf$spark.executor.cores <- 8
spark_conf$`sparklyr.shell.driver-memory` <- "8G"
spark_conf$`sparklyr.shell.executor-memory` <- "12G"
spark_conf$spark.serializer <- "org.apache.spark.serializer.KryoSerializer"
spark_conf$spark.network.timeout <- 400

Но интересно, что установленное выше время ожидания сети, похоже, не применяется на основе журналов исполнителя:

18/06/11 17:53:44 INFO BlockManager: Found block rdd_9_16 locally
18/06/11 17:53:45 WARN SQLServerConnection: ConnectionID:3 ClientConnectionId: d3568a9f-049f-4772-83d4-ed65b907fc8b Prelogin error: host nciensql14.nciwin.local port 1433 Error reading prelogin response: Connection timed out (Read failed) ClientConnectionId:d3568a9f-049f-4772-83d4-ed65b907fc8b
18/06/11 17:53:45 WARN SQLServerConnection: ConnectionID:2 ClientConnectionId: ecb084e6-99a8-49d1-9215-491324e8d133 Prelogin error: host nciensql14.nciwin.local port 1433 Error reading prelogin response: Connection timed out (Read failed) ClientConnectionId:ecb084e6-99a8-49d1-9215-491324e8d133
18/06/11 17:53:45 ERROR Executor: Exception in task 10.0 in stage 26.0 (TID 77)

Может кто-нибудь помочь мне понять, что такоеошибка prelogin есть и как этого избежать?Вот моя функция записи:

function (df, tbl, db, server = NULL, user, pass, mode = "error", 
    options = list(), ...) 
{
    sparklyr::spark_write_jdbc(
  df, 
  tbl, 
  options = c(
    list(url = paste0("jdbc:sqlserver://", server, ".nciwin.local;", 
         "databaseName=", db, ";", 
         "user=", user, ";", 
         "password=", pass, ";"), 
       driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"), 
    options), 
  mode = mode, ...)
}

Я только что обновил свой драйвер jdbc до версии 6.0, но я не думаю, что это что-то изменило.Я надеюсь, что я установил его правильно.Я просто поместил его в папку Spark/jars, а затем добавил в Spark/conf/spark-defaults.conf.

РЕДАКТИРОВАТЬ Я читаю 23M строк в 24 разделах в Spark.Мой кластер имеет 4 узла с 8 ядрами и памятью 18G.С моими текущими конфигурациями у меня есть 4 исполнителя с 8 ядрами в каждом и 12G на исполнителя.Моя функция для чтения данных выглядит так:

function (sc, tbl, db, server = NULL, user, pass, repartition = 0, options = list(), ...) 
{
    sparklyr::spark_read_jdbc(
      sc, 
      tbl, 
      options = c(
        list(url = paste0("jdbc:sqlserver://", server, ".nciwin.local;"), 
             user = user, 
             password = pass, 
             databaseName = db, 
             dbtable = tbl, 
             driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"), 
        options), 
      repartition = repartition, ...)
}

Я установил repartition в 24 во время работы.Поэтому я не вижу связи с предложенным сообщением.

РЕДАКТИРОВАТЬ 2
Мне удалось исправить мою проблему, избавившись от перераспределения.Кто-нибудь может объяснить, почему перераспределение с помощью sparklyr неэффективно в этом случае?

1 Ответ

0 голосов
/ 13 июня 2018

Как объяснено в другом вопросе , а также в некоторых других публикациях ( Что означает параметры partitionColumn, lowerBound, upperBound, numPartitions? , Преобразование таблицы mysql в sparkнабор данных очень медленный по сравнению с тем же из CSV-файла , Разделение в искре при чтении из RDBMS через JDBC , синхронное чтение данных из mysql параллельно ) и ресурсов вне сайта ( Распараллеливание чтения ), по умолчанию источник Spark JDBC считывает все данные последовательно в один узел.

Существует два способа распараллеливания чтения:

  • Разделение диапазона на основе числового столбца с необходимыми lowerBound, upperBound, partitionColumn и numPartitions options, где partitionColumn - стабильный числовой столбец ( псевдоколонки могут быть не лучшим выбором )

    spark_read_jdbc(
      ...,
      options = list(
        ...
        lowerBound = "0",                 # Adjust to fit your data 
        upperBound = "5000",              # Adjust to fit your data 
        numPartitions = "42",             # Adjust to fit your data and resources
        partitionColumn = "some_numeric_column"
      )
     )
    
  • predicates список - на данный момент не поддерживается в sparklyr.

Перераспределение (sparklyr::sdf_repartition нене решить проблему, потому что это происходит после данныхбыл загружен.Поскольку shuffle (требуется для repartition) относится к самым дорогим операциям в Spark, он может легко вызвать сбой узла.

В результате, используя:

  • repartitionпараметр spark_read_jdbc:

  • sdf_repartition

- всего лишь практика культа грузов, и большую часть времени приносит больше вреда, чем пользы,Если данные достаточно малы для передачи по одному узлу, то увеличение количества разделов обычно снижает производительность.В противном случае он просто рухнет.

При этом - если данные уже обрабатываются одним узлом, возникает вопрос, имеет ли смысл вообще использовать Apache Spark.Ответ будет зависеть от остальной части вашего конвейера, но, учитывая только рассматриваемый компонент, он, вероятно, будет отрицательным.

...