В настоящее время у меня есть код scala, который выполняет итерацию по всем записям и находит из него композитный ключ в отдельном Dataframe / Dataset.
case class CompositePrimaryKey(asin: String, marketplace_id: String)
def getAllPrimaryKeys(tableName: String, partitionKey: String, sortKey: String): Dataset[CompositePrimaryKey] = {
new DynamoDBReader(tableName).read()
.map {
case (text, ddbWritable) => {
val item = ddbWritable.getItem
(item.get(partitionKey).getS, item.get(sortKey).getS)
}
}
.toDF(partitionKey, sortKey)
.as[CompositePrimaryKey]
}
Теперь у меня есть вариант использования, чтобы получить все записи полей . Я могу сделать это вручную, явно указав каждое поле по порядку, но список полей очень большой, и я хочу, чтобы он оставался в удобочитаемой форме. Итак, я ищу способ динамически перебирать каждую запись, извлекать каждое поле и преобразовывать их в DataFrame / DataSet.
def getAllKeys(tableName: String, keyFields: List): Dataset[AllKeyClass] = {
new DynamoDBReader(tableName).read()
.map {
case (text, ddbWritable) => {
val item = ddbWritable.getItem
// TODO: How to iterate through each field of keyFields here for each item and make tuple that I can convert to DF
}
}
.toDF() // TODO: I'll need to provide all keyFields names here also for header naming purposes i.e. .toDF('id','name','city', 'country', etc...)
.as[AllKeyClass]
}
Я борюсь с вложенным .map и как его лучше всего использовать. Любая помощь приветствуется.