Я застрял на этой надоедливой проблеме и не могу понять, что и почему?
Итак, вот формулировка проблемы:
У меня есть набор данных Java POJO (A), как в Dataset<A>
. В этом pojo есть пара полей всех примитивных типов (String, double и т. Д.)
Я хотел бы собрать все POJO на основе поля (ниже его opp_id_hash
) и применить некоторую агрегацию к нескольким его полям, таким как costAmount
.
Теперь, чтобы сделать это, я написал UDAF (пользовательскую статистическую функцию), которая выполняет эту задачу для меня.
Тем не менее, я застрял в буферной схеме, независимо от того, что и как определить, это не позволяет мне обновляться в методе Update.
Пример POJO:
private String oppIdHash;
private int mdsFamId;
private int itemNbr;
private String item1Desc;
private int storeNbr;
private String storeName;
private double costAmt;
private double retailAmt;
private int deletedItemQty;
private double avgWklySales;
private double costAmount;
private double retailAmount;
private double markDownRetail;
private double markDownAmount;
public static StructField[] getFields() {
StructField oppIdHash = DataTypes.createStructField(PojoFieldConstants.ItemStoreDetailModelFieldConstants.OPP_ID_HASH, DataTypes.StringType, false);
StructField mdsFamId = DataTypes.createStructField(PojoFieldConstants.ItemStoreDetailModelFieldConstants.MDS_FAM_ID, DataTypes.IntegerType, false);
StructField itemNbr = DataTypes.createStructField(PojoFieldConstants.ItemStoreDetailModelFieldConstants.ITEM_NBR, DataTypes.IntegerType, false);
StructField item1Desc = DataTypes.createStructField(PojoFieldConstants.ItemStoreDetailModelFieldConstants.ITEM_DESC, DataTypes.StringType, false);
StructField storeNbr = DataTypes.createStructField(PojoFieldConstants.ItemStoreDetailModelFieldConstants.STORE_NBR, DataTypes.IntegerType, false);
StructField storeName = DataTypes.createStructField(PojoFieldConstants.ItemStoreDetailModelFieldConstants.STORE_NAME, DataTypes.StringType, false);
StructField costAmt = DataTypes.createStructField(PojoFieldConstants.ItemStoreDetailModelFieldConstants.COST_AMT, DataTypes.DoubleType, false);
StructField retailAmt = DataTypes.createStructField(PojoFieldConstants.ItemStoreDetailModelFieldConstants.RETAIL_AMT, DataTypes.DoubleType, false);
StructField deletedItemQty = DataTypes.createStructField(PojoFieldConstants.ItemStoreDetailModelFieldConstants.DELETED_ITEM_QTY, DataTypes.IntegerType, false);
StructField avgWklySales = DataTypes.createStructField(PojoFieldConstants.ItemStoreDetailModelFieldConstants.AVG_WKLY_SALES, DataTypes.DoubleType, false);
StructField costAmount = DataTypes.createStructField(PojoFieldConstants.ItemStoreDetailModelFieldConstants.COST_AMOUNT, DataTypes.DoubleType, false);
StructField retailAmount = DataTypes.createStructField(PojoFieldConstants.ItemStoreDetailModelFieldConstants.RETAIL_AMOUNT, DataTypes.DoubleType, false);
StructField markDownRetail = DataTypes.createStructField(PojoFieldConstants.ItemStoreDetailModelFieldConstants.MARKDOWN_RETAIL, DataTypes.DoubleType, false);
StructField markDownAmount = DataTypes.createStructField(PojoFieldConstants.ItemStoreDetailModelFieldConstants.MARKDOWN_AMOUNT, DataTypes.DoubleType, false);
StructField[] structFields = new StructField[]{
oppIdHash, mdsFamId, itemNbr, item1Desc, storeNbr, storeName, costAmt, retailAmt, deletedItemQty, avgWklySales, costAmount, retailAmount, markDownRetail, markDownAmount
};
return structFields;
}
public static StructType getSchema() {
return DataTypes.createStructType(getFields());
}
Мой UDAF определяется следующим образом:
public class OppDetailsAggregatedMetricUdaf extends UserDefinedAggregateFunction implements Serializable {
@Autowired
@Qualifier(BeanNames.OPP_DETAILS_AGGREGATED_METRIC)
private OppDetailsMetricCalculator<Collection<OppDetailsAggregatedMetric>, ItemStoreMetricsModel> aggregatedMetricCalculator;
private final StructType inputSchema;
//TODO Buffer schema has issue
private final StructType bufferSchema;
public OppDetailsAggregatedMetricUdaf() {
inputSchema = ItemStoreMetricsModel.getSchema();
List<StructField> bufferFields = new ArrayList<>();
bufferFields.add(DataTypes.createStructField("list", inputSchema, false));
bufferSchema = DataTypes.createStructType(bufferFields); //get stuck here
}
/**
* This is the input fields for your aggregate function. Since we ware to perform on multiple fields,
* this object contains all the required fields
*
* @return
*/
@Override
public StructType inputSchema() {
return inputSchema;
}
/**
* This is the buffer fields for your aggregate function. Since we ware to perform on multiple fields,
* this object contains all the required fields. This is work as buffer schema
*
* @return
*/
@Override
public StructType bufferSchema() {
return bufferSchema;
}
/**
* This is the output schema of your aggregate function. Since our aggregation returns a object it self, this returns collection of them
*
* @return
*/
@Override
public DataType dataType() {
return DataTypes.createArrayType(Encoders.bean(ItemStoreMetricsModel.class).schema());
}
/**
* This describes whether the UDAF we are implementing is deterministic or not.
* Since, spark executes by splitting data, processing the chunks separately and combining them.
* If the UDAF logic is such that the result is independent of the order in which data is
* processed and combined then the UDAF is deterministic.
*
* @return
*/
@Override
public boolean deterministic() {
return false;
}
/**
* Whenever a new key come, we need to have a list to hold all the values for this key
*
* @return
*/
@Override
public void initialize(MutableAggregationBuffer buffer) {
buffer.update(0, new LinkedList<ItemStoreMetricsModel>());
}
/**
* Whenever for a same key, a new row come (new object) then this will invoke, it will add this new object to already maintained object list
* <p>
*
* @return
*/
@Override
public void update(MutableAggregationBuffer buffer, Row input) {
List<ItemStoreMetricsModel> newData = new LinkedList<>();
if (!buffer.isNullAt(0)) {
List<ItemStoreMetricsModel> list = JavaConverters.seqAsJavaListConverter(buffer.<Seq<ItemStoreMetricsModel>>getAs(0)).asJava();
if (!list.isEmpty()) {
newData.addAll(list);
}
ItemStoreMetricsModel old = RawToModel.getItemStoreMetricsModel(input);
newData.add(old);
buffer.update(0, newData); //Not able to perform this
}
}
/**
* Whenever for a same key, two different list come (already maintained and new list ),
* merge them to make a single list
*
* @return
*/
@Override
public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
List<ItemStoreMetricsModel> newData = new LinkedList<>();
List<ItemStoreMetricsModel> buffer1As = JavaConverters.seqAsJavaListConverter(buffer1.<Seq<ItemStoreMetricsModel>>getAs(0)).asJava();
List<ItemStoreMetricsModel> buffer2As = JavaConverters.seqAsJavaListConverter(buffer2.<Seq<ItemStoreMetricsModel>>getAs(0)).asJava();
newData.addAll(buffer1As);
newData.addAll(buffer2As);
buffer1.update(0, newData);
}
/**
* for a same key, apply aggregation operation in order to collect the output
*
* @return
*/
@Override
public Object evaluate(Row buffer) {
final List<ItemStoreMetricsModel> items = buffer.getAs(0);
return aggregatedMetricCalculator.doCalculate(items).orElse(new ArrayList<>());
}
}
Как определить схему буфера, чтобы я мог добавлять элементы в список и создавать из него буфер, в конце я должен получить коллекцию списка в функции evaluate
.