Пустой набор после collectAsList, даже если он не пустой внутри оператора преобразования - PullRequest
0 голосов
/ 23 февраля 2019

Я пытаюсь выяснить, могу ли я работать с Kotlin и Spark, и использовать классы данных первого вместо классов дел Scala.

У меня есть следующий класс данных:

data class Transaction(var context: String = "", var epoch: Long = -1L, var items: HashSet<String> = HashSet()) :
    Serializable {
    companion object {
        @JvmStatic
        private val serialVersionUID = 1L
    }
}

И соответствующая часть основной процедуры выглядит следующим образом:

val transactionEncoder = Encoders.bean(Transaction::class.java)
val transactions = inputDataset
    .groupByKey(KeyExtractor(), KeyExtractor.getKeyEncoder())
    .mapGroups(TransactionCreator(), transactionEncoder)
    .collectAsList()

transactions.forEach { println("collected Transaction=$it") }

с TransactionCreator, определенным как:

class TransactionCreator : MapGroupsFunction<Tuple2<String, Timestamp>, Row, Transaction> {
    companion object {
        @JvmStatic
        private val serialVersionUID = 1L
    }

    override fun call(key: Tuple2<String, Timestamp>, values: MutableIterator<Row>): Transaction {
        val seq = generateSequence { if (values.hasNext()) values.next().getString(2) else null }
        val items = seq.toCollection(HashSet())
        return Transaction(key._1, key._2.time, items).also { println("inside call Transaction=$it") }
    }
}

Однако я думаю, что сталкиваюсь с какой-топроблемы сериализации, потому что набор заканчивается пустым после сбора.Я вижу следующий вывод:

inside call Transaction=Transaction(context=context1, epoch=1000, items=[c])
inside call Transaction=Transaction(context=context1, epoch=0, items=[a, b])
collected Transaction=Transaction(context=context1, epoch=0, items=[])
collected Transaction=Transaction(context=context1, epoch=1000, items=[])

Я попробовал пользовательский KryoRegistrator, чтобы посмотреть, была ли это проблема с HashSet Котлина:

class MyRegistrator : KryoRegistrator {
    override fun registerClasses(kryo: Kryo) {
        kryo.register(HashSet::class.java, JavaSerializer()) // kotlin's HashSet
    }
}

Но это не таккажется, чтобы помочь.Любые другие идеи?

Полный код здесь .

1 Ответ

0 голосов
/ 24 февраля 2019

Кажется, это проблема сериализации.Документация Encoders.bean состояний (Spark v2.4.0):

типы коллекций: только массив и java.util.List в настоящее время, поддержка карты в процессе

Перенос класса данных Transaction на Java и изменение items на java.util.List, похоже, помогут.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...