AD域生産代碼使用說明
-
- 1. 每次需要檢查的常量
- 2. 去隐私後的代碼
- 3. 如何使用&測試案例
1. 每次需要檢查的常量
-
CUSTOM_SAMA = '待填寫内容’
HAND的員工填
,zy的人填HAND
,都是大寫,這個将拼接工号補零到六位形成唯一的使用者賬号;來自HAND工号為123的則賬号為HAND000123,來自zy工号為23345的賬号為Z023345;Z
-
ENABLED_BASE_DN = '待填寫内容’
和AD域中現有架構對應,可用在windows server伺服器用ADSI編輯器檢視;
- zy的合作夥伴的填寫成
OU=合作夥伴,DC=XXX,DC=com
- zy自身的填寫成
OU=上海總部,OU=XX科技,DC=XXX,DC=com
;
新增員工表格裡面的部門字段是正向的,從eip搜到的或從資料庫導出的格式是
,因為設計之初在XX科技.一級子部門.二級子部門.三級子部門
和XX科技
中間加了一層一級子部門
,因而需要在表格中填寫成上海總部
;一級子部門.二級子部門.三級子部門
ENABLED_BASE_DN
字元串和表格中的部門路徑處理後将會拼接在一起,作為該使用者在AD域中的架構:
OU=三級子部門,OU=二級子部門,OU=一級子部門,OU=上海總部,OU=XX科技,DC=XXX,DC=com
- zy的合作夥伴的填寫成
以上是每次需要注意區分的兩種情況,因屬于zy和不屬于zy的人員部門架構設計不同,其自定義字首有些許變化;
其他參數說明
和代碼同路徑下有待新增員工清單表格檔案NEW_PERSON_EXCEL,支援一次性填寫多個員工資訊,用來批量為員工建立賬号;
NEW_PERSON_EXCEL = “new_person_list.xlsx”

2. 去隐私後的代碼
項目所有内容:
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@Author: randolph
@Date: 2020-05-27 14:33:03
@LastEditors: randolph
@LastEditTime: 2020-07-16 09:59:20
@version: 1.0
@Contact: [email protected]
@Descripttion: 用python3+ldap3管理windows server2019的AD域;
'''
import json
import logging.config
import os
import random
import re
import string
from time import sleep
import pandas as pd
import winrm
import yaml
from ldap3 import (ALL, ALL_ATTRIBUTES, MODIFY_REPLACE, NTLM, SASL, SIMPLE,
SUBTREE, SYNC, Connection, Server)
from tqdm import tqdm
# 日志配置
LOG_CONF = 'pro_logging.yaml'
# AD域設定
LDAP_IP = '' # LDAP本地伺服器IP
USER = '' # LDAP本地伺服器IP
PASSWORD = "" # LDAP本地伺服器管理者密碼
CUSTOM_SAMA = 'Z' # 自定義登入名英文字首 每次需要确定
DISABLED_BASE_DN = '' # 離職賬戶所在OU
ENABLED_BASE_DN = "OU=上海總部,DC=XXX,DC=com" # 正式員工賬戶所在OU 每次需要确定
USER_SEARCH_FILTER = '(objectclass=user)' # 隻擷取使用者對象 過濾條件
OU_SEARCH_FILTER = '(objectclass=organizationalUnit)' # 隻擷取OU對象 過濾條件
DISABLED_USER_FLAG = [514, 546, 66050, 66080, 66082] # 禁用賬戶UserAccountControl對應十進制值清單
ENABLED_USER_FLAG = [512, 544, 66048, 262656] # 啟用賬戶UserAccountControl對應十進制值清單
# excel表格
PERSON_EXCEL = "person_list.xlsx" # 全量公司員工資料
NEW_PERSON_EXCEL = "new_person_list.xlsx" # 新增員工表格
PWD_PATH = 'pro_pwd.txt'
# WINRM資訊 無需設定
WINRM_USER = 'Administrator'
WINRM_PWD = PASSWORD
class AD(object):
'''AD域的操作
'''
def __init__(self):
'''初始化加載日志配置
AD域連接配接
AD基礎資訊加載
'''
# 初始化加載日志配置
self.setup_logging(path=LOG_CONF)
SERVER = Server(host=LDAP_IP,
port=636, # 636安全端口
use_ssl=True,
get_info=ALL,
connect_timeout=3) # 連接配接逾時為3秒
try:
self.conn = Connection(
server=SERVER,
user=USER,
password=PASSWORD,
auto_bind=True,
read_only=False, # 禁止修改資料True
receive_timeout=10) # 10秒内沒傳回消息則觸發逾時異常
logging.info("distinguishedName:%s res: %s" % (USER, self.conn.bind()))
except BaseException as e:
logging.error("AD域連接配接失敗,請檢查IP/賬戶/密碼")
finally:
self.conn.closed
def setup_logging(self, path=LOG_CONF, default_level=logging.INFO, env_key="LOG_CFG"):
value = os.getenv(env_key, None)
if value:
path = value
if os.path.exists(path):
with open(path, "r") as f:
config = yaml.safe_load(f)
logging.config.dictConfig(config)
else:
logging.basicConfig(level=default_level)
def get_users(self, attr=ALL_ATTRIBUTES):
'''
@param {type}
@return: total_entries所有使用者
@msg: 擷取所有使用者
'''
entry_list = self.conn.extend.standard.paged_search(
search_base=ENABLED_BASE_DN,
search_filter=USER_SEARCH_FILTER,
search_scope=SUBTREE,
attributes=attr,
paged_size=5,
generator=False) # 關閉生成器,結果為清單
total_entries = 0
for entry in entry_list:
total_entries += 1
logging.info("共查詢到記錄條目: " + str(total_entries))
return entry_list
def get_ous(self, attr=None):
'''
@param {type}
@return: res所有OU
@msg: 擷取所有OU
'''
self.conn.search(search_base=ENABLED_BASE_DN,
search_filter=OU_SEARCH_FILTER,
attributes=attr)
result = self.conn.response_to_json()
res_list = json.loads(result)['entries']
return res_list[::-1]
def handle_excel(self, path):
'''
@param path{string} excel檔案絕對路徑
@return: result: { 'page_flag': True, 'person_list': [[], [], ...] }
@msg: 表格檔案預處理
1.增加行列數判————行數決定AD的查詢是否分頁,列數用以判斷必須列資料完整性與補充列;
2.判斷必須列【工号|姓名|部門】是否存在且是否有空值
3.人員清單的使用sort函數排序key用lambda函數,排序條件(i[2].count('.'), i[2], i[0])為(部門層級、部門名稱、工号)
'''
try:
# 1.開始源檔案格式掃描
df = pd.read_excel(path, encoding='utf-8', error_bad_lines=False) # 讀取源檔案
a, b = df.shape # 表格行列數
cols = df.columns.tolist() # 表格列名清單
is_ex_null = df.isnull().any().tolist() # 列是否存在空值
dic = dict(zip(cols, is_ex_null)) # 存在空值的列
if int("工号" in cols) + int("姓名" in cols) + int("部門" in cols) < 3: # 判斷必須列是否都存在
logging.error("表格缺少必要列【工号|姓名|部門】請選擇正确的源檔案;或者将相應列列名修改為【工号|姓名|部門】")
exit()
elif int(dic["工号"]) + int(dic["姓名"]) + int(dic["部門"]) > 0: # 判斷必須列是否有空值
logging.error("必要列存在空值記錄,請檢查補全後重試:" + '\n' + str(df[df.isnull().values == True]))
else:
df = pd.read_excel(path, encoding='utf-8', error_bad_lines=False, usecols=[i for i in range(0, b)])
use_cols = ["工号", "姓名", "部門"] # 使用的必須列
for c in ["郵件", "電話", "崗位"]: # 擴充列的列名在這裡添加即可
if c in cols:
use_cols.append(c)
df = df[use_cols] # 調整df使用列順序
person_list = df.values.tolist() # df資料框轉list
person_list.sort(key=lambda i: (i[2].count('.'), i[2], i[0]), reverse=False) # 多條件排序
# 2.開始處理清單
for i, row in enumerate(person_list):
job_id, name, depart = row[0:3]
# 将部門列替換成DN
row[2] = 'CN=' + str(name + str(job_id)) + ',' + 'OU=' + ',OU='.join(row[2].split('.')[::-1]) + ',' + ENABLED_BASE_DN
row.append(CUSTOM_SAMA + str(job_id).zfill(6)) # 增加登入名列,對應AD域user的 sAMAccountname 屬性
row.append(name + str(job_id)) # 增加CN列,對應user的 cn 屬性
# 3.開始處理傳回字典
result_dic = dict() # 傳回字典
if a > 1000:
result_dic['page_flag'] = True
else:
result_dic['page_flag'] = False
result_dic['person_list'] = person_list
return result_dic
except Exception as e:
logging.error(e)
return None
def generate_pwd(self, count):
'''
@param count{int} 所需密碼長度
@return: pwd: 生成的随機密碼
@msg: 生成随機密碼,必有數字、大小寫、特殊字元且數目僞均等;
'''
pwd_list = []
a, b = count // 4, count % 4
# 四種類别先均分除數個字元
pwd_list.extend(random.sample(string.digits, a))
pwd_list.extend(random.sample(string.ascii_lowercase, a))
pwd_list.extend(random.sample(string.ascii_uppercase, a))
pwd_list.extend(random.sample('[email protected]#$%^&*()', a))
# 從四種類别中再取餘數個字元
pwd_list.extend(random.sample(string.digits + string.ascii_lowercase + string.ascii_uppercase + '[email protected]#$%^&*()', b))
random.shuffle(pwd_list)
pwd_str = ''.join(pwd_list)
return pwd_str
def write2txt(self, path, content):
'''
@param path{string} 寫入檔案路徑;content{string} 每行寫入内容
@return:
@msg: 每行寫入檔案
'''
try:
if os.path.exists(path):
with open(path, mode='a', encoding='utf-8') as file:
file.write(content + '\n')
else:
with open(path, mode='a', encoding='utf-8') as file:
file.write(content + '\n')
return True
except Exception as e:
logging.error(e)
return False
def del_ou_right(self, flag):
'''
@param cmd_l{list} 待執行的powershell指令清單
@return: True/False
@msg: 連接配接遠端windows并批量執行powershell指令
'''
# powershell指令 用于啟用/關閉OU 防止對象被意外删除 屬性
# 防止對象被意外删除×
enable_del = ["Import-Module ActiveDirectory",
"Get-ADOrganizationalUnit -filter * -Properties ProtectedFromAccidentalDeletion | where {"
"$_.ProtectedFromAccidentalDeletion -eq $true} |Set-ADOrganizationalUnit "
"-ProtectedFromAccidentalDeletion $false"]
# 防止對象被意外删除√
disable_del = ["Import-Module ActiveDirectory",
"Get-ADOrganizationalUnit -filter * -Properties ProtectedFromAccidentalDeletion | where {"
"$_.ProtectedFromAccidentalDeletion -eq $false} |Set-ADOrganizationalUnit "
"-ProtectedFromAccidentalDeletion $true"]
flag_map = {0: enable_del, 1: disable_del}
try:
win = winrm.Session('http://' + LDAP_IP + ':5985/wsman', auth=(WINRM_USER, WINRM_PWD))
for cmd in flag_map[flag]:
ret = win.run_ps(cmd)
if ret.status_code == 0: # 調用成功 減少日志寫入
# if flag == 0:
# logging.info("防止對象被意外删除×")
# elif flag == 1:
# logging.info("防止對象被意外删除√")
return True
else:
return False
except Exception as e:
logging.error(e)
return False
def create_obj(self, dn=None, type='user', info=None):
'''
@param dn{string}, type{string}'user'/'ou'
@return: res建立結果, self.conn.result修改結果
@msg:新增對象
'''
object_class = {'user': ['user', 'posixGroup', 'top'],
'ou': ['organizationalUnit', 'posixGroup', 'top'],
}
if info is not None:
[job_id, name, dn, email, tel, title, sam, cn] = info
user_attr = {'sAMAccountname': sam, # 登入名
'userAccountControl': 544, # 啟用賬戶
'title': title, # 頭銜
'givenName': name[0:1], # 姓
'sn': name[1:], # 名
'displayname': name, # 姓名
'mail': email, # 郵箱
'telephoneNumber': tel, # 電話号
}
else:
user_attr = None
# 建立之前需要對dn中的OU部分進行判斷,如果沒有需要建立
dn_base = dn.split(',', 1)[1]
check_ou_res = self.check_ou(dn_base)
if not check_ou_res:
logging.error('check_ou失敗,未知原因!')
return False
else:
self.conn.add(dn=dn, object_class=object_class[type], attributes=user_attr)
add_result = self.conn.result
if add_result['result'] == 0:
logging.info('新增對象【' + dn + '】成功!')
if type == 'user': # 若是新增使用者對象,則需要一些初始化操作
self.conn.modify(dn, {'userAccountControl': [('MODIFY_REPLACE', 512)]}) # 激活使用者 # 如果是使用者時
new_pwd = self.generate_pwd(8)
old_pwd = ''
self.conn.extend.microsoft.modify_password(dn, new_pwd, old_pwd) # 初始化密碼
info = 'DN: ' + dn + ' PWD: ' + new_pwd
save_res = self.write2txt(PWD_PATH, info) # 将賬戶密碼寫入檔案中
if save_res:
logging.info('儲存初始化賬号密碼成功!')
else:
logging.error('儲存初始化賬号密碼失敗: ' + info)
# 密碼設定為下次登入需要修改密碼
# self.conn.modify(dn, {'pwdLastSet': (2, [0])}) # 設定第一次登入必須修改密碼
elif add_result['result'] == 68:
logging.error('entryAlreadyExists 使用者已經存在')
elif add_result['result'] == 32:
logging.error('noSuchObject 對象不存在ou錯誤')
else:
logging.error('新增對象: ' + dn + ' 失敗!其他未知錯誤')
return add_result
def del_obj(self, dn, type):
'''
@param dn{string}
@return: res修改結果
@msg: 删除對象
'''
if type == 'ou':
self.del_ou_right(flag=0)
res = self.conn.delete(dn=dn)
self.del_ou_right(flag=1)
else:
res = self.conn.delete(dn=dn)
if res == True:
logging.info('删除對象' + dn + '成功!')
return res
else:
return False
def update_obj(self, old_dn, info=None):
'''
@param {type}
@return:
@msg: 更新對象
'''
if info is not None:
[job_id, name, dn, email, tel, title, sam, cn] = info
# 組成更新屬性之前需要對dn中的OU部分進行判斷,如果沒有需要建立
dn_base = dn.split(',', 1)[1]
check_ou_res = self.check_ou(dn_base)
if not check_ou_res:
logging.error('check_ou失敗,未知原因!')
return False
else:
attr = {'distinguishedName': dn, # dn
'sAMAccountname': sam, # 登入名
'title': title, # 頭銜
'givenName': name[0:1], # 姓
'sn': name[1:], # 名
'displayname': name, # 姓名
'mail': email, # 郵箱
'telephoneNumber': tel, # 電話号
}
else:
attr = None
changes_dic = {}
for k, v in attr.items():
if not self.conn.compare(dn=old_dn, attribute=k, value=v): # 待修改屬性
if k == "distinguishedName": # 若屬性有distinguishedName則需要移動user或ou
# 若dn修改了需要将密碼檔案這個人的dn資訊更新下
self.update_pwd_file_line(old_dn=old_dn, new_dn=dn)
self.move_obj(dn=old_dn, new_dn=v)
changes_dic.update({k: [(MODIFY_REPLACE, [v])]})
if len(changes_dic) != 0: # 有修改的屬性時
modify_res = self.conn.modify(dn=dn, changes=changes_dic)
logging.info('更新對象: ' + dn + ' 更新内容: ' + str(changes_dic))
return self.conn.result
def rename_obj(self, dn, newname):
'''
@param newname{type}新的名字,User格式:"cn=新名字";OU格式:"OU=新名字"
@return: 修改結果
@msg: 重命名對象
'''
res = self.conn.modify_dn(dn, newname)
if res == True:
return True
else:
return False
def move_obj(self, dn, new_dn):
'''
@param {type}
@return:
@msg: 移動對象到新OU
'''
relative_dn, superou = new_dn.split(",", 1)
res = self.conn.modify_dn(dn=dn, relative_dn=relative_dn, new_superior=superou)
if res == True:
return True
else:
return False
def compare_attr(self, dn, attr, value):
'''
@param {type}
@return:
@msg:比較員工指定的某個屬性
'''
res = self.conn.compare(dn=dn, attribute=attr, value=value)
return res
def check_ou(self, ou, ou_list=None):
'''
@param {type}
@return:
@msg: 遞歸函數
如何判斷OU是修改了名字而不是建立的:當一個OU裡面沒有人就判斷此OU被修改了名字,删除此OU;
不管是建立還是修改了名字,都會将人員轉移到新的OU下面:需要建立OU則建立OU後再添加/轉移人員
check_ou的作用是為人員的變動準備好OU
'''
if ou_list is None:
ou_list = []
self.conn.search(ou, OU_SEARCH_FILTER) # 判斷OU存在性
while self.conn.result['result'] == 0:
if ou_list:
for ou in ou_list[::-1]:
self.conn.add(ou, 'organizationalUnit')
return True
else:
ou_list.append(ou)
ou = ",".join(ou.split(",")[1:])
self.check_ou(ou, ou_list) # 遞歸判斷
return True
def scan_ou(self):
'''掃描的時候,必須保證此OU為葉子節點,否則報notAllowedOnNonLeaf錯誤,
例如此次空OU——OU=開發部,OU=核心技術部,OU=RAN,OU=上海總部,DC=randolph,DC=com
的倒數第一、二層都是空OU,但是必須得先删除倒數第一層
是以在擷取所有OU清單的位置get_ous就将獲得的結果倒叙(用切片[::-1])
'''
res = self.get_ous(attr=['distinguishedName'])
# 調用ps腳本,防止對象被意外删除×
modify_right_res = self.del_ou_right(flag=0)
for i, ou in enumerate(res):
dn = ou['attributes']['distinguishedName']
# 判斷dd下面是否有使用者,沒有使用者的直接删除
self.conn.search(search_base=dn, search_filter=USER_SEARCH_FILTER)
if not self.conn.entries: # 沒有使用者存在的空OU,可以進行清理
try:
delete_res = self.conn.delete(dn=dn)
if delete_res:
logging.info('删除空的OU: ' + dn + ' 成功!')
else:
logging.error('删除操作處理結果' + str(self.conn.result))
except Exception as e:
logging.error(e)
else:
logging.info("沒有空OU,OU掃描完成!")
# 防止對象被意外删除√
self.del_ou_right(flag=1)
def disable_users(self, path):
'''
@param {type}
@return:
@msg: 将AD域内的使用者不在csv表格中的定義為離職員工
'''
result = ad.handle_excel(path)
newest_list = [] # 全量員工清單
for person in result['person_list']:
job_id, name, dn, email, tel, title, sam, cn = person[0:8]
dd = str(dn).split(',', 1)[1]
newest_list.append(name)
# 查詢AD域現有員工
res = self.get_users(attr=['distinguishedName', 'name', 'cn', 'displayName', 'userAccountControl'])
for i, ou in enumerate(res):
ad_user_distinguishedName, ad_user_displayName, ad_user_cn, ad_user_userAccountControl = ou['attributes'][
'distinguishedName'], ou['attributes']['displayName'], ou['attributes']['cn'], ou['attributes']['userAccountControl']
rela_dn = "cn=" + str(ad_user_cn)
# 判斷使用者不在最新的員工表格中 或者 AD域中某使用者為禁用使用者
if ad_user_displayName not in newest_list or ad_user_userAccountControl in DISABLED_USER_FLAG:
try:
# 禁用使用者
self.conn.modify(dn=ad_user_distinguishedName, changes={'userAccountControl': (2, [546])})
logging.info("在AD域中發現不在表格中使用者,禁用使用者:" + ad_user_distinguishedName)
# 移動到離職組 判斷OU存在性
self.conn.search(DISABLED_BASE_DN, OU_SEARCH_FILTER) # 判斷OU存在性
if self.conn.entries == []: # 搜不到離職員工OU則需要建立此OU
self.create_obj(dn=DISABLED_BASE_DN, type='ou')
# 移動到離職組
self.conn.modify_dn(dn=ad_user_distinguishedName, relative_dn=rela_dn, new_superior=DISABLED_BASE_DN)
logging.info('将禁用使用者【' + ad_user_distinguishedName + '】轉移到【' + DISABLED_BASE_DN + '】')
except Exception as e:
logging.error(e)
def create_user_by_excel(self, path):
'''
@param path{string} 用于新增使用者的表格
@return:
@msg:
'''
res_dic = self.handle_excel(path)
for person in res_dic['person_list']:
user_info = person
# print(user_info)
self.create_obj(info=user_info)
def ad_update(self, path):
'''AD域的初始化/更新——從表格檔案中繼資料更新AD域:
判斷使用者是否在AD域中——不在則新增;
在則判斷該使用者各屬性是否與表格中相同,有不同則修改;
完全相同的使用者不用作處理;
'''
# 準備表格檔案
result = ad.handle_excel(path)
ori_data = result['person_list']
try:
self.del_ou_right(flag=0) # 防止對象被意外删除×
with tqdm(iterable=ori_data, ncols=100, total=len(ori_data), desc='處理進度', unit='人') as tqdm_ori_data: # 封裝進度條
for person in tqdm_ori_data:
dn, cn = person[2], person[7]
user_info = person
dd = str(dn).split(',', 1)[1]
# 根據cn判斷使用者是否已經存在
filter_phrase_by_cn = "(&(objectclass=person)(cn=" + cn + "))"
search_by_cn = self.conn.search(search_base=ENABLED_BASE_DN, search_filter=filter_phrase_by_cn, attributes=['distinguishedName'])
search_by_cn_json_list = json.loads(self.conn.response_to_json())['entries']
search_by_cn_res = self.conn.result
if search_by_cn == False: # 根據cn搜尋失敗,查無此人則新增
self.create_obj(info=user_info)
else:
old_dn = search_by_cn_json_list[0]['dn'] # 部門改變的使用者的現有部門,從表格拼接出來的是新的dn在user_info中帶過去修改
self.update_obj(old_dn=old_dn, info=user_info)
# break # 可測試一個例子
self.del_ou_right(flag=1) # 防止對象被意外删除√
except KeyboardInterrupt:
tqdm_ori_data.close()
raise
tqdm_ori_data.close()
def handle_pwd_expire(self, attr=None):
'''
@param {type}
@return:
@msg: 處理密碼過期 設定密碼不過期 需要補全理論和測試
參考理論位址:
https://stackoverflow.com/questions/18615958/ldap-pwdlastset-unable-to-change-without-error-showing
'''
attr = ['pwdLastSet']
self.conn.search(search_base=ENABLED_BASE_DN,
search_filter=USER_SEARCH_FILTER,
attributes=attr)
result = self.conn.response_to_json()
res_list = json.loads(result)['entries']
for l in res_list:
pwdLastSet, dn = l['attributes']['pwdLastSet'], l['dn']
modify_res = self.conn.modify(dn, {'pwdLastSet': (2, [-1])}) # pwdLastSet隻能給-1 或 0
if modify_res:
logging.info('密碼不過期-修改使用者: ' + dn)
def update_pwd_file_line(self, old_dn=None, new_dn=None, new_pwd=None):
'''
@param dn{string}
@return: 修改結果
@msg: 當使用者的dn或密碼被程式更新,将會在這裡更新對應部分的資訊
采用臨時檔案替換源檔案的方式,節省記憶體,但占硬碟
參考文章: https://www.cnblogs.com/wuzhengzheng/p/9692368.html
'''
with open(PWD_PATH, mode='rt', encoding='utf-8') as file, \
open('TEMP.txt', mode='wt', encoding='utf-8') as temp_file:
for line in file:
if old_dn and new_dn: # dn被修改
if old_dn in line:
line = line.replace(old_dn, new_dn)
temp_file.write(line)
else:
temp_file.write(line)
elif new_pwd and old_dn: # 密碼被修改
if old_dn in line:
# 需要正則比對舊的密碼
pattern = "PWD: (.+?)\\n" # 惰性比對
local = re.findall(pattern, line)
old_pwd = local[0]
line = line.replace(old_pwd, new_pwd)
temp_file.write(line)
else:
temp_file.write(line)
os.remove(PWD_PATH)
os.rename('TEMP.txt', PWD_PATH)
def modify_pwd(self, cn):
'''
@param cn{string} 姓名工号 戴東1325
@return: 修改結果
@msg: 修改密碼
'''
# 根據cn判斷使用者是否已經存在
filter_phrase_by_cn = "(&(objectclass=person)(cn=" + cn + "))"
search_by_cn = self.conn.search(search_base=ENABLED_BASE_DN, search_filter=filter_phrase_by_cn, attributes=['distinguishedName'])
search_by_cn_json_list = json.loads(self.conn.response_to_json())['entries']
if search_by_cn:
new_pwd = self.generate_pwd(8)
old_pwd = ''
dn = search_by_cn_json_list[0]['dn']
modify_password_res = self.conn.extend.microsoft.modify_password(dn, new_pwd, old_pwd)
if modify_password_res:
logging.info('更新了對象: ' + dn + ' 的密碼')
is_exist = os.path.exists(PWD_PATH)
if not is_exist: # 校驗密碼檔案存在性
info = 'DN: ' + dn + ' PWD: ' + new_pwd
save_res = self.write2txt(PWD_PATH, info) # 将賬戶密碼寫入檔案中
if save_res:
logging.info('儲存初始化賬号密碼成功!')
else:
logging.error('儲存初始化賬号密碼失敗: ' + info)
else:
# 若密碼修改了需要将密碼檔案這個人的密碼資訊更新下
with open(PWD_PATH, mode='rt', encoding='utf-8') as file:
if dn in file.read():
is_exist_pwd_record = True
else:
is_exist_pwd_record = False
if is_exist_pwd_record: # 若發現此人資訊在密碼檔案裡則更新,否則需建立
self.update_pwd_file_line(old_dn=dn, new_pwd=new_pwd)
else:
info = 'DN: ' + dn + ' PWD: ' + new_pwd # 因為是修改密碼,是以dn未修改
self.write2txt(PWD_PATH, info)
else:
logging.error('更新對象密碼失敗!: ' + dn)
else:
logging.error('查無此人!請檢查待修改密碼對象格式是否為【姓名工号】')
if __name__ == "__main__":
# 建立AD域執行個體
ad = AD()
# res = ad.get_users()
# for user in res:
# if '24842' in user['attributes']['cn']:
# print(user)
# 修改密碼隻需要給出 姓名工号 組合的cn 通過√
# ad.modify_pwd("測試23345")
# 同步更新pwd檔案 通過√
# ad.update_pwd_file_line(old_dn='CN=戴東1325,OU=董事會,OU=RAN,OU=上海總部,DC=randolph,DC=com',
# new_dn='CN=戴東1325,OU=RAN,OU=上海總部,DC=randolph,DC=com')
# 更新AD域 通過√
# ad.ad_update(PERSON_EXCEL)
# 使用excel新增使用者 通過√
ad.create_user_by_excel(NEW_PERSON_EXCEL)
# 處理密碼過期
# res_list = ad.handle_pwd_expire()
# ad.get_ous()
# 處理源資料 通過√
# result = ad.handle_excel(PERSON_EXCEL)
# print(result)
# 添加OU 通過√
# ad.create_obj(dn='OU=TEST,DC=randolph,DC=com', type='ou')
# 分頁查詢全部user 通過√
# res = ad.get_users()
# print(res)
# 執行powershell指令 通過√
# ad.del_ou_right(flag=0)
# 空OU的掃描與删除 通過√
# ad.scan_ou()
# 離職員工邏輯 通過√ 【M】将禁用員工的處理內建
# ad.disable_users(PERSON_EXCEL)
3. 如何使用&測試案例
-
如何使用
通常,python代碼寫的測試案例可以在
寫,if __name__ == "__main__":
一行是初始化AD類,ad = AD()
,跑一下代碼,然後打開pro_info.txt看一下日志是否有連接配接AD成功的資訊:推薦每次執行CUDA操作前先隻解開這一句的注釋
有如上資訊,則可以打開
ad.create_user_by_excel(NEW_PERSON_EXCEL)
這句的注去運作。
- 測試案例——新增使用者
步驟一 該使用者是zy的,eip查詢到的架構是
XX科技.一級部門.二級部門.三級部門
,表格裡面如下填寫:
步驟二 檢查兩個常量,并測試AD域連通性
CUSTOM_SAMA = 'Z'
ENABLED_BASE_DN = "OU=上海總部,OU=XX科技,DC=XXXX,DC=com"
檢視日志資訊檔案看到
2020-07-20 17:41:42,332 INFO proAD.py 73 distinguishedName:CN=Administrator,CN=Users,DC=****,DC=com res: True
連接配接成功,可以解開建立使用者語句并執行了;
步驟三 執行新增使用者,檢視日志檔案和賬号密碼檔案
pro_info.txt
2020-07-20 17:44:19,501 INFO proAD.py 73 distinguishedName:CN=Administrator,CN=Users,DC=****,DC=com res: True
2020-07-20 17:44:19,658 INFO proAD.py 276 新增對象【CN=測試23345,OU=三級部門,OU=二級部門,OU=一級部門,OU=上海總部,OU=**科技,DC=****,DC=com】成功!
2020-07-20 17:44:19,707 INFO proAD.py 285 儲存初始化賬号密碼成功!
pro_pwd.txt
SAM: Z023345 PWD: 9P*Lq^v7 DN: CN=測試23345,OU=三級部門,OU=二級部門,OU=一級部門,OU=上海總部,OU=****,DC=****,DC=com
賬号是Z023345,密碼是9P*Lq^v7,如果想讓初始化的密碼立即能用,将代碼中的這一句注釋掉
# 密碼設定為下次登入需要修改密碼
# self.conn.modify(dn, {'pwdLastSet': (2, [0])})
結果
AD域中已經可以看到該使用者