Я получаю ошибку при использовании метода "into" с FlatMapElements и MapElements. Пожалуйста, помогите мне решить проблему, так как я получаю сообщение об ошибке «Метод в (TypeDescriptor) не определен для типа FlatMapElements»
Он не определен для вышеуказанного TypeDescriptor, но не может заменить что-либо вместо это и я новичок в Apache Beam.
Plz help !!!!
Ниже приведен источник код :
<code>import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
/**
* An example that counts words in Shakespeare and includes Beam best practices.
*
* <p>This class, {@link WordCount}, is the second in a series of four successively more detailed
* 'word count' examples. You may first want to take a look at {@link MinimalWordCount}. After
* you've looked at this example, then see the {@link DebuggingWordCount} pipeline, for introduction
* of additional concepts.
*
* <p>For a detailed walkthrough of this example, see <a
* href="https://beam.apache.org/get-started/wordcount-example/">
* https://beam.apache.org/get-started/wordcount-example/ </a>
*
* <p>Basic concepts, also in the MinimalWordCount example: Reading text files; counting a
* PCollection; writing to text files
*
* <p>New Concepts:
*
* <pre>
* 1. Executing a Pipeline both locally and using the selected runner
* 2. Using ParDo with static DoFns defined out-of-line
* 3. Building a composite transform
* 4. Defining your own pipeline options
*
* *
Концепция # 1: вы можете выполнить этот конвейер либо локально, либо используя, выбрав другого бегуна. * Теперь это параметры командной строки, а не жестко заданные, как в примере MinimalWordCount *. * *
Чтобы изменить исполнитель, укажите: * *
{@code
* --runner=YOUR_SELECTED_RUNNER
* }
* *
Чтобы выполнить этот конвейер, укажите локальный выходной файл (если используется {@code DirectRunner}) или * префикс вывода в поддерживаемом распределенном распределенном файле. файловая система. * *
{@code
* --output=[YOUR_LOCAL_FILE | YOUR_OUTPUT_PREFIX]
* }
* *
Входным файлом по умолчанию является набор данных c, содержащий текст «Короля Лира» Уильяма * Шекспира. Вы можете переопределить его и выбрать свой собственный ввод с помощью {@code --inputFile}. * / publi c class BeamPipeline {publi c stati c void main (String args []) {PipelineOptions options = PipelineOptionsFactory.create (); Pipeline p = Pipeline.create (параметры); PCollection csvRows = p.apply ("Читать из CSV", TextIO.Read.from ("./ reviews.csv")); // Шаг 2 - Извлекаем рейтинги и подсчитываем их. PCollection > RatingCounts = csvRows .apply («Извлечь рейтинги», FlatMapElements.into (TypeDescriptors.strings ()) .via (csvRow -> Arrays.asList (csvRow.split (",") [1]))) .apply (" Count Ratings ", Count. perElement ()); // Шаг 3 - Записать результаты в CSV RatingCounts .apply ("FormatResults", MapElements.into (TypeDescriptors.strings ()) .via ((KV RatingCount) -> RatingCount.getKey () + "" + RatingCount.getValue ())) .apply (TextIO.Write.to ("./ Rating_results"). withSuffix (". csv")); // Запустить конвейер и дождаться его завершения перед выходом из p.run (). WaitUntilFini sh (); }}