Hadoop MapReduce (с использованием Python) запускает KeyError на Pandas DataFrame - PullRequest
0 голосов
/ 11 мая 2018

Я пытаюсь обработать фрейм данных с помощью MapReduce.Я изначально создал скрипт для маппера и попытался запустить его с локального терминала, и он работает правильно:

mapper.py

import sys
import string
import pandas as pd

df = pd.read_csv(sys.stdin)

#cleaning relevant fields
df['Time'] = pd.to_datetime(df['Time'], unit='s').apply(lambda x : x.year)
df['Summary'] = df['Summary'].str.lower()
df['Summary'] = df['Summary'].str.replace('[{}]'.format(string.punctuation), '')

for index, row in df.iterrows():
    key = ''
    key += str(row.iloc[7])
    key += '-'
    for word in str(row.iloc[8]).split():
        key += word
        print('{}\t{}'.format(key, 1))
        key = key.replace(word, '')

Затем я разработал reducer.py :

import sys

current_key = None
current_count = 0
key = None

for line in sys.stdin:
    line = line.strip()

    key, count = line.split('\t', 1)

    try:
        count = int(count)
    except ValueError:
        continue

    if current_key == key:
        current_count += count
    else:
        if current_key:
            print(current_key, current_count)

        current_count = count
        current_key = key

if current_key == key:
    print('{}\t{}'.format(current_key, current_count))

На этом этапе я попытался запустить обработку через Hadoop с помощью следующей команды (очевидно, после загрузки файла .csv через HDFS):

bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-3.1.0.jar \
-file /mypath/mapper.py    -mapper 'python /mypath/mapper.py' \
-file /mypath/reducer.py   -reducer 'python /mypath/reducer.py' \
-input /user/andreone/input/Reviews.csv -output /user/andreone/output/out_1

Но я получаю следующую ошибку:

[...]
2018-05-11 17:46:03,706 INFO mapreduce.Job:  map 100% reduce 0%
2018-05-11 17:46:03,983 INFO streaming.PipeMapRed: R/W/S=1000/0/0 in:NA [rec/s] out:NA [rec/s]
2018-05-11 17:46:04,039 INFO streaming.PipeMapRed: R/W/S=10000/0/0 in:NA [rec/s] out:NA [rec/s]
2018-05-11 17:46:04,793 INFO streaming.PipeMapRed: R/W/S=100000/0/0 in:100000=100000/1 [rec/s] out:0=0/1 [rec/s]
2018-05-11 17:46:05,721 INFO streaming.PipeMapRed: R/W/S=200000/0/0 in:100000=200000/2 [rec/s] out:0=0/2 [rec/s]
Traceback (most recent call last):
  File "/usr/lib/python3.6/site-packages/pandas/core/indexes/base.py", line 2525, in get_loc
    return self._engine.get_loc(key)
  File "pandas/_libs/index.pyx", line 117, in pandas._libs.index.IndexEngine.get_loc
  File "pandas/_libs/index.pyx", line 139, in pandas._libs.index.IndexEngine.get_loc
  File "pandas/_libs/hashtable_class_helper.pxi", line 1265, in pandas._libs.hashtable.PyObjectHashTable.get_item
  File "pandas/_libs/hashtable_class_helper.pxi", line 1273, in pandas._libs.hashtable.PyObjectHashTable.get_item
KeyError: 'Time'

During handling of the above exception, another exception occurred:
[....]

Кажется, он не нашел ключ 'Time'.Кто-нибудь может мне помочь?

1 Ответ

0 голосов
/ 17 марта 2019

Извините за поздний ответ, но я решил это сегодня (после нахождения вашего вопроса здесь), просто переименовав столбцы в те же имена:

# Read the data to a Pandas Dataframe
inputDataCSV = pandas.read_csv(sys.stdin, dtype={"Date/Time" : object, "Lat" : object, "Lon" : object, "Base" : object}, encoding='ascii')
# Rename columns
inputDataCSV.columns = ['Date/Time', 'Lat', 'Lon', 'Base']

Это мой первый пост. Надеюсь, это кому-нибудь поможет.

...