Я получил ошибку во время следующего процесса. Мне известно, что кажется, что эта ошибка выдается, потому что она пыталась прочитать целые записи в разделе (re c), но пыталась присвоить ее строке (Str = jsonArray.toJSONString ();) одновременно Я использую интервал 5 * se c в конфигурации потоковой передачи искры. Какие-либо предложения для этого кода? Пожалуйста, помогите. Спасибо
Ошибка в этой строке:
Str=jsonArray.toJSONString();
Ниже моя полная функция:
MapRowRDD.foreachRDD(rdd ->{
rdd.foreachPartition(
rec-> {
while(rec.hasNext()) {
JSONObject record = rec.next();
i=i+1;
if(TimeUnit.MINUTES.convert(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
.parse((String) record.get("DATE_TRANSACTION"))
.getTime()-DateUtils.addMinutes(new Date(), -5)
.getTime(),TimeUnit.MILLISECONDS)>=0 || Integer.valueOf((String) record.get("EVENT_TYPE"))<0) {
jsonArray.add(record);
if(i % v_BATCH_WINDOW == 0)
{
try {
Str=jsonArray.toJSONString();
HttpResponse<String> Response = ui.post(v_REST_API_ENDPOINT).body(Str).asString();
out_JSON=Response.getBody();
log.warn("Response : " + out_JSON.toString());
}
catch(UnirestConfigException e){
System.out.println("UnirestConfigException occured "+ e.toString());
e.printStackTrace();
}
jsonArray.clear();
i=0;
}
}
publishToKafka(record.toString(), outputTopic, props);
}
Str=jsonArray.toJSONString();
if (!Str.equals("[]") && Str!=null && !Str.isEmpty()) {
HttpResponse<String> Response = ui.post(v_REST_API_ENDPOINT).body(Str).asString();
}
jsonArray.clear();
i=0;
}
);
});