import numpy as np
import pandas as pd
import sparkobj as spk
from sklearn.datasets import make_blobs
from sklearn.ensemble import IsolationForest
def train_forest_per_partition_map_step(partition):
print('partition')
print(partition)
get_data = np.asarray(list(partition))
assert get_data.shape[1] == 2
return [IsolationForest(n_estimators=100,
contamination=0.15,
random_state=666).fit(get_data)]
def main():
spark = spk.getsparkobj()
n_samples = 300
outliers_fraction = 0.15
n_outliers = int(outliers_fraction * n_samples)
n_inliers = n_samples - n_outliers
rng = np.random.RandomState(666)
data = pd.DataFrame(data=np.concatenate([make_blobs(centers=[[0, 0], columns=["feat_1", "feat_2"]) # skipping some unrelevant
df = spark.createDataFrame(data=data)
df = df.rdd.repartition(numPartitions=3).toDF()
forest = df.rdd.mapPartitions(f=train_forest_per_partition_map_step).collect()
lines = df.rdd.collect().foreach(println)
# Reduce step: Combine scores from partitions.
forest[0].decision_function(data) # Partition 1 Isolation forest.
forest[1].decision_function(data) # Partition 2 Isolation forest.
forest[2].decision_function(data) # Partition 3 Isolation forest.
if __name__ == '__main__':
main()
Есть ли способ получить результаты печати в функции "train_forest_per_partition_map_step" после того, как разделы были выполнены? Я пробовал df.rdd.collect (). Foreach (println), но продолжаю получать ошибку атрибута
AttributeError: 'list' object has no attribute 'foreach'
AttributeError Traceback (most recent call last)
in engine
1 if __name__ == '__main__':
----> 2 main()
<ipython-input-1-c5cff78d4b35> in main()
25
26 forest = df.rdd.mapPartitions(f=train_forest_per_partition_map_step).collect()
---> 27 lines = df.rdd.take(100).foreach(println)
28
29 # Reduce step: Combine scores from partitions.
AttributeError: 'list' object has no attribute 'foreach'
Думаю, это доступно только для Scala, но хотелось бы знать эквивалент Python