Невозможно выполнить итерацию несколько раз при выводе преобразования GroupByKey - PullRequest
1 голос
/ 24 сентября 2019

Результатом преобразования Apache-Beam GroupByKey.create () является PCollection >>.

Когда я пытаюсь повторить эту итерацию более одного раза, с помощью SparkRunner , я получаю исключение:

Caused by: java.lang.IllegalStateException: ValueIterator can't be iterated more than once,otherwise there could be data lost
    at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:163)
    at java.lang.Iterable.spliterator(Iterable.java:101)

Просмотр кода ValueIterator показывает, что ValueIterator запрещаетсоздать более одного итератора для этого Itrable .

Почему я не могу создать несколько итераторов из этого Iterable?какие данные могут быть потеряны?

Пример кода:

import com.google.common.base.MoreObjects;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.KV;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.StreamSupport;

public class Main implements Serializable {


    public static void main(String[] args) {

        new Main().runPipeline(args);
    }

    private void runPipeline(String[] args) {

        PipelineOptions options =
                PipelineOptionsFactory.fromArgs(args).withValidation().create();
        Pipeline pipeline = Pipeline.create(options);

        pipeline
                .apply(Create.of(
                        "trader1,10.0",
                        "trader1,20.0",
                        "trader1,5.0",
                        "trader2,7.0",
                        "trader2,30.0",
                        "trader2,2.0",
                        "trader3,10.0"))
                .apply(ParDo.of(extractKey()))
                .apply(GroupByKey.<String, Trade>create())
                .apply(ParDo.of(calculateMax()))
                .apply(ParDo.of(calculateMin()))
                .apply(ToString.elements())
                .apply(TextIO.write().to("output.txt"));

        pipeline.run();
    }

    private static DoFn<KV<String, Iterable<Trade>>, KV<String, IterableAndCalculationWrapper>> calculateMax() {
        return new DoFn<KV<String, Iterable<Trade>>, KV<String, IterableAndCalculationWrapper>>() {
            @ProcessElement
            public void processElement(@Element KV<String, Iterable<Trade>> element, ProcessContext context) {
                String key = element.getKey();
                Iterable<Trade> iterable = element.getValue();
                Double max = StreamSupport.stream(iterable.spliterator(), false).mapToDouble(Trade::getTransactionAmount).max().getAsDouble();

                // Un-commenting this line throws exception:
                //Double min = StreamSupport.stream(iterable.spliterator(), false).mapToDouble(Trade::getTransactionAmount).min().getAsDouble();

                Map<String, Double> caclulated = new HashMap<>();
                caclulated.put("max", max);
                context.output(KV.of(key, new IterableAndCalculationWrapper(iterable, caclulated)));
            }

        };
    }

    private static DoFn<KV<String, IterableAndCalculationWrapper>, KV<String, IterableAndCalculationWrapper>> calculateMin() {
        return new DoFn<KV<String, IterableAndCalculationWrapper>, KV<String, IterableAndCalculationWrapper>>() {
            @ProcessElement
            public void processElement(@Element KV<String, IterableAndCalculationWrapper> element, ProcessContext context) {
                String key = element.getKey();
                IterableAndCalculationWrapper iterableAndCalculationWrapper = element.getValue();
                Iterable<Trade> iterable = iterableAndCalculationWrapper.getIterable();

                // This line throws exception:
                Double min = StreamSupport.stream(iterable.spliterator(), false).mapToDouble(Trade::getTransactionAmount).min().getAsDouble();

                iterableAndCalculationWrapper.getMap().put("min", min);
                context.output(KV.of(key, iterableAndCalculationWrapper));
            }
        };
    }


    public static DoFn<String, KV<String, Trade>> extractKey() {
        return new DoFn<String, KV<String, Trade>>() {
            @ProcessElement
            public void processElement(@Element String element, ProcessContext context) {
                String[] row = element.split(",");
                Trade trade = new Trade(row[0], Double.valueOf(row[1]));
                context.output(KV.of(trade.traderId, trade));
            }
        };
    }

    private static class IterableAndCalculationWrapper implements Serializable {
        private Iterable<Trade> iterable;
        private Map<String, Double> map;

        public IterableAndCalculationWrapper(Iterable<Trade> iterable, Map<String, Double> map) {
            this.iterable = iterable;
            this.map = map;
        }

        public Iterable<Trade> getIterable() {
            return iterable;
        }

        public void setIterable(Iterable<Trade> iterable) {
            this.iterable = iterable;
        }

        public Map<String, Double> getMap() {
            return map;
        }

        public void setMap(Map<String, Double> map) {
            this.map = map;
        }


        @Override
        public String toString() {
            return MoreObjects.toStringHelper(this)
                    .add("iterable", iterable)
                    .add("map", map)
                    .toString();
        }
    }

    private static class Trade implements Serializable {
        private String traderId;
        private Double transactionAmount;

        public Trade(String traderId, Double transactionAmount) {
            this.traderId = traderId;
            this.transactionAmount = transactionAmount;
        }

        public String getTraderId() {
            return traderId;
        }

        public void setTraderId(String traderId) {
            this.traderId = traderId;
        }

        public Double getTransactionAmount() {
            return transactionAmount;
        }

        public void setTransactionAmount(Double transactionAmount) {
            this.transactionAmount = transactionAmount;
        }


        @Override
        public String toString() {
            return MoreObjects.toStringHelper(this)
                    .add("traderId", traderId)
                    .add("transactionAmount", transactionAmount)
                    .toString();
        }
    }
}


1 Ответ

2 голосов
/ 26 сентября 2019

Вы можете сохранить промежуточные результаты вашего конвейера в PCollections.

PCollection<KV<String, Iterable<Trade>>> tmpCollection = 
    pipeline.apply(Create.of(
            "trader1,10.0",
            "trader1,20.0",
            "trader1,5.0",
            "trader2,7.0",
            "trader2,30.0",
            "trader2,2.0",
            "trader3,10.0"))
    .apply(ParDo.of(extractKey()))
    .apply(GroupByKey.<String, Trade>create());


PCollection<KV<String, IterableAndCalculationWrapper>> collectionMax = tmpCollection.apply(ParDo.of(calculateMax()));
PCollection<KV<String, IterableAndCalculationWrapper>> collectionMin = tmpCollection.apply(ParDo.of(calculateMin()));

Это позволит вам выполнить итерацию один раз для CalculateMax, и один раз для calcMin.Тогда вам нужно будет обрабатывать обе коллекции по отдельности, пока вы не объедините их.

...