Несколько входов с MRJob - PullRequest
       12

Несколько входов с MRJob

6 голосов
/ 16 февраля 2012

Я пытаюсь научиться использовать Python API Yelp для MapReduce, MRJob. Их простой пример со счетчиком слов имеет смысл, но мне любопытно, как можно было бы обрабатывать приложение с несколькими входами. Например, вместо того, чтобы просто считать слова в документе, умножая вектор на матрицу. Я придумал это решение, которое работает, но чувствует себя глупо:

class MatrixVectMultiplyTast(MRJob):
    def multiply(self,key,line):
            line = map(float,line.split(" "))
            v,col = line[-1],line[:-1]

            for i in xrange(len(col)):
                    yield i,col[i]*v

    def sum(self,i,occurrences):
            yield i,sum(occurrences)

    def steps(self):
            return [self.mr (self.multiply,self.sum),]

if __name__=="__main__":
    MatrixVectMultiplyTast.run()

Этот код запускается ./matrix.py < input.txt, и причина его работы заключается в том, что матрица хранится в input.txt по столбцам с соответствующим значением вектора в конце строки.

Итак, следующая матрица и вектор:

enter image description here

представлены как input.txt как:

enter image description here

Короче, как бы я мог более естественно хранить матрицу и вектор в отдельных файлах и передавать их оба в MRJob?

Ответы [ 4 ]

3 голосов
/ 13 июня 2012

Если вам нужно обработать ваши необработанные данные с использованием другого (или того же набора данных row_i, row_j), вы можете:

1) Создайте корзину S3 для хранения копии ваших данных. Передайте местоположение этой копии вашему классу задач, например, self.options.bucket и self.options.my_datafile_copy_location в приведенном ниже коде. Предостережение: К сожалению, кажется, что весь файл должен быть "загружен" на компьютеры задач перед обработкой. Если соединение прерывается или занимает слишком много времени для загрузки, это задание может завершиться ошибкой. Вот код Python / MRJob, чтобы сделать это.

Поместите это в функцию отображения:

d1 = line1.split('\t', 1)
v1, col1 = d1[0], d1[1]
conn = boto.connect_s3(aws_access_key_id=<AWS_ACCESS_KEY_ID>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>)
bucket = conn.get_bucket(self.options.bucket)  # bucket = conn.get_bucket(MY_UNIQUE_BUCKET_NAME_AS_STRING)
data_copy = bucket.get_key(self.options.my_datafile_copy_location).get_contents_as_string().rstrip()
### CAVEAT: Needs to get the whole file before processing the rest.
for line2 in data_copy.split('\n'):
    d2 = line2.split('\t', 1)
    v2, col2 = d2[0], d2[1]
    ## Now, insert code to do any operations between v1 and v2 (or c1 and c2) here:
    yield <your output key, value pairs>
conn.close()

2) Создайте домен SimpleDB и сохраните там все свои данные. Читайте здесь на boto и SimpleDB: http://code.google.com/p/boto/wiki/SimpleDbIntro

Ваш код сопоставления будет выглядеть так:

dline = dline.strip()
d0 = dline.split('\t', 1)
v1, c1 = d0[0], d0[1]
sdb = boto.connect_sdb(aws_access_key_id=<AWS_ACCESS_KEY>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>)
domain = sdb.get_domain(MY_DOMAIN_STRING_NAME)
for item in domain:
    v2, c2 = item.name, item['column']
    ## Now, insert code to do any operations between v1 and v2 (or c1 and c2) here:
    yield <your output key, value pairs>
sdb.close()

Этот второй вариант может работать лучше, если у вас очень большие объемы данных, поскольку он может выполнять запросы для каждой строки данных, а не для всего объема сразу. Помните, что значения SimpleDB могут содержать не более 1024 символов, поэтому вам может потребоваться сжать / распаковать каким-либо методом, если ваши значения данных длиннее.

2 голосов
/ 24 декабря 2014

Это то, как я использую несколько входов и, основываясь на имени файла, делаю подходящие изменения на этапе отображения.

Программа бегуна:

from mrjob.hadoop import *


#Define all arguments

os.environ['HADOOP_HOME'] = '/opt/cloudera/parcels/CDH/lib/hadoop/'
print "HADOOP HOME is now set to : %s" % (str(os.environ.get('HADOOP_HOME')))
job_running_time = datetime.datetime.now().strftime('%Y-%m-%d_%H_%M_%S')
hadoop_bin = '/usr/bin/hadoop'
mode = 'hadoop'
hs = HadoopFilesystem([hadoop_bin])

input_file_names = ["hdfs:///app/input_file1/","hdfs:///app/input_file2/"]

aargs = ['-r',mode,'--jobconf','mapred.job.name=JobName','--jobconf','mapred.reduce.tasks=3','--no-output','--hadoop-bin',hadoop_bin]
aargs.extend(input_file_names)
aargs.extend(['-o',output_dir])
print aargs
status_file = True

mr_job = MRJob(args=aargs)
with mr_job.make_runner() as runner:
    runner.run()
os.environ['HADOOP_HOME'] = ''
print "HADOOP HOME is now set to : %s" % (str(os.environ.get('HADOOP_HOME')))

Класс MRJob:

class MR_Job(MRJob):
    DEFAULT_OUTPUT_PROTOCOL = 'repr_value'
    def mapper(self, _, line):
    """
    This function reads lines from file.
    """
    try:
        #Need to clean email.
        input_file_name = get_jobconf_value('map.input.file').split('/')[-2]
                """
                Mapper code
                """
    except Exception, e:
        print e

    def reducer(self, email_id,visitor_id__date_time):
    try:
        """
                Reducer Code
                """
    except:
        pass


if __name__ == '__main__':
    MRV_Email.run()
2 голосов
/ 13 сентября 2013

Фактический ответ на ваш вопрос заключается в том, что mrjob еще не поддерживает шаблон потокового соединения hadoop, который заключается в чтении переменной среды map_input_file (которая предоставляет свойство map.input.file), чтобы определить, какой тип файла вы используете. имеет дело с его путем и / или именем.

Возможно, вы все равно сможете это осуществить, если сможете легко определить, просто прочитав сами данные, к какому типу они относятся, как показано в этой статье:

http://allthingshadoop.com/2011/12/16/simple-hadoop-streaming-tutorial-using-joins-and-keys-with-python/

Однако это не всегда возможно ...

В противном случае myjob выглядит фантастически, и я бы хотел, чтобы они добавили поддержку для этого в будущем. А до тех пор это для меня довольно много.

1 голос
/ 13 июня 2012

Насколько я понимаю, вы не будете использовать MrJob, если не захотите использовать кластер Hadoop или сервисы Hadoop от Amazon, даже если в примере используется работа с локальными файлами.

MrJob в основном использует " Hadoop Streaming " для отправки задания.

Это означает, что все входные данные, указанные в виде файлов или папок из Hadoop, передаются в преобразователь, а последующие результаты - в преобразователь. Весь маппер получает срез ввода и считает все входные данные схематически одинаковыми, так что он равномерно анализирует и обрабатывает ключ, значение для каждого среза данных.

Исходя из этого понимания, входы схематически совпадают с картографом. Единственный способ включить две разные схематические данные - это чередовать их в одном и том же файле таким образом, чтобы картограф мог понять, что является векторными данными, а какие матричными.

You are actually doing it already.

Вы можете просто улучшить это, имея некоторый спецификатор, если строка является матричными или векторными данными. Как только вы видите векторные данные, к ним применяются предыдущие матричные данные.

matrix, 1, 2, ...
matrix, 2, 4, ...
vector, 3, 4, ...
matrix, 1, 2, ...
.....

Но процесс, который вы упомянули, работает хорошо. Вы должны иметь все данные схемы в одном файле.

Это все еще имеет проблемы, хотя. Карта K, V Reduce лучше работает, когда полная схема присутствует в одной строке и содержит один блок обработки целиком.

Насколько я понимаю, вы уже делаете это правильно, но я думаю, что Map-Reduce не подходит для такого рода данных. Я надеюсь, что кто-то прояснит это даже дальше, чем я мог.

...