Я думаю, у вас неправильное представление о том, что такое BoxedUnit
, и поэтому настаиваете на использовании интерфейса Scala в Java, который слишком сложен из-за количества скрытой сложности в Scala, которая подвергается воздействию Java. scala.Function1<scala.collection.Iterator<T>, scala.runtime.BoxedUnit>
является реализацией (Iterator[T]) => Unit
- функции Scala, которая принимает Iterator[T]
и возвращает тип Unit
. Unit
в Scala эквивалентно Java void
. BoxedUnit
является коробочной версией Unit
- это объект кучи, содержащий единичное значение единицы в его элементе UNIT
, и это деталь реализации, которая почти никогда не появляется в программах Scala. Если набор данных является DataFrame
, то T
будет org.apache.spark.sql.Row
, и вам нужно обработать Scala итераторов для коллекций Row
объектов.
Чтобы определить что-то, что scala.Function1<scala.collection.Iterator<Row>, scala.runtime.BoxedUnit>
в Java, вам нужно создать экземпляр AbstractFunction1<scala.collection.Iterator<Row>, scala.runtime.BoxedUnit>
и переопределить его apply()
метод, где вы должны вернуть BoxedUnit.UNIT
. Вам также нужно сделать его сериализуемым, поэтому вы обычно объявляете свой собственный класс, который наследуется от AbstractFunction1
и реализует Serializable
. Вы также можете Java -фокусировать его, предоставив другой, более Java -одружественный абстрактный метод, который будет переопределен позже:
import org.apache.spark.sql.Row;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;
import scala.collection.JavaConverters;
import java.util.Iterator;
class MyPartitionFunction<T> extends AbstractFunction1<scala.collection.Iterator<T>, BoxedUnit>
implements Serializable {
@Override
public BoxedUnit apply(scala.collection.Iterator<T> iterator) {
call(JavaConverters.asJavaIteratorConverter(iterator).asJava());
return BoxedUnit.UNIT;
}
public abstract void call(Iterator<T> iterator);
}
df.foreachPartition(new MyPartitionFunction<Row>() {
@Override
public void call(Iterator<Row> iterator) {
for (Row row : iterator) {
// do something with the row
}
}
});
Это довольно сложная реализация, поэтому существует Java -specifi c версия, которая принимает ForeachPartitionFunction<T>
взамен и приведенный выше код становится:
import org.apache.spark.sql.Row;
import org.apache.spark.api.java.function.ForeachPartitionFunction;
import java.util.Iterator;
df.foreachPartition(new ForeachPartitionFunction<Row>() {
public void call(Iterator<Row> iterator) throws Exception {
for (Row row : iterator) {
// do something with the row
}
}
}
Функциональность точно такая же , что и предоставленный интерфейсом Scala, просто Apache Spark выполняет преобразование итератора для вас, а также предоставляет дружественный класс Java, который не требует от вас импорта и реализации типов Scala.
Тем не менее, я думаю, у вас есть небольшое недопонимание того, как работает Spark. Вам не нужно использовать foreachPartition
для обработки потоковых данных в пакетном режиме. Это делается автоматически для вас потоковым движком Spark. Вы пишете потоковые запросы, которые задают преобразований и агрегатов , которые затем постепенно применяются по мере поступления новых данных из потока.
foreachPartition
- это форма foreach
зарезервировано для некоторых особых случаев пакетной обработки, например, когда вам нужно выполнить дорогостоящие экземпляры объекта в функции обработки, и выполнение этого для каждой строки влечет за собой огромные накладные расходы. С foreachPartition
ваша функция обработки вызывается только один раз для каждого раздела, так что вы можете создать экземпляр дорогих объектов один раз, а затем выполнить итерацию по данным раздела. Это уменьшает время обработки, потому что вы выполняете дорогостоящие операции только один раз.
Но тогда вы даже не можете вызвать foreach()
или foreachPartition()
для потокового источника, так как это приведет к AnalysisException
. Вместо этого вы должны использовать foreach()
или foreachBatch()
методы DataStreamWriter
. DataStreamWriter.foreach()
принимает экземпляр ForeachWriter
, а DataStreamWriter.foreachBatch()
принимает функцию void, которая получает набор данных и идентификатор партии. ForeachWriter
получает идентификатор эпохи в методе open()
. Опять же, foreachBatch()
поставляется в вариантах Scala и Java, которые эквивалентны по функциональности, поэтому, пожалуйста, используйте Java -specifi c, если вы собираетесь писать в Java.