Распределение гамма-излучения в пучке Apache - PullRequest
0 голосов
/ 18 мая 2018

Я пытаюсь реализовать гамма-распределение в Apache Beam.Сначала я читаю CSV-файл CSV-файла, используя класс TextIO Apache beam:

Pipeline p = Pipeline.create();  
p.apply(TextIO.read().from("gs://path/to/file.csv"));  

После этого я применяю преобразование, которое будет анализировать каждую строку в CSV-файле и возвращать объект.Вот только я пытаюсь выполнить операцию Gamma Distribution:

.apply(ParDo.of(new DoFn<String, Entity>() {
@ProcessElement
public void processElement(ProcessContext c) {
    String[] strArr = c.element().split(",");
    ClassxNorms xn = new ClassxNorms();
    xn.setDuration(Double.parseDouble(strArr[0]));
    xn.setAlpha(Double.parseDouble(strArr[1]));
    xn.setBeta(Double.parseDouble(strArr[2]));
    GammaDistribution gdValue = new GammaDistribution(Double.parseDouble(strArr[0]), Double.parseDouble(strArr[1]), Double.parseDouble(strArr[2]));
    System.out.println("gdValue : " + gdValue);
    c.output(xn);
}
}));

Я создаю beamRecord, и на следующем шаге я конвертирую запись луча в строку, чтобы записать окончательный вывод в хранилище Google:

PCollection<String> gs_output_final = xnorm_trig.apply(ParDo.of(new DoFn<BeamRecord, String>() {
                    private static final long serialVersionUID = 1L;
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        c.output(c.element().toString());
                        System.out.println(c.element().toString());
                    }
                })); 
   gs_output_final.apply(TextIO.write().to("gs://output/op_1/Q40test111"));  

Я получаю вывод, но операция гамма-распределения не выполняется.Любая помощь будет очень признательна.

1 Ответ

0 голосов
/ 01 июня 2018

Мне удалось реализовать гамма-распределение в Apache Beam.Ниже приведен фрагмент кода для справки:

.apply(ParDo.of(new DoFn<String, ClassxNorms>() { 
    @ProcessElement
    public void processElement(ProcessContext c) throws ParseException {
      String[] strArr = c.element().split(",");
      ClassxNorms xn = new ClassxNorms();
      double sample = new GammaDistribution(Double.parseDouble(strArr[11]), Double.parseDouble(strArr[12])).cumulativeProbability(Double.parseDouble(strArr[6]));
      xn.setDuration(Double.parseDouble(strArr[6]));
      xn.setAlpha(Double.parseDouble(strArr[11]));
      xn.setBeta(Double.parseDouble(strArr[12]));
      xn.setVolume(Double.parseDouble(strArr[13]));
      xn.setSpend(Double.parseDouble(strArr[14]));
      xn.setEfficiency(Double.parseDouble(strArr[15]));
      xn.setXnorm(Double.parseDouble(strArr[16]));
      xn.setKey(strArr[17]);
      xn.setGamma(sample);
      c.output(xn);
    }
  }));
...