java.lang.StringIndexOutOfBoundsException: индекс строки вне диапазона: -650599791 - PullRequest
1 голос
/ 08 октября 2019

Ошибка здесь:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 132.0 failed 4 times, most recent failure : Lost task 0.3 in stage 132.0: 
java.lang.StringIndexOutOfBoundsException: String index out of range: -650599791
     at java.lang.String.<init>(String.java:196)
     at com.esotericsoftware.kryo.io.Input.readString(Input.java:484)
     at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:195)
     at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:184)
     at com.esotericsoftware.kryo.readClassAndObject(kryo.java:790)
     at arg.apache.spark.Serializer.kryoDeserializationStream.readObject(kryoSerializer.scala:244)
     at arg.apache.spark.Serializer.DeserializationStream.readKey(Serializer.scala:157)
     at arg.apache.spark.Serializer.DeserializationStream.$$anon$2.getNext(Serializer.scala:189)
     at arg.apache.spark.Serializer.DeserializationStream.$$anon$2.getNext(Serializer.scala:186)
     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
     at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
     at org.apache.spark.util.completionIterator.hasNext(CompletionIterator.scala:32)
     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
     at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:154)
     at org.apache.spark.Aggregator.combineValuesBykey(Aggregator.scala:41)
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKeyWithClassTag$1$$anonfun$apply$11.apply(PairRDDFunctions.scala:99)
     org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKeyWithClassTag$1$$anonfun$apply$11.apply(PairRDDFunctions.scala:97)

Ошибка произошла, когда я вызвал метод группировки свечей в Java. Версия Spark - 2.1.0, а версия Java - 1.8.

JavaPairRDD<String, List<String>> combineRdd = pairRDD.partitionBy(new HashPartitioner(mission.getCombineCount()))
                .combineByKey(new Function<String, List<String>>() {
                    private static final long serialVersionUID = 6592724289900217307L;

                    @Override
                    public List<String> call(String v1) throws Exception {
                        List<String> re = new ArrayList<>();
                        re.add(v1);
                        return re;
                    }
                }, new Function2<List<String>, String, List<String>>() {
                    private static final long serialVersionUID = -5882646966686065803L;

                    @Override
                    public List<String> call(List<String> v1, String v2) throws Exception {
                        v1.add(v2);
                        return v1;
                    }
                }, new Function2<List<String>, List<String>, List<String>>() {
                    private static final long serialVersionUID = -1624057077103693447L;

                    @Override
                    public List<String> call(List<String> v1, List<String> v2) throws Exception {
                        v1.addAll(v2);
                        return v1;
                    }
                });
  System.out.println("group rdd count: " + combineRdd.count());

Причина, по которой я могу придумать, - слишком много данных. Должен ли я сделать что-то перед групповыми данными. Любая другая причина?

1 Ответ

1 голос
/ 08 октября 2019

Я думаю, что есть проблема с v1.addAll(v2);, вам нужно создать и вернуть новый список:

@Override
public List<String> call(List<String> v1, List<String> v2) throws Exception {
   List<String> list = new ArrayList<String>(v1);
   list.addAll(v2);
   return list;
   // in java 8
   // return Stream.concat(a.stream(), b.stream()).collect(Collectors.toList())
}

Более того, если вы используете Java 8, вы можете использовать лямбда-выражения и делать это в одну строку,см. пример ниже:

JavaPairRDD<String, List<String>> pair = spark.range(10L)
        .toJavaRDD()
        .mapToPair(s -> Tuple2.apply(
                Long.valueOf(s % 3).toString(),
                Arrays.asList(s % 2, s, s + 1)
                        .stream()
                        .map(z -> z.toString())
                        .collect(Collectors.toList())
                )
        );

pair.foreach(s -> System.out.println(s._1 + "," + s._2.toString()));

pair.reduceByKey((a, b) ->
        Stream.concat(a.stream(), b.stream()).collect(Collectors.toList())
).foreach(s -> System.out.println(s._1 + "," + s._2.toString() + "gr. count: " + s._2.size()));

вывод:

0,[0, 0, 1]
1,[1, 1, 2]
2,[0, 2, 3]
0,[1, 3, 4]
1,[0, 4, 5]
2,[1, 5, 6]
0,[0, 6, 7]
1,[1, 7, 8]
2,[0, 8, 9]
0,[1, 9, 10]

0,[0, 0, 1, 1, 3, 4, 0, 6, 7, 1, 9, 10]gr. count: 12
2,[0, 2, 3, 1, 5, 6, 0, 8, 9]gr. count: 9
1,[1, 1, 2, 0, 4, 5, 1, 7, 8]gr. count: 9
...