Использование Spark MapReduce для вызова другой функции и агрегирования - PullRequest
0 голосов
/ 14 января 2019

Я ужасно незнаком с искрой, но я почти уверен, что существует хороший способ сделать то, что я хочу, гораздо быстрее, чем я это делаю в настоящее время.

По сути, у меня есть корзина S3, в которой много JSON-данных из твиттера. Я хочу просмотреть все эти файлы, получить текст из JSON, выполнить анализ настроений (в настоящее время использующий Stanford NLP) для текста, а затем загрузить Tweet + Sentiment в базу данных (сейчас я использую динамо, но это это не заигрывание)

Код, который у меня сейчас есть,

        /**
         * Per thread:
         * 1. Download a file
         * 2. Do sentiment on the file -> output Map<String, List<Float>>
         * 3. Upload to Dynamo: (a) sentiment (b) number of tweets (c) timestamp
         *
         */

        List<String> keys = s3Connection.getKeys();

        ThreadPoolExecutor threads = new ThreadPoolExecutor(40, 40, 10000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10));
        threads.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        for (String key : keys) {
                threads.submit(new Thread(() -> {
                    try {
                        S3Object s3Object = s3Connection.getObject(key);
                        Map<String, List<Float>> listOfTweetsWithSentiment = tweetSentimentService.getTweetsFromJsonFile(s3Object.getObjectContent());
                        List<AggregatedTweets> aggregatedTweets = tweetSentimentService.createAggregatedTweetsFromMap(listOfTweetsWithSentiment, key);

                        for (AggregatedTweets aggregatedTweet : aggregatedTweets) {
                            System.out.println(aggregatedTweet);
                            tweetDao.putItem(aggregatedTweet);
                        }
                    } catch (Exception e) {
                        System.out.println(e.getMessage());
                    }
                }));
        }

Это работает и хорошо. И я смог ускорить процесс всего до 2 часов, запустив этот код в определенных диапазонах дат (т. Е. GetKeys получает ключи только для определенных диапазонов дат) и раскручивая множество экземпляров этого процесса в разных EC2, каждый из которых действует по-своему. диапазон дат.

Тем не менее, должен быть более быстрый способ сделать это с хорошим уменьшением ole карты, но я просто не знаю, как вообще начать изучать это. Можно ли выполнить анализ настроений на моей карте, а затем уменьшить его на основе отметки времени?

Кроме того, я изучал использование AWS Glue, но не вижу хорошего способа использовать там библиотеку Stanford NLP.

Любая помощь будет принята с благодарностью.

1 Ответ

0 голосов
/ 14 января 2019

Да, вы можете сделать это с Apache Spark. Существует множество способов разработки вашего приложения, настройки инфраструктуры и т. Д. Я предлагаю простой дизайн:

  1. Вы находитесь в AWS, поэтому создайте кластер EMR с помощью Spark. Было бы полезно включить Zeppelin для интерактивной отладки.

  2. Spark использует несколько абстракций данных. Ваши друзья - RDD и Datasets (о них читайте в документации). Код для чтения данных в наборы данных может быть таким же:

    SparkSession ss = SparkSession.builder().getOrCreate();
    Dataset<Row> dataset = ss.read("s3a://your_bucket/your_path");
    
  3. Теперь у вас есть Dataset<Row>. Это полезно для SQL-подобных операций. Для анализа вам необходимо преобразовать его в Spark RDD:

    JavaRDD<Tweet> analyticRdd = dataset.toJavaRDD().map(row -> {
      return TweetsFactory.tweetFromRow(row);
    });
    
  4. Итак, с analyticRdd вы можете проводить анализ. Только не забудьте сделать все ваши сервисы, которые работают с данными, Сериализуемыми.

...