Часть кода ниже пытается сделать следующее:
Для каждого customer_code
в sdf1
проверьте, отображается ли этот код клиента в sdf2
. Если это так, замените df1.actual_related_customer
на df2.actual_related_customer
.
Этот код не работает, потому что я неправильно получаю доступ к своим строкам в df2
. Как я могу достичь вышеупомянутой цели? (если у вас есть другое предложение, чем индексы, стреляйте!)
sdf1 = sqlCtx.createDataFrame(
[
('customer1', 'customer_code1', 'other'),
('customer2', 'customer_code2', 'other'),
('customer3', 'customer_code3', 'other'),
('customer4', 'customer_code4', 'other')
],
('actual_related_customer', 'customer_code', 'other')
)
sdf2 = sqlCtx.createDataFrame(
[
('Peter', 'customer_code1'),
('Deran', 'customer_code5'),
('Christopher', 'customer_code3'),
('Nick', 'customer_code4')
],
('actual_related_customer', 'customer_code')
)
def right_customer(x,y):
for row in sdf2.collect() :
if x == row['customer_code'] :
return row['actual_related_customer']
return y
fun1 = udf(right_customer, StringType())
test = sdf1.withColumn(
"actual_related_customer",
fun1(sdf1.customer_code, sdf1.actual_related_customer)
)
И мой желаемый результат будет выглядеть так:
desired_output = sqlCtx.createDataFrame(
[
('Peter', 'customer_code1', 'other'),
('customer2', 'customer_code2', 'other'),
('Christopher', 'customer_code3', 'other'),
('Nick', 'customer_code4', 'other')
],
('actual_related_customer', 'customer_code', 'other')
)