Как я могу передать сложную внешнюю переменную, скажем, значение карты в UDF из программы драйвера в Spark с Java? - PullRequest
0 голосов
/ 05 марта 2020

У меня была большая проблема, когда мне нужно было передать хэш-карту Java в UDF, которая была определена как отдельный класс, а не как некоторая встроенная лямбда-функция, которая может получить доступ к переменным окружения, определенным как переменные широковещания. Я начал этот вопрос здесь и для этой цели:

Как передать широковещательную переменную Spark в UDF в Java?

Не было получено удовлетворительного ответа поскольку люди только давали мне ответы, которые содержали простые UDF, которые можно определить как маленькие лямбды и, таким образом, получить доступ к переменным вещания из программы драйвера.

Тогда я начал исследовать типизированные списки, как я подробно описал в другом вопросе, и это мне показалось, что это путь вперед, но почти ничего в отношении документации для этого метода не существует в Java, хотя примеры и учебные пособия существуют для того же самого в Scala. Поэтому мой вопрос заключается в том, как передать значение комплексной переменной в UDF с помощью typedlit?

1 Ответ

0 голосов
/ 05 марта 2020

Я пришел к ответу на этот вопрос по длинному и сложному пути и выкладываю его здесь в качестве помощи любому, кто может столкнуться с такой же проблемой.

Официальные Spark Javadocs здесь дает определение метода typedLit следующим образом:

typedLit(T literal, scala.reflect.api.TypeTags.TypeTag<T> evidence$1)

Практически нигде не дано, как использовать этот метод в Java, и, наконец, я случайно наткнулся на этот вопрос:

Как получить TypeTag для класса в Java

Здесь мы получаем, как мы можем создать пользовательский Scala объект для нашего желаемого класса, который мы хотим отправить в UDF. Используя ответ, я создал свой пользовательский объект Scala для карты Scala:

import scala.reflect.runtime.universe._
import scala.collection.convert._
object TypeTags {
  val MapString = typeTag[scala.collection.Map[String, String]]
}

Чтобы использовать этот объект в своем проекте Java Maven, я следовал структуре, заданной этим blog:

https://dzone.com/articles/scala-in-java-maven-project

Зависимость, которую я должен был включить в свой pom, выглядит следующим образом:

<dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.7</version>
    </dependency>

Однако * Теги жизненного цикла 1080 *, которые присутствовали в поме, не компилировались для меня. Это был первый фрагмент pom:

<plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <executions>
            <execution>
                <id>scala-compile-first</id>
                <phase>process-resources</phase>
                <goals>
                    <goal>add-source</goal>
                    <goal>compile</goal>
                </goals>
            </execution>
            <execution>
                <id>scala-test-compile</id>
                <phase>process-test-resources</phase>
                <goals>
                    <goal>testCompile</goal>
                </goals>
            </execution>
        </executions>
    </plugin>

Затем я нашел этот вопрос, который содержит отдельный набор тегов жизненного цикла:

Мой смешанный проект Scala / Java Maven проект не ' t compile

Также я скачал смешанный проект Java / Scala по следующей ссылке:

https://github.com/danyaljj/sampleMixedScalaJavaMavenProject/blob/master/pom.xml

Помпа из этого проекта, наконец, сработала для меня, и я мог опередить проблему компиляции из-за тегов жизненного цикла. Новый фрагмент pom выглядит следующим образом:

      <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <id>compile</id>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                    <phase>compile</phase>
                </execution>
                <execution>
                    <id>test-compile</id>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                    <phase>test-compile</phase>
                </execution>
                <execution>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

Было много ошибок компиляции, когда я пытался использовать TypeTag, который я определил в своем основном классе. Наконец, я использовал ответ на этот вопрос для своих целей:

Преобразование Java Карта в Scala Карта

Сначала мне пришлось вручную импортировать в свой main class объект Scala, который я определил в файле TypeTags. scala:

import com.esrx.dqm.datasync.TypeTags$;

Я определил фиктивную карту для отправки в мой UDF:

 Map<String, String> testMap = new HashMap<>();
 testMap.put("1", "One");

Затем я преобразовал хэш-карту в Scala карту:

List<Tuple2<String, String>> tuples = testMap.entrySet().stream()
            .map(e -> Tuple2.apply(e.getKey(), e.getValue()))
            .collect(Collectors.toList());

scala.collection.Map scalaMap = scala.collection.Map$.MODULE$.apply(JavaConversions.asScalaBuffer(tuples).toSeq());

Затем я отправил карту в свой UDF, который я определил ранее:

TypeTags$ type = TypeTags$.MODULE$;
data = data.withColumn("broadcast", functions.callUDF("TestUDF", functions.typedLit(scalaMap, type.MapString())));

Я не смог для отправки значения MapString val в UDF, поскольку компилятор всегда жаловался, что у него есть частный доступ в TypeDefs. По ссылке здесь я обнаружил, что в Java доступ к val осуществляется с помощью вызова метода, такого как getters, а не непосредственно самого val.

TestUDF I определил следующим образом:

public class TestUDF implements UDF1<scala.collection.immutable.Map<String, String>,String> {

@Override
public String call(scala.collection.immutable.Map<String, String> t1) throws Exception {
    // TODO Auto-generated method stub
    System.out.println(t1);
    AsJava<Map<String, String>> asJavaMap = JavaConverters.mapAsJavaMapConverter(t1);
    Map<String, String> javaMap = asJavaMap.asJava();
    System.out.println("Value of 1: " + javaMap.get("1"));      
    return null;
}

}

Наконец-то это сработало, и я смог получить доступ к карте из UDF.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...