Я работаю с СДР, и каждая запись в СДР будет обновлять значение модели. Значение на каждом уровне необходимо запомнить, чтобы обновить его до нового значения. Теперь эта задача очень проста, если мы можем где-то хранить значение модели, но мы не должны собирать значения в каждой точке.
записи данных выглядят следующим образом: (имя data_label feature_1 feature_2 feature_3 ...... feature_n)
Я написал функцию отображения
def iterators(c, delta = 0.002):
new = list(c.split(" "))
t = new[1] #data_label
F = new[2:] #list of features
try:
model
except:
model = {}
score = spamminess(F,model) #a function that uses the features and the previous model
#to output a float
prob = 1.0/(1+exp(-score))
for f in F:
try:
model[f]
except:
model[f] =0
if t == 'label_1':
model[f] += (1.0-prob)*delta
elif t == 'label_2':
model[f] += -1*prob*delta
else:
model[f] = -999999999999
yield model
, но когда Я запускаю map или mapPartition на моем rdd с помощью этой функции, системе это не нравится, я не уверен, что не так. Я получаю следующие ошибки, которые не помогают мне понять, что не так:
enter code here
p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
**res = self.context.runJob(self, takeUpToNumLeft, p)**
mappedRDD = rdd.mapPartitions(partitionFunc)
**sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)**
return list(_load_from_socket(sock_info, mappedRDD._jrdd_deserializer))
return_value = get_return_value(
**answer, self.gateway_client, self.target_id, self.name)**
raise Py4JJavaError(
"An error occurred while calling {0}{1}{2}.\n".
**format(target_id, ".", name), value)**
else:
raise Py4JError(
Может кто-нибудь подсказать, что говорит об ошибке и как ее исправить?