Spark FlatMapGroupsWithStateFunction throws не может разрешить named_struct () из-за несоответствия типов данных SerializeFromObject - PullRequest
0 голосов
/ 16 сентября 2018

Я использую FlatMapGroupsWithStateFunction в моем приложении для потоковой передачи.

 FlatMapGroupsWithStateFunction<String, Row, SessionInfo, SessionUpdate> idstateUpdateFunction =
  new FlatMapGroupsWithStateFunction<String, Row, SessionInfo, SessionUpdate>() {.....}

Класс обновления сеанса такой, как показано ниже;

  public static class SessionUpdate implements Serializable {

private static final long serialVersionUID = -3858977319192658483L;

private String instanceId;

private ArrayList<GenericRowWithSchema> milestones = new ArrayList<GenericRowWithSchema>();

private Timestamp processingTimeoutTimestamp;

public SessionUpdate() { 
  super();
}

public SessionUpdate(String instanceId, ArrayList<GenericRowWithSchema> milestones, Timestamp processingTimeoutTimestamp) {
  super();
  this.instanceId = instanceId;
  this.milestones = milestones;
  this.processingTimeoutTimestamp = processingTimeoutTimestamp;
}

public String getInstanceId() {
  return instanceId;
}

public void setInstanceId(String instanceId) {
  this.instanceId = instanceId;
}

public ArrayList<GenericRowWithSchema> getMilestones() {
  return milestones;
}

public void setMilestones(ArrayList<GenericRowWithSchema> milestones) {
  this.milestones = milestones;
}

public Timestamp getProcessingTimeoutTimestamp() {
  return processingTimeoutTimestamp;
}

public void setProcessingTimeoutTimestamp(Timestamp processingTimeoutTimestamp) {
  this.processingTimeoutTimestamp = processingTimeoutTimestamp;
}

}

После добавления ниже в классЯ получаю исключение, которое упоминается ниже:

private ArrayList<GenericRowWithSchema> milestones = new ArrayList<GenericRowWithSchema>();

Исключение:

ERROR  cannot resolve 'named_struct()' due to data type mismatch: input to function named_struct requires at least one argument;;

'SerializeFromObject [staticinvoke (класс org.apache.spark.unsafe.types.UTF8String, StringType,fromString, assertnotnull (input [0, oracle.insight.spark.event_processor.EventProcessor $ SessionUpdate, true]). getInstanceId, true, false) AS instanceId # 62, mapobjects (MapObjects_loopValue2, MapObjects_loopIapy.sql.catalyst.[0, oracle.insight.spark.event_processor.EventProcessor $ SessionUpdate, true]). GetMilestones, Нет) AS milestones # 63, staticinvoke (класс org.apache.spark.sql.catalyst.util.DateTimeUtils $, TimestampType, fromJavaTimestamp, assertnotnull (входные данные [0, oracle.insight.spark.event_processor.EventProcessor $ SessionUpdoutTextTime, true]. true).истина, ложь) AS processingTimeoutTimestamp # 64] + - FlatMapGroupsWithState, приведение (значение # 54 в виде строки) .toString, createexternalrow (EventTime # 23.toString, InstanceID # 24.toString, модель # 25.toString, Milestone # 26.toString,Region # 27.toString, SalesOrganization # 28.toString, ProductName # 29.toString, ReasonForQuoteReject # 30.toString, ReasonforRejectionBy # 31.toString, OpportunityAmount # 32.toJavaBigDecimal, Скидка # 33.toJavaBigDecimal, TotalQuoteAmount # 343QoteOmBount35.toJavaBigDecimal, ApprovedDiscount # 36.toJavaBigDecimal, TotalOrderAmount # 37.toJavaBigDecimal, StructField (EventTime, StringType, true), StructField (InstanceID, StringType, true), StructField (модель, StringTypeFject (StringTypeFyTFF), тип StringTypeFF (тип, StringTyFF)), StructField (Regio)n, StringType, true), StructField (SalesOrganization, StringType, true), StructField (ProductName, StringType, true), StructField (ReasonForQuoteReject, StringType, true), StructField (ReasonforRejectionBy, StringType, true)), еще 6 ... 6, [value # 54], [EventTime # 23, InstanceID # 24, Model # 25, Milestone # 26, Region # 27, SalesOrganization # 28, ProductName # 29, ReasonForQuoteReject # 30, ReasonforRejectionBy # 31, OpportunityAmount # 32, Discount #33, TotalQuoteAmount # 34, NetQuoteAmount # 35, ApprovedDiscount # 36, TotalOrderAmount # 37], объект № 61: oracle.insight.spark.event_processor.EventProcessor $ SessionUpdate, класс [instanceId [0]: строка, вехи [0]: массив>, processingTimeoutTimestamp [0]: timestamp], Append, false, ProcessingTimeTimeout

Любая подсказка, почему я получаю это исключение?

спасибо.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...