При потоковой передаче Spark создается исключение java .util.ConcurrentModificationException при добавлении JsonArray. - PullRequest
0 голосов
/ 23 апреля 2020

Я получил ошибку во время следующего процесса. Мне известно, что кажется, что эта ошибка выдается, потому что она пыталась прочитать целые записи в разделе (re c), но пыталась присвоить ее строке (Str = jsonArray.toJSONString ();) одновременно Я использую интервал 5 * se c в конфигурации потоковой передачи искры. Какие-либо предложения для этого кода? Пожалуйста, помогите. Спасибо

Ошибка в этой строке:

 Str=jsonArray.toJSONString();

Ниже моя полная функция:

MapRowRDD.foreachRDD(rdd ->{
            rdd.foreachPartition(
                    rec-> {
                        while(rec.hasNext()) {
                            JSONObject record = rec.next();
                            i=i+1;
                          if(TimeUnit.MINUTES.convert(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
                                  .parse((String) record.get("DATE_TRANSACTION"))
                                  .getTime()-DateUtils.addMinutes(new Date(), -5)
                                  .getTime(),TimeUnit.MILLISECONDS)>=0 || Integer.valueOf((String) record.get("EVENT_TYPE"))<0) {
                              jsonArray.add(record);
                            if(i % v_BATCH_WINDOW == 0)
                            {   
                                try {
                                    Str=jsonArray.toJSONString();
                                    HttpResponse<String> Response = ui.post(v_REST_API_ENDPOINT).body(Str).asString();
                                    out_JSON=Response.getBody();
                                    log.warn("Response : " + out_JSON.toString());
                                }
                                catch(UnirestConfigException e){
                                    System.out.println("UnirestConfigException occured "+ e.toString());
                                    e.printStackTrace();
                                }
                                jsonArray.clear();
                                i=0;
                            }
                          }
                        publishToKafka(record.toString(), outputTopic, props);
                        }
                        Str=jsonArray.toJSONString();
                        if (!Str.equals("[]") && Str!=null && !Str.isEmpty()) {
                            HttpResponse<String> Response = ui.post(v_REST_API_ENDPOINT).body(Str).asString();
                        }
                        jsonArray.clear();
                        i=0;
                    }   
                    );
        });

1 Ответ

2 голосов
/ 25 апреля 2020

Как вы знаете, это исключение возникает, когда вы изменяете и повторяете одну и ту же коллекцию одновременно в разных потоках. jsonArray не является потокобезопасным, замените его на некоторые многопоточные коллекции, такие как Vector, и посмотрите, как это работает

...