Я использую FlatMapGroupsWithStateFunction в моем приложении для потоковой передачи.
FlatMapGroupsWithStateFunction<String, Row, SessionInfo, SessionUpdate> idstateUpdateFunction =
new FlatMapGroupsWithStateFunction<String, Row, SessionInfo, SessionUpdate>() {.....}
Класс обновления сеанса такой, как показано ниже;
public static class SessionUpdate implements Serializable {
private static final long serialVersionUID = -3858977319192658483L;
private String instanceId;
private ArrayList<GenericRowWithSchema> milestones = new ArrayList<GenericRowWithSchema>();
private Timestamp processingTimeoutTimestamp;
public SessionUpdate() {
super();
}
public SessionUpdate(String instanceId, ArrayList<GenericRowWithSchema> milestones, Timestamp processingTimeoutTimestamp) {
super();
this.instanceId = instanceId;
this.milestones = milestones;
this.processingTimeoutTimestamp = processingTimeoutTimestamp;
}
public String getInstanceId() {
return instanceId;
}
public void setInstanceId(String instanceId) {
this.instanceId = instanceId;
}
public ArrayList<GenericRowWithSchema> getMilestones() {
return milestones;
}
public void setMilestones(ArrayList<GenericRowWithSchema> milestones) {
this.milestones = milestones;
}
public Timestamp getProcessingTimeoutTimestamp() {
return processingTimeoutTimestamp;
}
public void setProcessingTimeoutTimestamp(Timestamp processingTimeoutTimestamp) {
this.processingTimeoutTimestamp = processingTimeoutTimestamp;
}
}
После добавления ниже в классЯ получаю исключение, которое упоминается ниже:
private ArrayList<GenericRowWithSchema> milestones = new ArrayList<GenericRowWithSchema>();
Исключение:
ERROR cannot resolve 'named_struct()' due to data type mismatch: input to function named_struct requires at least one argument;;
'SerializeFromObject [staticinvoke (класс org.apache.spark.unsafe.types.UTF8String, StringType,fromString, assertnotnull (input [0, oracle.insight.spark.event_processor.EventProcessor $ SessionUpdate, true]). getInstanceId, true, false) AS instanceId # 62, mapobjects (MapObjects_loopValue2, MapObjects_loopIapy.sql.catalyst.[0, oracle.insight.spark.event_processor.EventProcessor $ SessionUpdate, true]). GetMilestones, Нет) AS milestones # 63, staticinvoke (класс org.apache.spark.sql.catalyst.util.DateTimeUtils $, TimestampType, fromJavaTimestamp, assertnotnull (входные данные [0, oracle.insight.spark.event_processor.EventProcessor $ SessionUpdoutTextTime, true]. true).истина, ложь) AS processingTimeoutTimestamp # 64] + - FlatMapGroupsWithState, приведение (значение # 54 в виде строки) .toString, createexternalrow (EventTime # 23.toString, InstanceID # 24.toString, модель # 25.toString, Milestone # 26.toString,Region # 27.toString, SalesOrganization # 28.toString, ProductName # 29.toString, ReasonForQuoteReject # 30.toString, ReasonforRejectionBy # 31.toString, OpportunityAmount # 32.toJavaBigDecimal, Скидка # 33.toJavaBigDecimal, TotalQuoteAmount # 343QoteOmBount35.toJavaBigDecimal, ApprovedDiscount # 36.toJavaBigDecimal, TotalOrderAmount # 37.toJavaBigDecimal, StructField (EventTime, StringType, true), StructField (InstanceID, StringType, true), StructField (модель, StringTypeFject (StringTypeFyTFF), тип StringTypeFF (тип, StringTyFF)), StructField (Regio)n, StringType, true), StructField (SalesOrganization, StringType, true), StructField (ProductName, StringType, true), StructField (ReasonForQuoteReject, StringType, true), StructField (ReasonforRejectionBy, StringType, true)), еще 6 ... 6, [value # 54], [EventTime # 23, InstanceID # 24, Model # 25, Milestone # 26, Region # 27, SalesOrganization # 28, ProductName # 29, ReasonForQuoteReject # 30, ReasonforRejectionBy # 31, OpportunityAmount # 32, Discount #33, TotalQuoteAmount # 34, NetQuoteAmount # 35, ApprovedDiscount # 36, TotalOrderAmount # 37], объект № 61: oracle.insight.spark.event_processor.EventProcessor $ SessionUpdate, класс [instanceId [0]: строка, вехи [0]: массив>, processingTimeoutTimestamp [0]: timestamp], Append, false, ProcessingTimeTimeout
Любая подсказка, почему я получаю это исключение?
спасибо.