AWS Клей Dynami c Фильтрация - отфильтруйте один динамический c кадр, используя другой динамический c кадр - PullRequest
0 голосов
/ 02 мая 2020

Я пытаюсь отфильтровать динамическую c фильтрацию на основе данных, находящихся в другом динамическом c фрейме, я работаю над примером реляционного соединения , в этом коде персонажа и динамического членства c кадры объединяются по идентификатору, но я хотел бы отфильтровать лиц на основе идентификатора, присутствующего в членстве DF, ниже приведен код, в который я помещаю stati c значения

    import sys
from awsglue.transforms import Join
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())

# catalog: database and table names
db_name = "legislators"
tbl_persons = "persons_json"
tbl_membership = "memberships_json"
tbl_organization = "organizations_json"



# Create dynamic frames from the source tables 
persons = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_persons)
memberships = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_membership)
orgs = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_organization)

persons = persons.drop_fields(['links', 'identifiers','other_names', 'images','contact_details'])


# Keep the fields we need and rename some.
orgs = orgs.drop_fields(['other_names', 'identifiers','links'])


fileredPersons = Filter.apply(frame = persons,
                              f = lambda x: x["id"] in ["0005af3a-9471-4d1f-9299-737fff4b9b46", "00aca284-9323-4953-bb7a-1bf6f5eefe95"])
print "Filtered record count:  ", fileredPersons.count()

ниже приведен лог фильтра c

 fileredPersons = Filter.apply(frame = persons,
                                  f = lambda x: x["id"] in ["0005af3a-9471-4d1f-9299-737fff4b9b46", "00aca284-9323-4953-bb7a-1bf6f5eefe95"])

Я хотел бы передать столбец person_id, присутствующий в членстве DF, в условие функции фильтра, в основном фильтрует лиц, имеющих членство, любая помощь будет принята.

1 Ответ

1 голос
/ 02 мая 2020

Вы можете просто выполнить внутреннее объединение вместо фильтрации, как

persons_filtered = persons.alias('persons').join(memberships, persons.id==memberships.id).select('persons.*')

Это даст вам только отфильтрованные значения. Если ваше членство в df невелико или является своего рода поиском, вы можете даже транслировать его для более быстрых результатов

from pyspark.sql.functions import broadcast
persons_filtered = persons.alias('persons').join(broadcast(memberships), persons.id==memberships.id).select('persons.*')

Надеюсь, это поможет.

...