Spark SQL - кодеры для кортежа, содержащие список или массив в качестве элемента - PullRequest
0 голосов
/ 02 мая 2018

Использование Spark 2.2 + Java 1.8

У меня есть два пользовательских типа данных "Foo" и "Bar". Каждый из них реализует serializable. У 'Foo' есть отношение один ко многим с 'Bar', поэтому их отношения представлены в виде кортежа:

Tuple2<Foo, List<Bar>>

Как правило, когда у меня есть отношение 1: 1, я могу кодировать свои собственные типы следующим образом:

Encoder<Tuple2<Foo,Bar>> fooBarEncoder = Encoders.tuple(Encoders.bean(Foo.class),Encoders.bean(Bar.class));

и затем используйте для кодирования моего набора данных

Dataset<Tuple2<Foo,Bar>> fooBarSet = getSomeData().as(fooBarEncoder);

Но у меня возникают проблемы с поиском способа кодирования для сценария, когда у меня есть список (или массив) в качестве элемента Tuple2. Я хотел бы предоставить кодировщик для второго элемента, например:

Encoder<Tuple2<Foo,List<Bar>>> fooBarEncoder = Encoders.tuple(Encoders.bean(Foo.class), List<Bar>.class);

и затем закодировать в мой набор данных:

Dataset<Tuple2<Foo,List<Bar>>> fooBarSet = getSomeData().as(fooBarEncoder)

Но, очевидно, я не могу вызвать .class для параметризованного типа, такого как List

Я знаю, что для строковых и примитивных типов массивы поддерживаются искровыми следствиями, например ::1010 *

sparkSession.implicits().newStringArrayEncoder()

Но как бы я создал кодировщик для List или Array пользовательского типа класса?

Ответы [ 2 ]

0 голосов
/ 16 мая 2018

Я не знаю, возможно ли это. Я попробовал следующую Scala, пытаясь помочь, подумав, что я могу создать кодировщик, сначала научив спекуляции, как кодировать X, затем List [X] и, наконец, кортеж, содержащий List [X] (не показан ниже):

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import scala.beans.BeanProperty

class X(@BeanProperty var field: String) extends Serializable
case class Z(field: String)

implicit val XEncoder1 = Encoders.bean(classOf[X])

implicit val ZEncoder = Encoders.product[Z]

val listXEncoder = ExpressionEncoder[List[X]] // doesn't work
val listZEncoder = ExpressionEncoder[List[Z]]

listZEncoder отлично работает

Переключение на использование

implicit val XEncoder2 = org.apache.spark.sql.Encoders.kryo[X]

Все еще не работает для listXEncoder

Ошибка заканчивается в месте катализатора ScalaReflection, которое за мной.

0 голосов
/ 16 мая 2018

Я не уверен, насколько хорошо этот метод может быть реализован в вашей настройке, но здесь идет. Создайте класс-оболочку для вашего списка и попробуйте.

public class BarList implements Serializable {
    List<Bar> list;

    public List<Bar> getList() {
        return list;
    }
    public void setList(List<Bar> l) {
        list = l;
    }
}
...