Вам предоставлены данные клиента и данные счета, которые содержат информацию, описанную ниже.
Данные клиента: Поле Описание customerId Уникальный идентификатор для каждого человека клиент в данных. имя Имя клиента. Фамилия Фамилия клиента.
Данные учетной записи: Описание поля customerId Уникальный идентификатор каждого отдельного клиента в данных. accountId Уникальный идентификатор для каждой отдельной учетной записи в данных. сальдо Текущий баланс счета ($).
Ожидаемый формат вывода * + ---------- + ----------- + ------- --- + ---------------------------------------------- ----------------------- + -------------- + ----------- - + ----------------- + * | customerId | имя | фамилия | account | numberAccounts | totalBalance | среднегоBalance | * + ---------- + ----------- + ---------- + -------------- -------------------------------------------------- ----- + -------------- + ------------ + ---------------- - + * | IND0001 | Кристофер | Черный | [] | 0 | 0 | 0.0 | * | IND0002 | Мадлен | Керр | [[IND0002, ACC0155,323], [IND0002, ACC0262,60]] | 2 | 383 | 191,5 | * | IND0003 | Сара | Скиннер | [[IND0003, ACC0235,631], [IND0003, ACC0486,400], [IND0003, ACC0540,53]] | 3 | 1084 | 361.3333333333333 | * ... =
** Обратите внимание, что все значения должны быть заменены пустыми скобками. У меня возникают проблемы при замене нулевых значений. Может ли кто-нибудь помочь мне с этим?
Мой код:
case class CustomerAccountOutput(
customerId: String,
forename: String,
surname: String,
//Accounts for this customer
accounts: Seq[AccountData],
//Statistics of the accounts
numberAccounts: Int,
totalBalance: Long,
averageBalance: Double
)
//Create Datasets of sources
val customerDS = customerDF.as[CustomerData]
val accountDS = accountDF.withColumn("balance",'balance.cast("long")).as[AccountData]
// customerDS.show
// accountDS.show
//END GIVEN CODE
// Lets summarize the address dataset first
val acctstep1 = accountDS.groupByKey(_.customerId).mapGroups{(customer,account)=>
val acctstep1=if(account==null) List() else account.toList
val length: Double=acctstep1.map(_.balance).length
val sumBal: Double=acctstep1.map(_.balance).sum
val avgBal: Double=sumBal/length
(customer,acctstep1,length,sumBal,avgBal)
}.toDF("customerId","accounts","numberAccounts","totalBalance","averageBalance")
//Once the map values are computed lets use it join the two groups
//Cast function recommended by the mentor
val mergedDS =customerDS.join(acctstep1,Seq("customerId"),"outer")
.withColumn("numberAccounts", 'numberAccounts.cast("Int"))
.withColumn("totalBalance",'totalBalance.cast("Long"))
// mergedDS.show()
// Taking care of the NULL values
// Lets remove all the accounts with missing values for customers
val customerAccountOutputDS = mergedDS.as[CustomerAccountOutput].na.fill(0).show(false)
print(customerAccountOutputDS)
}