Я пытаюсь обработать фрейм данных с помощью MapReduce.Я изначально создал скрипт для маппера и попытался запустить его с локального терминала, и он работает правильно:
mapper.py
import sys
import string
import pandas as pd
df = pd.read_csv(sys.stdin)
#cleaning relevant fields
df['Time'] = pd.to_datetime(df['Time'], unit='s').apply(lambda x : x.year)
df['Summary'] = df['Summary'].str.lower()
df['Summary'] = df['Summary'].str.replace('[{}]'.format(string.punctuation), '')
for index, row in df.iterrows():
key = ''
key += str(row.iloc[7])
key += '-'
for word in str(row.iloc[8]).split():
key += word
print('{}\t{}'.format(key, 1))
key = key.replace(word, '')
Затем я разработал reducer.py :
import sys
current_key = None
current_count = 0
key = None
for line in sys.stdin:
line = line.strip()
key, count = line.split('\t', 1)
try:
count = int(count)
except ValueError:
continue
if current_key == key:
current_count += count
else:
if current_key:
print(current_key, current_count)
current_count = count
current_key = key
if current_key == key:
print('{}\t{}'.format(current_key, current_count))
На этом этапе я попытался запустить обработку через Hadoop с помощью следующей команды (очевидно, после загрузки файла .csv через HDFS):
bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-3.1.0.jar \
-file /mypath/mapper.py -mapper 'python /mypath/mapper.py' \
-file /mypath/reducer.py -reducer 'python /mypath/reducer.py' \
-input /user/andreone/input/Reviews.csv -output /user/andreone/output/out_1
Но я получаю следующую ошибку:
[...]
2018-05-11 17:46:03,706 INFO mapreduce.Job: map 100% reduce 0%
2018-05-11 17:46:03,983 INFO streaming.PipeMapRed: R/W/S=1000/0/0 in:NA [rec/s] out:NA [rec/s]
2018-05-11 17:46:04,039 INFO streaming.PipeMapRed: R/W/S=10000/0/0 in:NA [rec/s] out:NA [rec/s]
2018-05-11 17:46:04,793 INFO streaming.PipeMapRed: R/W/S=100000/0/0 in:100000=100000/1 [rec/s] out:0=0/1 [rec/s]
2018-05-11 17:46:05,721 INFO streaming.PipeMapRed: R/W/S=200000/0/0 in:100000=200000/2 [rec/s] out:0=0/2 [rec/s]
Traceback (most recent call last):
File "/usr/lib/python3.6/site-packages/pandas/core/indexes/base.py", line 2525, in get_loc
return self._engine.get_loc(key)
File "pandas/_libs/index.pyx", line 117, in pandas._libs.index.IndexEngine.get_loc
File "pandas/_libs/index.pyx", line 139, in pandas._libs.index.IndexEngine.get_loc
File "pandas/_libs/hashtable_class_helper.pxi", line 1265, in pandas._libs.hashtable.PyObjectHashTable.get_item
File "pandas/_libs/hashtable_class_helper.pxi", line 1273, in pandas._libs.hashtable.PyObjectHashTable.get_item
KeyError: 'Time'
During handling of the above exception, another exception occurred:
[....]
Кажется, он не нашел ключ 'Time'.Кто-нибудь может мне помочь?