Фильтрация пользовательской структуры данных в Spark - PullRequest
0 голосов
/ 10 мая 2019

Я пытаюсь прочитать CSV-файл в JavaRDD. Для этого я написал код ниже:

SparkConf conf = new SparkConf().setAppName("NameOfApp").setMaster("spark://Ip here:7077");
JavaSparkContext sc = new JavaSparkContext(conf);

JavaRDD<CurrencyPair> rdd_records = sc.textFile(System.getProperty("user.dir") + "/data/data.csv", 2).map(
        new Function<String, CurrencyPair>() {
            public CurrencyPair call(String line) throws Exception {
                String[] fields = line.split(",");
                CurrencyPair sd = new CurrencyPair(Integer.parseInt(fields[0].trim()), Double.parseDouble(fields[1].trim()),
                        Double.parseDouble(fields[2].trim()), Double.parseDouble(fields[3]), new Date(fields[4]));
                return sd;
            }
        }
);

Мой файл данных выглядит так:

1,0.034968,212285,7457.23,"2019-03-08 18:36:18"

Здесь, чтобы проверить, правильно ли загружены мои данные, я попытался напечатать некоторые из них:

System.out.println("Count: " + rdd_records.count());
List<CurrencyPair> list = rdd_records.top(5);
System.out.println(list.toString());

Но у меня произошла следующая ошибка на обеих линиях системы. Я попробовал каждый из них в отдельности, а не количество и список одновременно.

Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD

Мой пользовательский объект выглядит так:

public class CurrencyPair implements Serializable {

private int id;
private double value;
private double baseVolume;
private double quoteVolume;
private Date timeStamp;

public CurrencyPair(int id, double value, double baseVolume, double quoteVolume, Date timeStamp) {
    this.id = id;
    this.value = value;
    this.baseVolume = baseVolume;
    this.quoteVolume = quoteVolume;
    this.timeStamp = timeStamp;
}

public int getId() {
    return id;
}

public void setId(int id) {
    this.id = id;
}

public double getValue() {
    return value;
}

public void setValue(double value) {
    this.value = value;
}

public double getBaseVolume() {
    return baseVolume;
}

public void setBaseVolume(double baseVolume) {
    this.baseVolume = baseVolume;
}

public double getQuoteVolume() {
    return quoteVolume;
}

public void setQuoteVolume(double quoteVolume) {
    this.quoteVolume = quoteVolume;
}

public Date getTimeStamp() {
    return timeStamp;
}

public void setTimeStamp(Date timeStamp) {
    this.timeStamp = timeStamp;
}
}

Так что я не мог понять, что здесь не так. Что я делаю не так?


Редактировать: Это хорошо работает, когда я пишу локально, а не по собственному IP-адресу. Но мне нужно запустить это на моем собственном IP. Так что может быть не так с моим главным узлом?

1 Ответ

0 голосов
/ 11 мая 2019

Возможно, проблема в анонимном определении класса new Function<String, CurrencyPair>() {, которое заставляет Spark также попытаться сериализовать родительский класс.Вместо этого попробуйте использовать лямбду:

rdd_records.map(
  (Function<String, CurrencyPair>) line -> {
    ...

Примечание. Вместо этого можно прочитать файл как CSV и использовать API набора данных с кодировщиком компонентов, чтобы полностью пропустить ручной анализ.

...