Spark UDAF с пользовательским типом объекта в Java вызывает ошибку scala.MatchError - PullRequest
0 голосов
/ 05 марта 2019

Я застрял на этой надоедливой проблеме и не могу понять, в чём дело и почему так происходит.

Итак, вот формулировка проблемы: у меня есть набор данных из Java POJO (A), то есть Dataset<A>. В этом POJO есть несколько полей простых типов (String, double и т. д.).

Я хотел бы сгруппировать все POJO по одному полю (ниже это opp_id_hash) и применить некоторую агрегацию к нескольким другим полям, например costAmount.

Чтобы сделать это, я написал UDAF (пользовательскую агрегатную функцию), которая выполняет эту задачу.

Тем не менее я застрял на схеме буфера: как бы я её ни определял, обновить буфер в методе update не получается.

Пример POJO:

// Fields of the example POJO. Their names and order mirror the StructField
// locals built in getFields(), one field per schema column.

// Grouping key: the description above aggregates rows by this hash (opp_id_hash).
private String oppIdHash;
private int mdsFamId;
private int itemNbr;
private String item1Desc;
private int storeNbr;
private String storeName;
private double costAmt;
private double retailAmt;
private int deletedItemQty;
private double avgWklySales;
// costAmount is named in the description as one of the values to aggregate.
private double costAmount;
private double retailAmount;
private double markDownRetail;
private double markDownAmount;

/**
 * Builds the Spark field definitions for this model, one non-nullable field per column.
 *
 * <p>The array order is significant: it defines the column order of the schema returned by
 * {@code getSchema()}.
 *
 * @return a freshly created array of the fourteen field definitions
 */
public static StructField[] getFields() {
    return new StructField[]{
            DataTypes.createStructField(PojoFieldConstants.ItemStoreDetailModelFieldConstants.OPP_ID_HASH, DataTypes.StringType, false),
            DataTypes.createStructField(PojoFieldConstants.ItemStoreDetailModelFieldConstants.MDS_FAM_ID, DataTypes.IntegerType, false),
            DataTypes.createStructField(PojoFieldConstants.ItemStoreDetailModelFieldConstants.ITEM_NBR, DataTypes.IntegerType, false),
            DataTypes.createStructField(PojoFieldConstants.ItemStoreDetailModelFieldConstants.ITEM_DESC, DataTypes.StringType, false),
            DataTypes.createStructField(PojoFieldConstants.ItemStoreDetailModelFieldConstants.STORE_NBR, DataTypes.IntegerType, false),
            DataTypes.createStructField(PojoFieldConstants.ItemStoreDetailModelFieldConstants.STORE_NAME, DataTypes.StringType, false),
            DataTypes.createStructField(PojoFieldConstants.ItemStoreDetailModelFieldConstants.COST_AMT, DataTypes.DoubleType, false),
            DataTypes.createStructField(PojoFieldConstants.ItemStoreDetailModelFieldConstants.RETAIL_AMT, DataTypes.DoubleType, false),
            DataTypes.createStructField(PojoFieldConstants.ItemStoreDetailModelFieldConstants.DELETED_ITEM_QTY, DataTypes.IntegerType, false),
            DataTypes.createStructField(PojoFieldConstants.ItemStoreDetailModelFieldConstants.AVG_WKLY_SALES, DataTypes.DoubleType, false),
            DataTypes.createStructField(PojoFieldConstants.ItemStoreDetailModelFieldConstants.COST_AMOUNT, DataTypes.DoubleType, false),
            DataTypes.createStructField(PojoFieldConstants.ItemStoreDetailModelFieldConstants.RETAIL_AMOUNT, DataTypes.DoubleType, false),
            DataTypes.createStructField(PojoFieldConstants.ItemStoreDetailModelFieldConstants.MARKDOWN_RETAIL, DataTypes.DoubleType, false),
            DataTypes.createStructField(PojoFieldConstants.ItemStoreDetailModelFieldConstants.MARKDOWN_AMOUNT, DataTypes.DoubleType, false)
    };
}

/**
 * Wraps the field definitions from {@link #getFields()} into a Spark struct type.
 *
 * @return the struct schema describing one row of this model
 */
public static StructType getSchema() {
    final StructField[] fields = getFields();
    return DataTypes.createStructType(fields);
}

Мой UDAF определяется следующим образом:

public class OppDetailsAggregatedMetricUdaf extends UserDefinedAggregateFunction implements Serializable {

@Autowired
@Qualifier(BeanNames.OPP_DETAILS_AGGREGATED_METRIC)
private OppDetailsMetricCalculator<Collection<OppDetailsAggregatedMetric>, ItemStoreMetricsModel> aggregatedMetricCalculator;


private final StructType inputSchema;

//TODO Buffer schema has issue
private final StructType bufferSchema;


public OppDetailsAggregatedMetricUdaf() {

    inputSchema = ItemStoreMetricsModel.getSchema();

    List<StructField> bufferFields = new ArrayList<>();
    bufferFields.add(DataTypes.createStructField("list", inputSchema, false));


    bufferSchema = DataTypes.createStructType(bufferFields); //get stuck here



}


/**
 * This is the input fields for your aggregate function. Since we ware to perform on multiple fields,
 * this object contains all the required fields
 *
 * @return
 */
@Override
public StructType inputSchema() {
    return inputSchema;
}

/**
 * This is the buffer fields for your aggregate function. Since we ware to perform on multiple fields,
 * this object contains all the required fields. This is work as buffer schema
 *
 * @return
 */
@Override
public StructType bufferSchema() {
    return bufferSchema;


}

/**
 * This is the output schema of your aggregate function. Since our aggregation returns a object it self, this returns collection of them
 *
 * @return
 */
@Override
public DataType dataType() {
    return DataTypes.createArrayType(Encoders.bean(ItemStoreMetricsModel.class).schema());
}

/**
 * This describes whether the UDAF we are implementing is deterministic or not.
 * Since, spark executes by splitting data, processing the chunks separately and combining them.
 * If the UDAF logic is such that the result is independent of the order in which data is
 * processed and combined then the UDAF is deterministic.
 *
 * @return
 */
@Override
public boolean deterministic() {
    return false;
}

/**
 * Whenever a new key come, we need to have a list to hold all the values for this key
 *
 * @return
 */
@Override
public void initialize(MutableAggregationBuffer buffer) {
    buffer.update(0, new LinkedList<ItemStoreMetricsModel>());
}

/**
 * Whenever for a same key, a new row come (new object) then this will invoke, it will add this new object to already maintained object list
 * <p>
 *
 * @return
 */
@Override
public void update(MutableAggregationBuffer buffer, Row input) {

    List<ItemStoreMetricsModel> newData = new LinkedList<>();

    if (!buffer.isNullAt(0)) {
        List<ItemStoreMetricsModel> list = JavaConverters.seqAsJavaListConverter(buffer.<Seq<ItemStoreMetricsModel>>getAs(0)).asJava();

        if (!list.isEmpty()) {
            newData.addAll(list);
        }

        ItemStoreMetricsModel old = RawToModel.getItemStoreMetricsModel(input);
        newData.add(old);


        buffer.update(0, newData); //Not able to perform this

    }


}


/**
 * Whenever for a same key, two different list come (already maintained and new list ),
 * merge them to make a single list
 *
 * @return
 */
@Override
public void merge(MutableAggregationBuffer buffer1, Row buffer2) {

    List<ItemStoreMetricsModel> newData = new LinkedList<>();

    List<ItemStoreMetricsModel> buffer1As = JavaConverters.seqAsJavaListConverter(buffer1.<Seq<ItemStoreMetricsModel>>getAs(0)).asJava();

    List<ItemStoreMetricsModel> buffer2As = JavaConverters.seqAsJavaListConverter(buffer2.<Seq<ItemStoreMetricsModel>>getAs(0)).asJava();

    newData.addAll(buffer1As);
    newData.addAll(buffer2As);

    buffer1.update(0, newData);


}

/**
 * for a same key, apply aggregation operation in order to collect the output
 *
 * @return
 */
@Override
public Object evaluate(Row buffer) {
    final List<ItemStoreMetricsModel> items = buffer.getAs(0);
    return aggregatedMetricCalculator.doCalculate(items).orElse(new ArrayList<>());

}

}

Как определить схему буфера так, чтобы я мог добавлять элементы в список и сохранять его в буфере, а в конце получить в функции evaluate весь собранный список?

...