Apache кодирование луча с Java - PullRequest
0 голосов
/ 18 февраля 2020

Я получаю ошибку при использовании метода "into" с FlatMapElements и MapElements. Пожалуйста, помогите мне решить проблему, так как я получаю сообщение об ошибке «Метод в (TypeDescriptor) не определен для типа FlatMapElements»

Он не определен для вышеуказанного TypeDescriptor, но не может заменить что-либо вместо это и я новичок в Apache Beam.

Plz help !!!!

Ниже приведен источник код :

<code>import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

/**
 * An example that counts words in Shakespeare and includes Beam best practices.
 *
 * <p>This class, {@link WordCount}, is the second in a series of four successively more detailed
 * 'word count' examples. You may first want to take a look at {@link MinimalWordCount}. After
 * you've looked at this example, then see the {@link DebuggingWordCount} pipeline, for introduction
 * of additional concepts.
 *
 * <p>For a detailed walkthrough of this example, see <a
 * href="https://beam.apache.org/get-started/wordcount-example/">
 * https://beam.apache.org/get-started/wordcount-example/ </a>
 *
 * <p>Basic concepts, also in the MinimalWordCount example: Reading text files; counting a
 * PCollection; writing to text files
 *
 * <p>New Concepts:
 *
 * <pre>
 *   1. Executing a Pipeline both locally and using the selected runner
 *   2. Using ParDo with static DoFns defined out-of-line
 *   3. Building a composite transform
 *   4. Defining your own pipeline options
 * 
* *

Концепция # 1: вы можете выполнить этот конвейер либо локально, либо используя, выбрав другого бегуна. * Теперь это параметры командной строки, а не жестко заданные, как в примере MinimalWordCount *. * *

Чтобы изменить исполнитель, укажите: * *

{@code
 * --runner=YOUR_SELECTED_RUNNER
 * }
* *

Чтобы выполнить этот конвейер, укажите локальный выходной файл (если используется {@code DirectRunner}) или * префикс вывода в поддерживаемом распределенном распределенном файле. файловая система. * *

{@code
 * --output=[YOUR_LOCAL_FILE | YOUR_OUTPUT_PREFIX]
 * }
* *

Входным файлом по умолчанию является набор данных c, содержащий текст «Короля Лира» Уильяма * Шекспира. Вы можете переопределить его и выбрать свой собственный ввод с помощью {@code --inputFile}. * / publi c class BeamPipeline {publi c stati c void main (String args []) {PipelineOptions options = PipelineOptionsFactory.create (); Pipeline p = Pipeline.create (параметры); PCollection csvRows = p.apply ("Читать из CSV", TextIO.Read.from ("./ reviews.csv")); // Шаг 2 - Извлекаем рейтинги и подсчитываем их. PCollection > RatingCounts = csvRows .apply («Извлечь рейтинги», FlatMapElements.into (TypeDescriptors.strings ()) .via (csvRow -> Arrays.asList (csvRow.split (",") [1]))) .apply (" Count Ratings ", Count. perElement ()); // Шаг 3 - Записать результаты в CSV RatingCounts .apply ("FormatResults", MapElements.into (TypeDescriptors.strings ()) .via ((KV RatingCount) -> RatingCount.getKey () + "" + RatingCount.getValue ())) .apply (TextIO.Write.to ("./ Rating_results"). withSuffix (". csv")); // Запустить конвейер и дождаться его завершения перед выходом из p.run (). WaitUntilFini sh (); }}

...