Создание пользовательской (не временной) функции в Spark-SQL для блоков данных Azure - PullRequest
1 голос
/ 10 июля 2019

Может быть, это глупо, я являюсь разработчиком Microsoft SQL / C # и никогда раньше не использовал никакие другие IDE / написанные JAVA / SCALA.Я переношу некоторые SQL-запросы Azure в решение Azure Databricks.

Похоже, что нет эквивалента функции TSQL DATEDIFF_BIG (https://docs.microsoft.com/en-us/sql/t-sql/functions/datediff-transact-sql?view=sql-server-2017)

Решения, которые вы найдете, - запрограммируйте свой собственный UDF.

То, что я сделал (см. Ниже) в записной книжке SCALA - отлично работает для временной функции. (https://docs.databricks.com/spark/latest/spark-sql/language-manual/create-function.html)

Это был самый полезный образец, который я нашел https://github.com/johnmuller87/spark-udf.

Существует множество примеров временных функций, но я не нашел постоянных функций для разработчиков, не являющихся JAVA / SCALA.

Я установил SBT (Последняя версия для Windows - https://www.scala-sbt.org/1.x/docs/Install-sbt-on-Windows.html) Я также установил Intellj

Я запускаю SBT BUILT для образца IBAN, но не могу получитьФункция SQL, после загрузки JAR в мой Clusterd и регистрации функции для работы.

CREATE FUNCTION ValidateIBAN AS 'com.ing.wbaa.spark.udf.ValidateIBAN' USING JAR 'spark_udf_assembly_0_2_0' --without extension

SELECT ValidateIBAN('NL20INGB0001234567')

Ошибка всегда была "Ошибка в выражении SQL: AnalysisException: Нет обработчика для UDF / UDAF / UDTF 'com.ing.wbaa.spark.udf.ValidateIBAN ';строка 1 поз 7 "

//import org.apache.spark.sql.types._                         // include the Spark Types to define our schema
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.functions.udf
import java.time.temporal.ChronoUnit;

// Define function to calculate local time offset
def getTimestampDifference(interval: java.lang.String, date1: java.sql.Timestamp, date2: java.sql.Timestamp) : java.lang.Long = {

  //https://docs.oracle.com/javase/8/docs/api/java/sql/Timestamp.html
  //https://spark.apache.org/docs/2.4.0/sql-reference.html
  //https://alvinalexander.com/scala/how-to-use-scala-match-expression-like-switch-case-statement

  interval match
  {
    case "NANOSECOND"=> return ChronoUnit.NANOS.between(date1.toInstant(), date2.toInstant());
    case "MICROSECOND"=> return ChronoUnit.MICROS.between(date1.toInstant(), date2.toInstant());
    case "MILLISECOND"=> return ChronoUnit.MILLIS.between(date1.toInstant(), date2.toInstant()); // date2.getTime() - date1.getTime();
    case "SECOND"=> return ChronoUnit.SECONDS.between(date1.toInstant(), date2.toInstant());
    case "MINUTE"=> return ChronoUnit.MINUTES.between(date1.toInstant(), date2.toInstant());
    case "HOUR"=> return ChronoUnit.HOURS.between(date1.toInstant(), date2.toInstant());
    case "DAY"=> return ChronoUnit.DAYS.between(date1.toInstant(), date2.toInstant());
    case "WEEK"=> return ChronoUnit.WEEKS.between(date1.toInstant(), date2.toInstant());
    case "MONTH"=> return ChronoUnit.MONTHS.between(date1.toInstant(), date2.toInstant());
    case "YEAR"=> return ChronoUnit.YEARS.between(date1.toInstant(), date2.toInstant());
  }
}

spark.udf.register("DATETIMEDIFF", udf(getTimestampDifference(_:java.lang.String, _:java.sql.Timestamp,_:java.sql.Timestamp),LongType))

На самом деле мне нужно - как преобразовать ноутбук SCALA в функцию SQL, чтобы я мог использовать его в постоянном представлении SQL в кластере баз данных Azure версии 5.4 (включая Apache)Spark 2.4.3, Scala 2.11)

  • Какой класс для реализации
  • Какой метод реализовать (переопределить в c #) - есть также разные статьи о HIVE или SPARK
  • Как настроить встроенный SBT или любой другой способ скомпилировать его в Java-архиве, чтобы я мог успешно создавать и запускать функцию SQL (только в SQL, а не в коде Pyhton или в Scala-коде - в блокноте SQL)

Спасибо за помощь

Ответы [ 2 ]

1 голос
/ 10 июля 2019

Spark не предлагает никаких постоянных возможностей, рассчитанных на более чем один сеанс искры ( Блоки данных - создание постоянных пользовательских функций (UDF) или время жизни кластера в жаргоне Databricks).Если вам нужны длительные спарк-сессии (только часть SQL), вы можете рассмотреть возможность добавления этих UDF в Hive и вызывать их из Spark.В противном случае (подумайте о переходных кластерах) вам нужно будет добавлять его каждый раз, когда вы запускаете кластер.

Код вашего UDF неоптимальный: нет обработки пустых / нулевых значений / он выдастисключение

Для базового (стандартного) искрового UDF см. https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-udfs.html, реальные интерфейсы не требуются (в отличие от Hive)

Относительно: Функция SQL (только в SQL) / SBT:

Если вам это действительно нужно (для этого простого варианта использования), https://github.com/geoHeil/sparkSimpleProjectTemplate.g8 может быть примером для вас.

Но для этого кода дополнительных зависимостей не требуется.Этого должно быть достаточно, чтобы создать текстовый файл / файл Scala, содержащий <100 строк кода, необходимых для вашей функции.Этот файл (Notebook?) Может затем вызываться при создании кластера с использованием API, то есть через <a href="https://docs.databricks.com/user-guide/dev-tools/databricks-cli.html" rel="nofollow noreferrer">https://docs.databricks.com/user-guide/dev-tools/databricks-cli.html и некоторых сценариев, поэтому он ведет себя как постоянный.

Более того: всегда рассматривайте возможность использования искровой нативной (оптимизированный катализатор)) функции. DATEDIFF в SPARK SQl обычный datediff может уже сделать многое из того, что нужно вашему большому datediff, а также вычесть столбцы типа простой метки времени.Если я правильно понимаю из-за того, что я кратко взглянул на него, отсутствует только форматирование вывода до желаемой степени детализации (т. Е. Будет предоставлено из коробки из функции t-SQL) и может быть достигнуто путем вложения его с различными функциями, такими как:

  • год
  • день
  • неделя
  • или ручное деление возвращенной разницы
0 голосов
/ 10 июля 2019

Оператор CREATE FUNCTION в ссылочных блоках данных, на которые вы ссылаетесь, на самом деле является командой Hive, а не Spark, и он ожидает, что класс UDF будет UDF Hive.

Это также является причиной ошибки «Нет обработчика для UDF / UDAF / UDTF». Приведенный вами пример реализует Spark UDF , в то время как вам нужно реализовать Hive UDF .

Чтобы создать Hive UDF, вам нужно реализовать класс, который расширяет класс org.apache.hadoop.hive.ql.exec.UDF и реализует функцию под названиемvalu. В вашем случае весь класс должен выглядеть так:

class GetTimestampDifference extends UDF {

  def evaluate(interval: java.lang.String, date1: java.sql.Timestamp, date2: java.sql.Timestamp) : java.lang.Long = {

  //https://docs.oracle.com/javase/8/docs/api/java/sql/Timestamp.html
  //https://spark.apache.org/docs/2.4.0/sql-reference.html
  //https://alvinalexander.com/scala/how-to-use-scala-match-expression-like-switch-case-statement

  interval match
  {
    case "NANOSECOND"=> return ChronoUnit.NANOS.between(date1.toInstant(), date2.toInstant());
    case "MICROSECOND"=> return ChronoUnit.MICROS.between(date1.toInstant(), date2.toInstant());
    case "MILLISECOND"=> return ChronoUnit.MILLIS.between(date1.toInstant(), date2.toInstant()); // date2.getTime() - date1.getTime();
    case "SECOND"=> return ChronoUnit.SECONDS.between(date1.toInstant(), date2.toInstant());
    case "MINUTE"=> return ChronoUnit.MINUTES.between(date1.toInstant(), date2.toInstant());
    case "HOUR"=> return ChronoUnit.HOURS.between(date1.toInstant(), date2.toInstant());
    case "DAY"=> return ChronoUnit.DAYS.between(date1.toInstant(), date2.toInstant());
    case "WEEK"=> return ChronoUnit.WEEKS.between(date1.toInstant(), date2.toInstant());
    case "MONTH"=> return ChronoUnit.MONTHS.between(date1.toInstant(), date2.toInstant());
    case "YEAR"=> return ChronoUnit.YEARS.between(date1.toInstant(), date2.toInstant());
  }
}

}

Затем вам нужно скомпилировать его в файл JAR, скопировать его куда-нибудь в файловую систему блоков данных и создать постоянную функцию, используя ту же команду, что вы делали ранее (при условии, что вы сохраняете пространство имен примера IBAN):

CREATE FUNCTION GetTimestampDifference AS 'com.ing.wbaa.spark.udf.GetTimestampDifference' USING JAR '[path to your jar in dbfs]'

SELECT GetTimestampDifference ("MILLISECOND",cast("2019-07-08 16:07:03.246" as timestamp), cast("2019-07-08 16:07:03.248" as timestamp))

Предполагая, что вы все еще изменяете пример проекта IBAN, с которого вы начали, для создания файла jar вам нужно будет добавить следующую зависимость пакета в файл build.sbt:

"org.apache.spark" %% "spark-hive" % "2.4.3"
...