Мой текущий лог c должен принимать имя пользователя и имя домена .
L oop на каждом контроллере домена в этом домене и получить запись пользователя.
Рассмотрим последнюю запись из всех них на основе отметки времени whenChanged, которую мы всегда фиксируем, и извлекаем из нее группы пользователей, т.е. memberOf .
Для каждой группы пользователей ,
L oop для каждого контроллера домена в домене и получение записи группы.
Рассмотрим последняя запись из всех них и выборка подгрупп группы, т.е. поле memberOf . (сделать это рекурсивно)
Отправить окончательный список обратно пользователю.
Примечание:
Есть также случаи дублирования , и могут быть редкие шансы бесконечность l oop также, скажем, где группы A присутствуют в B, а B присутствует в A.
Следовательно, если группа ищется один раз, ее не нужно искать снова.
Мой конфиг такой:
{
"domain_controllers":
["abc.domainname.companyname.com",
"pqr.domainname.companyname.com"],
"name": "domainname",
"user": "*",
"password": "*",
"base_dn": "DC=domainname,DC=companyname,DC=com"
}
Мой текущий код такой, как показано ниже. Но это очень много времени.
def get_latest_entry(self, domain_name, data_type, ldap_query):
domain = self.config.get_domain(domain_name)
ldap_attrs = ["memberOf", "whenChanged"]
ldap_entries_for_all_dcs = []
for hostname in domain.domain_controllers:
with "<<ldap_connection>>":
try:
ldap_connection.search(serahc_base=domain.base_dn, search_filter=ldap_query, attributes=ldap_attrs)
except ldap3.core.exceptions.LDAPInvalidFilterError:
return None
if not ldap_connection.entries:
continue
json_entry = <<self.converttojson>>(ldap_connection.entries[0]))
if json_entry["memberOf"]:
json_entry["memberOf"] = [str(memberOf.encode('utf-8').decode('latin-1')) for memberOf in json_entry["memberOf"]]
ldap_entries_for_all_dcs.append(json_entry)
return max(ldap_entries_for_all_dcs,
key=lambda ldap_entry: ldap_entry['whenChanged'])["memberOf"] if ldap_entries_for_all_dcs else None
def get_user_groups(self, sam_account_name, domain_name):
return self.get_latest_entry(domain_name=domain_name, data_type="user", ldap_query="(&(objectCategory=Person)(objectClass=user)(sAMAccountName={}))".format(sam_account_name))
def get_sub_groups(self, distinguished_name, domain_name):
return self.get_latest_entry(domain_name=domain_name, data_type="group", ldap_query="(&(objectCategory=Group)(objectClass=Group)(distinguishedName={}))".format( distinguished_name))
def get_recursive_groups(self, sam_account_name, domain_name):
def nestedgrouplookup(distinguished_name, domain_name):
if distinguished_name not in recursive_groups_list:
recursive_groups_list.append(distinguished_name)
nested_list = self.get_sub_groups(distinguished_name, domain_name)
if nested_list:
for memberOf in nested_list:
nestedgrouplookup(memberOf, domain_name)
recursive_groups_list = []
user_groups_list = self.get_user_groups(sam_account_name, domain_name)
if not user_groups_list:
return []
for distinguished_name in user_groups_list:
nestedgrouplookup(distinguished_name, domain_name)
return recursive_groups_list