代码拉取完成,页面将自动刷新
同步操作将从 FX/sync_ding_to_ldap 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
# -*- coding: utf-8 -*-
from ldap3 import Server, Connection, ALL, MODIFY_REPLACE, MODIFY_DELETE, HASHED_SALTED_SHA
from ldap3.utils.hashed import hashed
from tools import convert_chinese_to_pinyin, name_add_number, get_random_password
class LdapApi(object):
def __init__(self, host, user, password, baseDn, use_ssl):
"""
:param host: ad的host
:param user: ad的dn or user
:param password: ad的密码
:param baseDn: 用作search base
"""
self.host = host
self.user = user
self.password = password
self.baseDn = baseDn
self.use_ssl = use_ssl
self.c = self.bind()
def bind(self):
"""
:return: 创建并返回一个LDAP连接
"""
print(self.host, self.use_ssl, self.user, self.password)
s = Server(self.host, get_info=ALL, use_ssl=self.use_ssl)
c = Connection(s, user=self.user, password=self.password, auto_bind=True)
return c
def search(self, search_filter='(objectClass=organizationalUnit)', attributes=None):
"""
:return: 搜索LDAP,返回一个dn列表,例如:['ou=测试组,dc=my-company,dc=com', ...]
"""
query_list = []
self.c.search(
search_base=self.baseDn,
search_filter=search_filter,
attributes=attributes,
)
# print(self.c.entries) # 打印搜索结果
# print(self.c.response)
for entry in self.c.response:
try:
query_list.append(entry['dn'])
except Exception as e:
print("There are no dn in entry: " + str(entry) + 'Exception: ' + str(e))
return query_list
def list_ou(self, search_filter='(objectClass=organizationalUnit)', attributes=None):
"""
:return: 搜索LDAP,返回一个ou的dn列表
"""
return self.search(search_filter=search_filter, attributes=attributes)
def list_user(self, search_filter='(objectClass=inetOrgPerson)', attributes=None):
"""
:return: 搜索LDAP,返回一个user的dn列表
"""
query_list = []
self.c.search(
search_base=self.baseDn,
search_filter=search_filter,
attributes=attributes,
)
for entry in self.c.response:
try:
query_list.append(entry)
except Exception as e:
print("There are no attr in entry: " + str(entry) + 'Except: ' + str(e))
return query_list
def search_user_uid(self, uid):
"""
:param uid: inetOrgPerson类型用户的 uid,例如:zhangsan
:return: True/False
"""
query_list = []
filter = '(&(uid=%s)(objectClass=inetOrgPerson))' % uid
self.c.search(
search_base=self.baseDn,
search_filter=filter,
attributes=['uid'],
)
for entry in self.c.response:
try:
query_list.append(entry['attributes'])
except Exception as e:
print("There are no attr in entry: " + str(entry) + 'Except: ' + str(e))
if len(query_list) == 0:
print("You can use uid: %s to create user" % uid)
return False
else:
print("Ldap user uid: %s already exists, can not crate user!!!" % uid)
return True
def modify_dn(self, dn, changes):
"""
修改一个LDAP条目
:param dn: 'uid=fuxiao1,ou=测试组,ou=People,dc=my-company,dc=com'
:param changes: {'mail': [(MODIFY_REPLACE, ['abc@123.com'])]}
:return:
"""
self.c.modify(dn, changes)
return self.c.result
def mv_dn(self, dn, relative_dn, new_superior):
"""
移动一个dn
:param dn: like 'uid=fuxiao1,ou=组2,ou=People,dc=my-company,dc=com'
:param relative_dn: 'uid=fuxiao1'
:param new_superior: 'ou=新组织,ou=People,dc=my-company,dc=com'
:return:
"""
self.c.modify_dn(dn, relative_dn, new_superior=new_superior)
return self.c.result
def create_ou(self, ou):
"""
:param ou: dn中的ou,例如 'ou=研发部,ou=People,dc=my-company,dc=com'
:return:
"""
# add_dn = ou + ',' + self.baseDn
res = self.c.add(ou, 'organizationalUnit')
print("Add {} result: {}".format(ou, res))
def get_all_user_dict(self):
user_dict = {}
for user in self.list_user(attributes=['sn', 'uid']):
# 注意,这里使用ldap用户的sn属性和钉钉的userid对应
sn = user['attributes']['sn'][0]
uid = user['attributes']['uid'][0]
user['uid']= uid
if sn:
user_dict[str(sn)] = user
return user_dict
def create_user(self, name, ou, ding_userid, mobile, email, title, photo):
"""
:param name: '张三丰'
:param ou: 'ou=研发部,ou=People,dc=my-company,dc=com' 注意没有uid
:param ding_userid: 'xxx'
:param mobile: '123456'
:param email: '11@ww.com'
:param title: '总经理'
:param photo: 'https://static-legacy.dingtalk.com/media/lQLPM5MFWauWvnbNAfLNAfKwT4V6sLLYMEAEWkmvwcALAA_498_498.png'
:return:
"""
uid = convert_chinese_to_pinyin(name)
password = get_random_password(15)
# print(uid)
while self.search_user_uid(uid): # 如果ldap中有重名的uid,就将名字加上数字,直到不重名
uid = name_add_number(uid)
USER_DN = 'uid={},{}'.format(uid, ou)
objectClass = ['top', 'person', 'inetOrgPerson', 'organizationalPerson']
attributes = {'uid': uid,
'cn': name,
'sn': ding_userid,
'mobile': mobile,
'mail': email,
'userPassword': password,
'title': title,
'labeledURI': photo
}
# perform the Add operation
self.c.add(USER_DN, objectClass, attributes)
print(self.c.result)
# 增加用户之后,设置默认密码
# DEFAULT_PWD = 'abcd@12345'
# print(USER_DN)
# self.change_password(USER_DN, DEFAULT_PWD)
return
def change_password(self, user_dn, password):
hashed_password = hashed(HASHED_SALTED_SHA, password)
changes = {
'userPassword': [(MODIFY_REPLACE, [hashed_password])]
}
success = self.c.modify(user_dn, changes=changes)
print(self.c.result)
return success
def delete_password(self, user_dn):
changes = {
'userPassword': [(MODIFY_DELETE, [])]
}
success = self.c.modify(user_dn, changes=changes)
print(self.c.result)
return success
@property
def unbind(self):
self.c.unbind()
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。