Я пытаюсь выяснить, могу ли я работать с Kotlin и Spark, и использовать классы данных первого вместо классов дел Scala.
У меня есть следующий класс данных:
data class Transaction(var context: String = "", var epoch: Long = -1L, var items: HashSet<String> = HashSet()) :
Serializable {
companion object {
@JvmStatic
private val serialVersionUID = 1L
}
}
И соответствующая часть основной процедуры выглядит следующим образом:
val transactionEncoder = Encoders.bean(Transaction::class.java)
val transactions = inputDataset
.groupByKey(KeyExtractor(), KeyExtractor.getKeyEncoder())
.mapGroups(TransactionCreator(), transactionEncoder)
.collectAsList()
transactions.forEach { println("collected Transaction=$it") }
с TransactionCreator
, определенным как:
class TransactionCreator : MapGroupsFunction<Tuple2<String, Timestamp>, Row, Transaction> {
companion object {
@JvmStatic
private val serialVersionUID = 1L
}
override fun call(key: Tuple2<String, Timestamp>, values: MutableIterator<Row>): Transaction {
val seq = generateSequence { if (values.hasNext()) values.next().getString(2) else null }
val items = seq.toCollection(HashSet())
return Transaction(key._1, key._2.time, items).also { println("inside call Transaction=$it") }
}
}
Однако я думаю, что сталкиваюсь с какой-топроблемы сериализации, потому что набор заканчивается пустым после сбора.Я вижу следующий вывод:
inside call Transaction=Transaction(context=context1, epoch=1000, items=[c])
inside call Transaction=Transaction(context=context1, epoch=0, items=[a, b])
collected Transaction=Transaction(context=context1, epoch=0, items=[])
collected Transaction=Transaction(context=context1, epoch=1000, items=[])
Я попробовал пользовательский KryoRegistrator
, чтобы посмотреть, была ли это проблема с HashSet
Котлина:
class MyRegistrator : KryoRegistrator {
override fun registerClasses(kryo: Kryo) {
kryo.register(HashSet::class.java, JavaSerializer()) // kotlin's HashSet
}
}
Но это не таккажется, чтобы помочь.Любые другие идеи?
Полный код здесь .