ClassCastException при преобразовании RDD в DataFrame Spark Streaming - PullRequest
0 голосов
/ 28 марта 2019

Привет, ребята, у меня следующая проблема. Я использую Apache Spark Streaming v1.6.0 с Java, чтобы получать сообщения от IBM MQ. Я сделал свой собственный приемник для MQ, но проблема в том, что мне нужно преобразовать RDD из JavaDStream в DataFrame. Для этого я повторяю JavaDStream с foreachRDD, и я определил схему для DataFrame, но когда я запускаю задание, первые сообщения выводят следующее исключение:

java.lang.ClassCastException: org.apache.spark.rdd.BlockRDDPartition не может быть приведен к org.apache.spark.rdd.ParallelCollectionPartition в org.apache.spark.rdd.ParallelCollectionRDD.compute (ParallelCollectionRDD.scala: 102) в org.apache.spark.rdd.RDD.computeOrReadCheckpoint (RDD.scala: 306) в org.apache.spark.rdd.RDD.iterator (RDD.scala: 270) в org.apache.spark.scheduler.ResultTask.runTask (ResultTask.scala: 66) в org.apache.spark.scheduler.Task.run (Task.scala: 89) в org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 213) в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149) в java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:624) на java.lang.Thread.run (Thread.java:748) 19/03/28 12:53:26 ПРЕДУПРЕЖДЕНИЕ TaskSetManager: потерянная задача 0.0 на этапе 0.0 (TID 0, localhost): java.lang.ClassCastException: org.apache.spark.rdd.BlockRDDPartition нельзя привести к org.apache.spark .rdd.ParallelCollectionPartition в org.apache.spark.rdd.ParallelCollectionRDD.compute (ParallelCollectionRDD.scala: 102) в org.apache.spark.rdd.RDD.computeOrReadCheckpoint (RDD.scala: 306) в org.apache.spark.rdd.RDD.iterator (RDD.scala: 270) в org.apache.spark.scheduler.ResultTask.runTask (ResultTask.scala: 66) в org.apache.spark.scheduler.Task.run (Task.scala: 89) в org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 213) в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149) в java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:624) at java.lang.Thread.run (Thread.java:748)

Тогда код выполняется очень хорошо. Даже если у меня нет сообщений в MQ, это только первые сообщения, когда я запускаю работу.

Вот мой CustomMQReceiver

public CustomMQReceiver() {

        super(StorageLevel.MEMORY_ONLY_2());

    }

    @Override
    public void onStart() {

        new Thread() {
            @Override
            public void run() {
                try {
                    initConnection();
                    receive();
                } catch (JMSException ex) {
                    ex.printStackTrace();
                }
            }
        }.start();

    }

    @Override
    public void onStop() {

    }

    private void receive() {

        System.out.print("Started receiving messages from MQ");

        try {

            Message receivedMessage = null;

            while (!isStopped() && (receivedMessage = consumer.receiveNoWait()) != null) {

                String userInput = convertStreamToString(receivedMessage);
                System.out.println("Received data :'" + userInput + "'");
                store(userInput);
            }

            stop("No More Messages To read !");
            qCon.close();
            System.out.println("Queue Connection is Closed");

        } catch (Exception e) {
            e.printStackTrace();
            restart("Trying to connect again");
        } catch (Throwable t) {

            restart("Error receiving data", t);
        }

    }

    public void initConnection() throws JMSException {

        MQQueueConnectionFactory conFactory = new MQQueueConnectionFactory();
        conFactory.setHostName(HOST);
        conFactory.setPort(PORT);
        conFactory.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT);
        conFactory.setQueueManager(QMGR);
        conFactory.setChannel(CHANNEL);
        conFactory.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP, true);
        conFactory.setStringProperty(WMQConstants.USERID, APP_USER);
        conFactory.setStringProperty(WMQConstants.PASSWORD, APP_PASSWORD);

        qCon = (MQQueueConnection) conFactory.createConnection();
        MQQueueSession qSession = (MQQueueSession) qCon.createQueueSession(false, 1);
        MQQueue queue = (MQQueue) qSession.createQueue(QUEUE_NAME);
        consumer = (MQMessageConsumer) qSession.createConsumer(queue);
        qCon.start();

    }

    @Override
    public StorageLevel storageLevel() {
        return StorageLevel.MEMORY_ONLY_2();
    }

    private static String convertStreamToString(final Message jmsMsg) throws Exception {

        String stringMessage = "";
        JMSTextMessage msg = (JMSTextMessage) jmsMsg;
        stringMessage = msg.getText();

        return stringMessage;
    }

А вот и мой код искры

SparkConf sparkConf = new SparkConf()
                    .setAppName("MQStreaming")
                    .set("spark.driver.allowMultipleContexts", "true")
                    .setMaster("local[*]");

            JavaSparkContext jsc = new JavaSparkContext(sparkConf);
            final SQLContext sqlContext = new SQLContext(jsc);
            JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(Long.parseLong(propertiesConf.getProperty("duration"))));

            JavaDStream<String> customReceiverStream = ssc.receiverStream(new CustomMQReceiver());

            customReceiverStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {

                @Override
                public void call(JavaRDD<String> rdd) throws Exception {

                    JavaRDD<Row> rddRow = rdd.map(new Function<String, Row>() {

                        @Override
                        public Row call(String v1) throws Exception {

                            return RowFactory.create(v1);

                        }

                    });

                    try {

                        StructType schema = new StructType(new StructField[]{
                            new StructField("trama", DataTypes.StringType, true, Metadata.empty())
                        });

                        DataFrame frame = sqlContext.createDataFrame(rddRow, schema);

                        if (frame.count() > 0) {
                            //Here is where the first messages throw the exception
                            frame.show();
                            frame.write().mode(SaveMode.Append).json("file:///C:/tmp/");

                        }

                    } catch (Exception ex) {

                        System.out.println(" INFO " + ex.getMessage());

                    }

                }

            });

            ssc.start();
            ssc.awaitTermination();

Я не могу изменить версию spark, потому что это задание будет выполняться в старом кластере cloudera с spark 1.6. Я не знаю, делаю ли я что-то неправильно или это просто ошибка. Помогите !!!! * * 1013

1 Ответ

0 голосов
/ 22 апреля 2019

Я решил свою собственную проблему, это исключение вызвано тем, как я создаю SQLContext, правильный способ - создать sqlContext с JavaStreamingContext

//JavaStreamingContext jsc = ...
SQLContext sqlContext = new SQLContext(jsc.sparkContext());
...