Как отфильтровать набор данных по значениям даты и времени в Spark - PullRequest
11 голосов
/ 08 мая 2019

Я пытаюсь отфильтровать данные в соответствии с полем даты и времени. Образец из моих данных:

303,0.00001747,4351040,75.9054,"2019-03-08 19:29:18"

Вот как я инициализирую искру:

    SparkConf conf = new SparkConf().setAppName("app name").setMaster("spark://192.168.1.124:7077");
    JavaSparkContext sc = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate(conf));

Во-первых, я прочитал данные выше в свой пользовательский объект, как показано ниже:

    // Read data from file into custom object
    JavaRDD<CurrencyPair> rdd = sc.textFile(System.getProperty("user.dir") + "/data/data.csv", 2).map(
        new Function<String, CurrencyPair>() {
            public CurrencyPair call(String line) throws Exception {
                String[] fields = line.split(","); // Split line from commas

                // read each data into custom object
                CurrencyPair cp = new CurrencyPair();
                cp.setId(Integer.parseInt(fields[0].trim()));
                cp.setValue(Double.parseDouble(fields[1].trim()));
                cp.setBaseVolume(Double.parseDouble(fields[2].trim()));
                cp.setQuoteVolume(Double.parseDouble(fields[3].trim()));
                cp.setTimeStamp(new Date(fields[4].trim()));

                System.out.println("Date:" + fields[4].trim()); // To see if it will print or not

                return cp;
            }
        }
     );

Чтобы получить данные, метка времени которых превышает определенное время, я написал этот фильтр:

    Calendar calendar = Calendar.getInstance();
    calendar.add(Calendar.DAY_OF_MONTH, -10); // This is for test issue

    // My filter to get the data for a certain time range
    Function<CurrencyPair, Boolean> filter = new Function<CurrencyPair, Boolean>() {
        @Override
        public Boolean call(CurrencyPair currencyPair) throws Exception {
            if(calendar.getTime().compareTo(currencyPair.getTimeStamp()) > 0){
                return false;
            }else{
                return true;
            }
        }
    };

Вот так выглядит мой пользовательский объект:

public class CurrencyPair implements java.io.Serializable {

    private int id;
    private double value;
    private double baseVolume;
    private double quoteVolume;
    private Date timeStamp;

    // all getters and setters are here, but no constructor
}

Чтобы проверить результаты моего фильтра, я попытался просмотреть некоторые из них (первые 100 здесь):

Iterator<CurrencyPair> result = rdd.repartition(100).filter(filter).toLocalIterator();
int counter = 0;
while (counter < 100 && result.hasNext()){
    System.out.println("Here: " + result.next());
    counter++;
}

Но проблема в том, что когда я запускаю свой код, я получаю следующее исключение в строке, где я записываю первые 100 результатов (Здесь: System.out.println ("Здесь:" + result.next ());)

Ошибка:

19/05/12 00:05:47 INFO TaskSetManager: Lost task 0.3 in stage 0.0 (TID 6) on 192.168.1.124, executor 0: java.lang.ClassCastException (cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD

В моем фильтре я также записываю строку datetime в консоль с помощью System.out.println, но я не могу также увидеть результат в консоли. Что я делаю неправильно? Как мне этого добиться?


Редактировать: Я заметил, что на самом деле я скачал версию spark 2.3.0, но в своем файле maven я использовал 2.4.2. Поэтому я изменил файл maven на версию 2.3.0.

На этот раз я получаю следующую ошибку:

19/05/14 00:35:35 INFO BlockManager: BlockManager stopped
19/05/14 00:35:35 INFO BlockManagerMaster: BlockManagerMaster stopped
19/05/14 00:35:35 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/05/14 00:35:36 ERROR SparkContext: Error initializing SparkContext.
java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem
at scala.Predef$.require(Predef.scala:281)
at org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:91)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:516)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2520)
at org.apache.spark.SparkContext.getOrCreate(SparkContext.scala)
at spark.BasicSpark.readDataFile(BasicSpark.java:107)
at spark.BasicSpark.getWholeData(BasicSpark.java:39)
at controller.TableScreenController$2.handle(TableScreenController.java:66)
at controller.TableScreenController$2.handle(TableScreenController.java:62)
at com.sun.javafx.event.CompositeEventHandler.dispatchBubblingEvent(CompositeEventHandler.java:86)
at com.sun.javafx.event.EventHandlerManager.dispatchBubblingEvent(EventHandlerManager.java:238)
at com.sun.javafx.event.EventHandlerManager.dispatchBubblingEvent(EventHandlerManager.java:191)
at com.sun.javafx.event.CompositeEventDispatcher.dispatchBubblingEvent(CompositeEventDispatcher.java:59)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:58)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:56)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:56)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.EventUtil.fireEventImpl(EventUtil.java:74)
at com.sun.javafx.event.EventUtil.fireEvent(EventUtil.java:49)
at javafx.event.Event.fireEvent(Event.java:198)
at javafx.scene.Node.fireEvent(Node.java:8411)
at javafx.scene.control.Button.fire(Button.java:185)
at com.sun.javafx.scene.control.behavior.ButtonBehavior.mouseReleased(ButtonBehavior.java:182)
at com.sun.javafx.scene.control.skin.BehaviorSkinBase$1.handle(BehaviorSkinBase.java:96)
at com.sun.javafx.scene.control.skin.BehaviorSkinBase$1.handle(BehaviorSkinBase.java:89)
at com.sun.javafx.event.CompositeEventHandler$NormalEventHandlerRecord.handleBubblingEvent(CompositeEventHandler.java:218)
at com.sun.javafx.event.CompositeEventHandler.dispatchBubblingEvent(CompositeEventHandler.java:80)
at com.sun.javafx.event.EventHandlerManager.dispatchBubblingEvent(EventHandlerManager.java:238)
at com.sun.javafx.event.EventHandlerManager.dispatchBubblingEvent(EventHandlerManager.java:191)
at com.sun.javafx.event.CompositeEventDispatcher.dispatchBubblingEvent(CompositeEventDispatcher.java:59)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:58)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:56)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:56)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.EventUtil.fireEventImpl(EventUtil.java:74)
at com.sun.javafx.event.EventUtil.fireEvent(EventUtil.java:54)
at javafx.event.Event.fireEvent(Event.java:198)
at javafx.scene.Scene$MouseHandler.process(Scene.java:3757)
at javafx.scene.Scene$MouseHandler.access$1500(Scene.java:3485)
at javafx.scene.Scene.impl_processMouseEvent(Scene.java:1762)
at javafx.scene.Scene$ScenePeerListener.mouseEvent(Scene.java:2494)
at com.sun.javafx.tk.quantum.GlassViewEventHandler$MouseEventNotification.run(GlassViewEventHandler.java:394)
at com.sun.javafx.tk.quantum.GlassViewEventHandler$MouseEventNotification.run(GlassViewEventHandler.java:295)
at java.security.AccessController.doPrivileged(Native Method)
at com.sun.javafx.tk.quantum.GlassViewEventHandler.lambda$handleMouseEvent$350(GlassViewEventHandler.java:432)
at com.sun.javafx.tk.quantum.QuantumToolkit.runWithoutRenderLock(QuantumToolkit.java:389)
at com.sun.javafx.tk.quantum.GlassViewEventHandler.handleMouseEvent(GlassViewEventHandler.java:431)
at com.sun.glass.ui.View.handleMouseEvent(View.java:555)
at com.sun.glass.ui.View.notifyMouse(View.java:937)
at com.sun.glass.ui.gtk.GtkApplication._runLoop(Native Method)
at com.sun.glass.ui.gtk.GtkApplication.lambda$null$208(GtkApplication.java:245)
at java.lang.Thread.run(Thread.java:748)
19/05/14 00:35:36 INFO SparkContext: SparkContext already stopped.
19/05/14 00:35:36 INFO SparkContext: Successfully stopped SparkContext
java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem
at scala.Predef$.require(Predef.scala:281)
at org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:91)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:516)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2520)
at org.apache.spark.SparkContext.getOrCreate(SparkContext.scala)
at spark.BasicSpark.readDataFile(BasicSpark.java:107)
at spark.BasicSpark.getWholeData(BasicSpark.java:39)
at controller.TableScreenController$2.handle(TableScreenController.java:66)
at controller.TableScreenController$2.handle(TableScreenController.java:62)
at com.sun.javafx.event.CompositeEventHandler.dispatchBubblingEvent(CompositeEventHandler.java:86)
at com.sun.javafx.event.EventHandlerManager.dispatchBubblingEvent(EventHandlerManager.java:238)
at com.sun.javafx.event.EventHandlerManager.dispatchBubblingEvent(EventHandlerManager.java:191)
at com.sun.javafx.event.CompositeEventDispatcher.dispatchBubblingEvent(CompositeEventDispatcher.java:59)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:58)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:56)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:56)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.EventUtil.fireEventImpl(EventUtil.java:74)
at com.sun.javafx.event.EventUtil.fireEvent(EventUtil.java:49)
at javafx.event.Event.fireEvent(Event.java:198)
at javafx.scene.Node.fireEvent(Node.java:8411)
at javafx.scene.control.Button.fire(Button.java:185)
at com.sun.javafx.scene.control.behavior.ButtonBehavior.mouseReleased(ButtonBehavior.java:182)
at com.sun.javafx.scene.control.skin.BehaviorSkinBase$1.handle(BehaviorSkinBase.java:96)
at com.sun.javafx.scene.control.skin.BehaviorSkinBase$1.handle(BehaviorSkinBase.java:89)
at com.sun.javafx.event.CompositeEventHandler$NormalEventHandlerRecord.handleBubblingEvent(CompositeEventHandler.java:218)
at com.sun.javafx.event.CompositeEventHandler.dispatchBubblingEvent(CompositeEventHandler.java:80)
at com.sun.javafx.event.EventHandlerManager.dispatchBubblingEvent(EventHandlerManager.java:238)
at com.sun.javafx.event.EventHandlerManager.dispatchBubblingEvent(EventHandlerManager.java:191)
at com.sun.javafx.event.CompositeEventDispatcher.dispatchBubblingEvent(CompositeEventDispatcher.java:59)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:58)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:56)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:56)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.EventUtil.fireEventImpl(EventUtil.java:74)
at com.sun.javafx.event.EventUtil.fireEvent(EventUtil.java:54)
at javafx.event.Event.fireEvent(Event.java:198)
at javafx.scene.Scene$MouseHandler.process(Scene.java:3757)
at javafx.scene.Scene$MouseHandler.access$1500(Scene.java:3485)
at javafx.scene.Scene.impl_processMouseEvent(Scene.java:1762)
at javafx.scene.Scene$ScenePeerListener.mouseEvent(Scene.java:2494)
at com.sun.javafx.tk.quantum.GlassViewEventHandler$MouseEventNotification.run(GlassViewEventHandler.java:394)
at com.sun.javafx.tk.quantum.GlassViewEventHandler$MouseEventNotification.run(GlassViewEventHandler.java:295)
at java.security.AccessController.doPrivileged(Native Method)
at com.sun.javafx.tk.quantum.GlassViewEventHandler.lambda$handleMouseEvent$350(GlassViewEventHandler.java:432)
at com.sun.javafx.tk.quantum.QuantumToolkit.runWithoutRenderLock(QuantumToolkit.java:389)
at com.sun.javafx.tk.quantum.GlassViewEventHandler.handleMouseEvent(GlassViewEventHandler.java:431)
at com.sun.glass.ui.View.handleMouseEvent(View.java:555)
at com.sun.glass.ui.View.notifyMouse(View.java:937)
at com.sun.glass.ui.gtk.GtkApplication._runLoop(Native Method)
at com.sun.glass.ui.gtk.GtkApplication.lambda$null$208(GtkApplication.java:245)
at java.lang.Thread.run(Thread.java:748)

Я получаю эту ошибку в следующей строке кода, где я инициализирую контекст spark:

JavaSparkContext sc = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate(conf));

Редактировать 2: Это хорошо работает, когда я пишу локально, а не по собственному IP-адресу искры. Но мне нужно запустить это на моем собственном IP. Так что может быть не так с моим главным узлом?


Редактировать 3: я загрузил весь стек ошибок во фрагмент кода, который находится под первым редактором.

1 Ответ

2 голосов
/ 16 мая 2019

Это происходит, когда класс, который определяет лямбда-выражение, недоступен во время выполнения.Если вы пытаетесь запустить задание на удаленном кластере из локальной среды IDE, вам нужно добавить setJars

SparkConf conf = new SparkConf()
                .setAppName("app name")
                .setJars(new String[]{"/fatJarPath/jar.path"})
                .setMaster("spark://Remote_spark_ip:port");

В качестве альтернативы вы можете создать толстый файл jar и отправить работу, используя spark-submit.

...