Я пытаюсь отфильтровать данные в соответствии с полем даты и времени. Образец из моих данных:
303,0.00001747,4351040,75.9054,"2019-03-08 19:29:18"
Вот как я инициализирую искру:
SparkConf conf = new SparkConf().setAppName("app name").setMaster("spark://192.168.1.124:7077");
JavaSparkContext sc = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate(conf));
Во-первых, я прочитал данные выше в свой пользовательский объект, как показано ниже:
// Read data from file into custom object
JavaRDD<CurrencyPair> rdd = sc.textFile(System.getProperty("user.dir") + "/data/data.csv", 2).map(
new Function<String, CurrencyPair>() {
public CurrencyPair call(String line) throws Exception {
String[] fields = line.split(","); // Split line from commas
// read each data into custom object
CurrencyPair cp = new CurrencyPair();
cp.setId(Integer.parseInt(fields[0].trim()));
cp.setValue(Double.parseDouble(fields[1].trim()));
cp.setBaseVolume(Double.parseDouble(fields[2].trim()));
cp.setQuoteVolume(Double.parseDouble(fields[3].trim()));
cp.setTimeStamp(new Date(fields[4].trim()));
System.out.println("Date:" + fields[4].trim()); // To see if it will print or not
return cp;
}
}
);
Чтобы получить данные, метка времени которых превышает определенное время, я написал этот фильтр:
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.DAY_OF_MONTH, -10); // This is for test issue
// My filter to get the data for a certain time range
Function<CurrencyPair, Boolean> filter = new Function<CurrencyPair, Boolean>() {
@Override
public Boolean call(CurrencyPair currencyPair) throws Exception {
if(calendar.getTime().compareTo(currencyPair.getTimeStamp()) > 0){
return false;
}else{
return true;
}
}
};
Вот так выглядит мой пользовательский объект:
public class CurrencyPair implements java.io.Serializable {
private int id;
private double value;
private double baseVolume;
private double quoteVolume;
private Date timeStamp;
// all getters and setters are here, but no constructor
}
Чтобы проверить результаты моего фильтра, я попытался просмотреть некоторые из них (первые 100 здесь):
Iterator<CurrencyPair> result = rdd.repartition(100).filter(filter).toLocalIterator();
int counter = 0;
while (counter < 100 && result.hasNext()){
System.out.println("Here: " + result.next());
counter++;
}
Но проблема в том, что когда я запускаю свой код, я получаю следующее исключение в строке, где я записываю первые 100 результатов (Здесь: System.out.println ("Здесь:" + result.next ());)
Ошибка:
19/05/12 00:05:47 INFO TaskSetManager: Lost task 0.3 in stage 0.0 (TID 6) on 192.168.1.124, executor 0: java.lang.ClassCastException (cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD
В моем фильтре я также записываю строку datetime в консоль с помощью System.out.println, но я не могу также увидеть результат в консоли. Что я делаю неправильно? Как мне этого добиться?
Редактировать: Я заметил, что на самом деле я скачал версию spark 2.3.0, но в своем файле maven я использовал 2.4.2. Поэтому я изменил файл maven на версию 2.3.0.
На этот раз я получаю следующую ошибку:
19/05/14 00:35:35 INFO BlockManager: BlockManager stopped
19/05/14 00:35:35 INFO BlockManagerMaster: BlockManagerMaster stopped
19/05/14 00:35:35 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/05/14 00:35:36 ERROR SparkContext: Error initializing SparkContext.
java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem
at scala.Predef$.require(Predef.scala:281)
at org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:91)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:516)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2520)
at org.apache.spark.SparkContext.getOrCreate(SparkContext.scala)
at spark.BasicSpark.readDataFile(BasicSpark.java:107)
at spark.BasicSpark.getWholeData(BasicSpark.java:39)
at controller.TableScreenController$2.handle(TableScreenController.java:66)
at controller.TableScreenController$2.handle(TableScreenController.java:62)
at com.sun.javafx.event.CompositeEventHandler.dispatchBubblingEvent(CompositeEventHandler.java:86)
at com.sun.javafx.event.EventHandlerManager.dispatchBubblingEvent(EventHandlerManager.java:238)
at com.sun.javafx.event.EventHandlerManager.dispatchBubblingEvent(EventHandlerManager.java:191)
at com.sun.javafx.event.CompositeEventDispatcher.dispatchBubblingEvent(CompositeEventDispatcher.java:59)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:58)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:56)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:56)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.EventUtil.fireEventImpl(EventUtil.java:74)
at com.sun.javafx.event.EventUtil.fireEvent(EventUtil.java:49)
at javafx.event.Event.fireEvent(Event.java:198)
at javafx.scene.Node.fireEvent(Node.java:8411)
at javafx.scene.control.Button.fire(Button.java:185)
at com.sun.javafx.scene.control.behavior.ButtonBehavior.mouseReleased(ButtonBehavior.java:182)
at com.sun.javafx.scene.control.skin.BehaviorSkinBase$1.handle(BehaviorSkinBase.java:96)
at com.sun.javafx.scene.control.skin.BehaviorSkinBase$1.handle(BehaviorSkinBase.java:89)
at com.sun.javafx.event.CompositeEventHandler$NormalEventHandlerRecord.handleBubblingEvent(CompositeEventHandler.java:218)
at com.sun.javafx.event.CompositeEventHandler.dispatchBubblingEvent(CompositeEventHandler.java:80)
at com.sun.javafx.event.EventHandlerManager.dispatchBubblingEvent(EventHandlerManager.java:238)
at com.sun.javafx.event.EventHandlerManager.dispatchBubblingEvent(EventHandlerManager.java:191)
at com.sun.javafx.event.CompositeEventDispatcher.dispatchBubblingEvent(CompositeEventDispatcher.java:59)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:58)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:56)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:56)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.EventUtil.fireEventImpl(EventUtil.java:74)
at com.sun.javafx.event.EventUtil.fireEvent(EventUtil.java:54)
at javafx.event.Event.fireEvent(Event.java:198)
at javafx.scene.Scene$MouseHandler.process(Scene.java:3757)
at javafx.scene.Scene$MouseHandler.access$1500(Scene.java:3485)
at javafx.scene.Scene.impl_processMouseEvent(Scene.java:1762)
at javafx.scene.Scene$ScenePeerListener.mouseEvent(Scene.java:2494)
at com.sun.javafx.tk.quantum.GlassViewEventHandler$MouseEventNotification.run(GlassViewEventHandler.java:394)
at com.sun.javafx.tk.quantum.GlassViewEventHandler$MouseEventNotification.run(GlassViewEventHandler.java:295)
at java.security.AccessController.doPrivileged(Native Method)
at com.sun.javafx.tk.quantum.GlassViewEventHandler.lambda$handleMouseEvent$350(GlassViewEventHandler.java:432)
at com.sun.javafx.tk.quantum.QuantumToolkit.runWithoutRenderLock(QuantumToolkit.java:389)
at com.sun.javafx.tk.quantum.GlassViewEventHandler.handleMouseEvent(GlassViewEventHandler.java:431)
at com.sun.glass.ui.View.handleMouseEvent(View.java:555)
at com.sun.glass.ui.View.notifyMouse(View.java:937)
at com.sun.glass.ui.gtk.GtkApplication._runLoop(Native Method)
at com.sun.glass.ui.gtk.GtkApplication.lambda$null$208(GtkApplication.java:245)
at java.lang.Thread.run(Thread.java:748)
19/05/14 00:35:36 INFO SparkContext: SparkContext already stopped.
19/05/14 00:35:36 INFO SparkContext: Successfully stopped SparkContext
java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem
at scala.Predef$.require(Predef.scala:281)
at org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:91)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:516)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2520)
at org.apache.spark.SparkContext.getOrCreate(SparkContext.scala)
at spark.BasicSpark.readDataFile(BasicSpark.java:107)
at spark.BasicSpark.getWholeData(BasicSpark.java:39)
at controller.TableScreenController$2.handle(TableScreenController.java:66)
at controller.TableScreenController$2.handle(TableScreenController.java:62)
at com.sun.javafx.event.CompositeEventHandler.dispatchBubblingEvent(CompositeEventHandler.java:86)
at com.sun.javafx.event.EventHandlerManager.dispatchBubblingEvent(EventHandlerManager.java:238)
at com.sun.javafx.event.EventHandlerManager.dispatchBubblingEvent(EventHandlerManager.java:191)
at com.sun.javafx.event.CompositeEventDispatcher.dispatchBubblingEvent(CompositeEventDispatcher.java:59)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:58)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:56)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:56)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.EventUtil.fireEventImpl(EventUtil.java:74)
at com.sun.javafx.event.EventUtil.fireEvent(EventUtil.java:49)
at javafx.event.Event.fireEvent(Event.java:198)
at javafx.scene.Node.fireEvent(Node.java:8411)
at javafx.scene.control.Button.fire(Button.java:185)
at com.sun.javafx.scene.control.behavior.ButtonBehavior.mouseReleased(ButtonBehavior.java:182)
at com.sun.javafx.scene.control.skin.BehaviorSkinBase$1.handle(BehaviorSkinBase.java:96)
at com.sun.javafx.scene.control.skin.BehaviorSkinBase$1.handle(BehaviorSkinBase.java:89)
at com.sun.javafx.event.CompositeEventHandler$NormalEventHandlerRecord.handleBubblingEvent(CompositeEventHandler.java:218)
at com.sun.javafx.event.CompositeEventHandler.dispatchBubblingEvent(CompositeEventHandler.java:80)
at com.sun.javafx.event.EventHandlerManager.dispatchBubblingEvent(EventHandlerManager.java:238)
at com.sun.javafx.event.EventHandlerManager.dispatchBubblingEvent(EventHandlerManager.java:191)
at com.sun.javafx.event.CompositeEventDispatcher.dispatchBubblingEvent(CompositeEventDispatcher.java:59)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:58)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:56)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:56)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.EventUtil.fireEventImpl(EventUtil.java:74)
at com.sun.javafx.event.EventUtil.fireEvent(EventUtil.java:54)
at javafx.event.Event.fireEvent(Event.java:198)
at javafx.scene.Scene$MouseHandler.process(Scene.java:3757)
at javafx.scene.Scene$MouseHandler.access$1500(Scene.java:3485)
at javafx.scene.Scene.impl_processMouseEvent(Scene.java:1762)
at javafx.scene.Scene$ScenePeerListener.mouseEvent(Scene.java:2494)
at com.sun.javafx.tk.quantum.GlassViewEventHandler$MouseEventNotification.run(GlassViewEventHandler.java:394)
at com.sun.javafx.tk.quantum.GlassViewEventHandler$MouseEventNotification.run(GlassViewEventHandler.java:295)
at java.security.AccessController.doPrivileged(Native Method)
at com.sun.javafx.tk.quantum.GlassViewEventHandler.lambda$handleMouseEvent$350(GlassViewEventHandler.java:432)
at com.sun.javafx.tk.quantum.QuantumToolkit.runWithoutRenderLock(QuantumToolkit.java:389)
at com.sun.javafx.tk.quantum.GlassViewEventHandler.handleMouseEvent(GlassViewEventHandler.java:431)
at com.sun.glass.ui.View.handleMouseEvent(View.java:555)
at com.sun.glass.ui.View.notifyMouse(View.java:937)
at com.sun.glass.ui.gtk.GtkApplication._runLoop(Native Method)
at com.sun.glass.ui.gtk.GtkApplication.lambda$null$208(GtkApplication.java:245)
at java.lang.Thread.run(Thread.java:748)
Я получаю эту ошибку в следующей строке кода, где я инициализирую контекст spark:
JavaSparkContext sc = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate(conf));
Редактировать 2: Это хорошо работает, когда я пишу локально, а не по собственному IP-адресу искры. Но мне нужно запустить это на моем собственном IP. Так что может быть не так с моим главным узлом?
Редактировать 3: я загрузил весь стек ошибок во фрагмент кода, который находится под первым редактором.