天天看點

AD域生産代碼使用說明

AD域生産代碼使用說明

    • 1. 每次需要檢查的常量
    • 2. 去隐私後的代碼
    • 3. 如何使用&測試案例

1. 每次需要檢查的常量

  1. CUSTOM_SAMA = '待填寫内容’

    HAND的員工填

    HAND

    ,zy的人填

    Z

    ,都是大寫,這個将拼接工号補零到六位形成唯一的使用者賬号;來自HAND工号為123的則賬号為HAND000123,來自zy工号為23345的賬号為Z023345;
  2. ENABLED_BASE_DN = '待填寫内容’

    和AD域中現有架構對應,可用在windows server伺服器用ADSI編輯器檢視;

    • zy的合作夥伴的填寫成

      OU=合作夥伴,DC=XXX,DC=com

    • zy自身的填寫成

      OU=上海總部,OU=XX科技,DC=XXX,DC=com

      新增員工表格裡面的部門字段是正向的,從eip搜到的或從資料庫導出的格式是

      XX科技.一級子部門.二級子部門.三級子部門

      ,因為設計之初在

      XX科技

      一級子部門

      中間加了一層

      上海總部

      ,因而需要在表格中填寫成

      一級子部門.二級子部門.三級子部門

      ENABLED_BASE_DN

      字元串和表格中的部門路徑處理後将會拼接在一起,作為該使用者在AD域中的架構:

      OU=三級子部門,OU=二級子部門,OU=一級子部門,OU=上海總部,OU=XX科技,DC=XXX,DC=com

以上是每次需要注意區分的兩種情況,因屬于zy和不屬于zy的人員部門架構設計不同,其自定義字首有些許變化;

其他參數說明

和代碼同路徑下有待新增員工清單表格檔案NEW_PERSON_EXCEL,支援一次性填寫多個員工資訊,用來批量為員工建立賬号;

NEW_PERSON_EXCEL = “new_person_list.xlsx”

AD域生産代碼使用說明

2. 去隐私後的代碼

項目所有内容:

AD域生産代碼使用說明
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@Author: randolph
@Date: 2020-05-27 14:33:03
@LastEditors: randolph
@LastEditTime: 2020-07-16 09:59:20
@version: 1.0
@Contact: [email protected]
@Descripttion: 用python3+ldap3管理windows server2019的AD域;
'''
import json
import logging.config
import os
import random
import re
import string
from time import sleep

import pandas as pd
import winrm
import yaml
from ldap3 import (ALL, ALL_ATTRIBUTES, MODIFY_REPLACE, NTLM, SASL, SIMPLE,
                   SUBTREE, SYNC, Connection, Server)
from tqdm import tqdm

# 日志配置
LOG_CONF = 'pro_logging.yaml'
# AD域設定
LDAP_IP = ''                                   # LDAP本地伺服器IP
USER = ''    # LDAP本地伺服器IP
PASSWORD = ""                              # LDAP本地伺服器管理者密碼
CUSTOM_SAMA = 'Z'                                          # 自定義登入名英文字首 每次需要确定
DISABLED_BASE_DN = ''   # 離職賬戶所在OU
ENABLED_BASE_DN = "OU=上海總部,DC=XXX,DC=com"       # 正式員工賬戶所在OU 每次需要确定
USER_SEARCH_FILTER = '(objectclass=user)'                  # 隻擷取使用者對象 過濾條件
OU_SEARCH_FILTER = '(objectclass=organizationalUnit)'      # 隻擷取OU對象 過濾條件
DISABLED_USER_FLAG = [514, 546, 66050, 66080, 66082]       # 禁用賬戶UserAccountControl對應十進制值清單
ENABLED_USER_FLAG = [512, 544, 66048, 262656]              # 啟用賬戶UserAccountControl對應十進制值清單
# excel表格
PERSON_EXCEL = "person_list.xlsx"                           # 全量公司員工資料
NEW_PERSON_EXCEL = "new_person_list.xlsx"                   # 新增員工表格
PWD_PATH = 'pro_pwd.txt'
# WINRM資訊 無需設定
WINRM_USER = 'Administrator'
WINRM_PWD = PASSWORD


class AD(object):
    '''AD域的操作
    '''

    def __init__(self):
        '''初始化加載日志配置
        AD域連接配接
        AD基礎資訊加載
        '''
        # 初始化加載日志配置
        self.setup_logging(path=LOG_CONF)
        SERVER = Server(host=LDAP_IP,
                        port=636,               # 636安全端口
                        use_ssl=True,
                        get_info=ALL,
                        connect_timeout=3)      # 連接配接逾時為3秒
        try:
            self.conn = Connection(
                server=SERVER,
                user=USER,
                password=PASSWORD,
                auto_bind=True,
                read_only=False,                # 禁止修改資料True
                receive_timeout=10)             # 10秒内沒傳回消息則觸發逾時異常
            logging.info("distinguishedName:%s res: %s" % (USER, self.conn.bind()))
        except BaseException as e:
            logging.error("AD域連接配接失敗,請檢查IP/賬戶/密碼")
        finally:
            self.conn.closed

    def setup_logging(self, path=LOG_CONF, default_level=logging.INFO, env_key="LOG_CFG"):
        value = os.getenv(env_key, None)
        if value:
            path = value
        if os.path.exists(path):
            with open(path, "r") as f:
                config = yaml.safe_load(f)
                logging.config.dictConfig(config)
        else:
            logging.basicConfig(level=default_level)

    def get_users(self, attr=ALL_ATTRIBUTES):
        '''
        @param {type}
        @return: total_entries所有使用者
        @msg: 擷取所有使用者
        '''
        entry_list = self.conn.extend.standard.paged_search(
            search_base=ENABLED_BASE_DN,
            search_filter=USER_SEARCH_FILTER,
            search_scope=SUBTREE,
            attributes=attr,
            paged_size=5,
            generator=False)                                        # 關閉生成器,結果為清單
        total_entries = 0
        for entry in entry_list:
            total_entries += 1
        logging.info("共查詢到記錄條目: " + str(total_entries))
        return entry_list

    def get_ous(self, attr=None):
        '''
        @param {type}
        @return: res所有OU
        @msg: 擷取所有OU
        '''
        self.conn.search(search_base=ENABLED_BASE_DN,
                         search_filter=OU_SEARCH_FILTER,
                         attributes=attr)
        result = self.conn.response_to_json()
        res_list = json.loads(result)['entries']
        return res_list[::-1]

    def handle_excel(self, path):
        '''
        @param path{string} excel檔案絕對路徑
        @return: result: { 'page_flag': True, 'person_list': [[], [], ...] }
        @msg: 表格檔案預處理
        1.增加行列數判————行數決定AD的查詢是否分頁,列數用以判斷必須列資料完整性與補充列;
        2.判斷必須列【工号|姓名|部門】是否存在且是否有空值
        3.人員清單的使用sort函數排序key用lambda函數,排序條件(i[2].count('.'), i[2], i[0])為(部門層級、部門名稱、工号)
        '''
        try:
            # 1.開始源檔案格式掃描
            df = pd.read_excel(path, encoding='utf-8', error_bad_lines=False)           # 讀取源檔案
            a, b = df.shape                                                             # 表格行列數
            cols = df.columns.tolist()                  # 表格列名清單
            is_ex_null = df.isnull().any().tolist()     # 列是否存在空值
            dic = dict(zip(cols, is_ex_null))           # 存在空值的列
            if int("工号" in cols) + int("姓名" in cols) + int("部門" in cols) < 3:     # 判斷必須列是否都存在
                logging.error("表格缺少必要列【工号|姓名|部門】請選擇正确的源檔案;或者将相應列列名修改為【工号|姓名|部門】")
                exit()
            elif int(dic["工号"]) + int(dic["姓名"]) + int(dic["部門"]) > 0:            # 判斷必須列是否有空值
                logging.error("必要列存在空值記錄,請檢查補全後重試:" + '\n' + str(df[df.isnull().values == True]))
            else:
                df = pd.read_excel(path, encoding='utf-8', error_bad_lines=False, usecols=[i for i in range(0, b)])
                use_cols = ["工号", "姓名", "部門"]     # 使用的必須列
                for c in ["郵件", "電話", "崗位"]:      # 擴充列的列名在這裡添加即可
                    if c in cols:
                        use_cols.append(c)
                df = df[use_cols]                       # 調整df使用列順序
                person_list = df.values.tolist()        # df資料框轉list
                person_list.sort(key=lambda i: (i[2].count('.'), i[2], i[0]), reverse=False)        # 多條件排序
                # 2.開始處理清單
                for i, row in enumerate(person_list):
                    job_id, name, depart = row[0:3]
                    # 将部門列替換成DN
                    row[2] = 'CN=' + str(name + str(job_id)) + ',' + 'OU=' + ',OU='.join(row[2].split('.')[::-1]) + ',' + ENABLED_BASE_DN
                    row.append(CUSTOM_SAMA + str(job_id).zfill(6))        # 增加登入名列,對應AD域user的 sAMAccountname 屬性
                    row.append(name + str(job_id))                  # 增加CN列,對應user的 cn 屬性
                # 3.開始處理傳回字典
                result_dic = dict()                         # 傳回字典
                if a > 1000:
                    result_dic['page_flag'] = True
                else:
                    result_dic['page_flag'] = False
                result_dic['person_list'] = person_list
                return result_dic
        except Exception as e:
            logging.error(e)
            return None

    def generate_pwd(self, count):
        '''
        @param count{int} 所需密碼長度
        @return: pwd: 生成的随機密碼
        @msg: 生成随機密碼,必有數字、大小寫、特殊字元且數目僞均等;
        '''
        pwd_list = []
        a, b = count // 4, count % 4
        # 四種類别先均分除數個字元
        pwd_list.extend(random.sample(string.digits, a))
        pwd_list.extend(random.sample(string.ascii_lowercase, a))
        pwd_list.extend(random.sample(string.ascii_uppercase, a))
        pwd_list.extend(random.sample('[email protected]#$%^&*()', a))
        # 從四種類别中再取餘數個字元
        pwd_list.extend(random.sample(string.digits + string.ascii_lowercase + string.ascii_uppercase + '[email protected]#$%^&*()', b))
        random.shuffle(pwd_list)
        pwd_str = ''.join(pwd_list)
        return pwd_str

    def write2txt(self, path, content):
        '''
        @param path{string} 寫入檔案路徑;content{string} 每行寫入内容
        @return:
        @msg: 每行寫入檔案
        '''
        try:
            if os.path.exists(path):
                with open(path, mode='a', encoding='utf-8') as file:
                    file.write(content + '\n')
            else:
                with open(path, mode='a', encoding='utf-8') as file:
                    file.write(content + '\n')
            return True
        except Exception as e:
            logging.error(e)
            return False

    def del_ou_right(self, flag):
        '''
        @param cmd_l{list} 待執行的powershell指令清單
        @return: True/False
        @msg: 連接配接遠端windows并批量執行powershell指令
        '''
        # powershell指令 用于啟用/關閉OU 防止對象被意外删除 屬性
        # 防止對象被意外删除×
        enable_del = ["Import-Module ActiveDirectory",
                      "Get-ADOrganizationalUnit -filter * -Properties ProtectedFromAccidentalDeletion | where {"
                      "$_.ProtectedFromAccidentalDeletion -eq $true} |Set-ADOrganizationalUnit "
                      "-ProtectedFromAccidentalDeletion $false"]
        # 防止對象被意外删除√
        disable_del = ["Import-Module ActiveDirectory",
                       "Get-ADOrganizationalUnit -filter * -Properties ProtectedFromAccidentalDeletion | where {"
                       "$_.ProtectedFromAccidentalDeletion -eq $false} |Set-ADOrganizationalUnit "
                       "-ProtectedFromAccidentalDeletion $true"]
        flag_map = {0: enable_del, 1: disable_del}

        try:
            win = winrm.Session('http://' + LDAP_IP + ':5985/wsman', auth=(WINRM_USER, WINRM_PWD))
            for cmd in flag_map[flag]:
                ret = win.run_ps(cmd)
            if ret.status_code == 0:      # 調用成功 減少日志寫入
                # if flag == 0:
                #     logging.info("防止對象被意外删除×")
                # elif flag == 1:
                #     logging.info("防止對象被意外删除√")
                return True
            else:
                return False
        except Exception as e:
            logging.error(e)
            return False

    def create_obj(self, dn=None, type='user', info=None):
        '''
        @param dn{string}, type{string}'user'/'ou'
        @return: res建立結果, self.conn.result修改結果
        @msg:新增對象
        '''
        object_class = {'user': ['user', 'posixGroup', 'top'],
                        'ou': ['organizationalUnit', 'posixGroup', 'top'],
                        }
        if info is not None:
            [job_id, name, dn, email, tel, title, sam, cn] = info
            user_attr = {'sAMAccountname': sam,      # 登入名
                         'userAccountControl': 544,  # 啟用賬戶
                         'title': title,             # 頭銜
                         'givenName': name[0:1],     # 姓
                         'sn': name[1:],             # 名
                         'displayname': name,        # 姓名
                         'mail': email,              # 郵箱
                         'telephoneNumber': tel,     # 電話号
                         }
        else:
            user_attr = None
        # 建立之前需要對dn中的OU部分進行判斷,如果沒有需要建立
        dn_base = dn.split(',', 1)[1]
        check_ou_res = self.check_ou(dn_base)
        if not check_ou_res:
            logging.error('check_ou失敗,未知原因!')
            return False
        else:
            self.conn.add(dn=dn, object_class=object_class[type], attributes=user_attr)
            add_result = self.conn.result

            if add_result['result'] == 0:
                logging.info('新增對象【' + dn + '】成功!')
                if type == 'user':          # 若是新增使用者對象,則需要一些初始化操作
                    self.conn.modify(dn, {'userAccountControl': [('MODIFY_REPLACE', 512)]})         # 激活使用者                                                               # 如果是使用者時
                    new_pwd = self.generate_pwd(8)
                    old_pwd = ''
                    self.conn.extend.microsoft.modify_password(dn, new_pwd, old_pwd)                # 初始化密碼
                    info = 'DN: ' + dn + ' PWD: ' + new_pwd
                    save_res = self.write2txt(PWD_PATH, info)                                       # 将賬戶密碼寫入檔案中
                    if save_res:
                        logging.info('儲存初始化賬号密碼成功!')
                    else:
                        logging.error('儲存初始化賬号密碼失敗: ' + info)
                    # 密碼設定為下次登入需要修改密碼
                    # self.conn.modify(dn, {'pwdLastSet': (2, [0])})                                  # 設定第一次登入必須修改密碼
            elif add_result['result'] == 68:
                logging.error('entryAlreadyExists 使用者已經存在')
            elif add_result['result'] == 32:
                logging.error('noSuchObject 對象不存在ou錯誤')
            else:
                logging.error('新增對象: ' + dn + ' 失敗!其他未知錯誤')
            return add_result

    def del_obj(self, dn, type):
        '''
        @param dn{string}
        @return: res修改結果
        @msg: 删除對象
        '''
        if type == 'ou':
            self.del_ou_right(flag=0)
            res = self.conn.delete(dn=dn)
            self.del_ou_right(flag=1)
        else:
            res = self.conn.delete(dn=dn)
        if res == True:
            logging.info('删除對象' + dn + '成功!')
            return res
        else:
            return False

    def update_obj(self, old_dn, info=None):
        '''
        @param {type}
        @return:
        @msg: 更新對象
        '''
        if info is not None:
            [job_id, name, dn, email, tel, title, sam, cn] = info
            # 組成更新屬性之前需要對dn中的OU部分進行判斷,如果沒有需要建立
            dn_base = dn.split(',', 1)[1]
            check_ou_res = self.check_ou(dn_base)
            if not check_ou_res:
                logging.error('check_ou失敗,未知原因!')
                return False
            else:
                attr = {'distinguishedName': dn,    # dn
                        'sAMAccountname': sam,      # 登入名
                        'title': title,             # 頭銜
                        'givenName': name[0:1],     # 姓
                        'sn': name[1:],             # 名
                        'displayname': name,        # 姓名
                        'mail': email,              # 郵箱
                        'telephoneNumber': tel,     # 電話号
                        }
        else:
            attr = None
        changes_dic = {}
        for k, v in attr.items():
            if not self.conn.compare(dn=old_dn, attribute=k, value=v):                  # 待修改屬性
                if k == "distinguishedName":        # 若屬性有distinguishedName則需要移動user或ou
                    # 若dn修改了需要将密碼檔案這個人的dn資訊更新下
                    self.update_pwd_file_line(old_dn=old_dn, new_dn=dn)
                    self.move_obj(dn=old_dn, new_dn=v)
                changes_dic.update({k: [(MODIFY_REPLACE, [v])]})
        if len(changes_dic) != 0:   # 有修改的屬性時
            modify_res = self.conn.modify(dn=dn, changes=changes_dic)
            logging.info('更新對象: ' + dn + ' 更新内容: ' + str(changes_dic))
        return self.conn.result

    def rename_obj(self, dn, newname):
        '''
        @param newname{type}新的名字,User格式:"cn=新名字";OU格式:"OU=新名字"
        @return: 修改結果
        @msg: 重命名對象
        '''
        res = self.conn.modify_dn(dn, newname)
        if res == True:
            return True
        else:
            return False

    def move_obj(self, dn, new_dn):
        '''
        @param {type}
        @return:
        @msg: 移動對象到新OU
        '''
        relative_dn, superou = new_dn.split(",", 1)
        res = self.conn.modify_dn(dn=dn, relative_dn=relative_dn, new_superior=superou)
        if res == True:
            return True
        else:
            return False

    def compare_attr(self, dn, attr, value):
        '''
        @param {type}
        @return:
        @msg:比較員工指定的某個屬性
        '''
        res = self.conn.compare(dn=dn, attribute=attr, value=value)
        return res

    def check_ou(self, ou, ou_list=None):
        '''
        @param {type}
        @return:
        @msg: 遞歸函數
    如何判斷OU是修改了名字而不是建立的:當一個OU裡面沒有人就判斷此OU被修改了名字,删除此OU;
    不管是建立還是修改了名字,都會将人員轉移到新的OU下面:需要建立OU則建立OU後再添加/轉移人員
    check_ou的作用是為人員的變動準備好OU
        '''
        if ou_list is None:
            ou_list = []
        self.conn.search(ou, OU_SEARCH_FILTER)      # 判斷OU存在性

        while self.conn.result['result'] == 0:
            if ou_list:
                for ou in ou_list[::-1]:
                    self.conn.add(ou, 'organizationalUnit')
            return True
        else:
            ou_list.append(ou)
            ou = ",".join(ou.split(",")[1:])
            self.check_ou(ou, ou_list)  # 遞歸判斷
            return True

    def scan_ou(self):
        '''掃描的時候,必須保證此OU為葉子節點,否則報notAllowedOnNonLeaf錯誤,
        例如此次空OU——OU=開發部,OU=核心技術部,OU=RAN,OU=上海總部,DC=randolph,DC=com
        的倒數第一、二層都是空OU,但是必須得先删除倒數第一層
        是以在擷取所有OU清單的位置get_ous就将獲得的結果倒叙(用切片[::-1])
        '''
        res = self.get_ous(attr=['distinguishedName'])
        # 調用ps腳本,防止對象被意外删除×
        modify_right_res = self.del_ou_right(flag=0)
        for i, ou in enumerate(res):
            dn = ou['attributes']['distinguishedName']
            # 判斷dd下面是否有使用者,沒有使用者的直接删除
            self.conn.search(search_base=dn, search_filter=USER_SEARCH_FILTER)
            if not self.conn.entries:  # 沒有使用者存在的空OU,可以進行清理
                try:
                    delete_res = self.conn.delete(dn=dn)
                    if delete_res:
                        logging.info('删除空的OU: ' + dn + ' 成功!')
                    else:
                        logging.error('删除操作處理結果' + str(self.conn.result))
                except Exception as e:
                    logging.error(e)
        else:
            logging.info("沒有空OU,OU掃描完成!")
        # 防止對象被意外删除√
        self.del_ou_right(flag=1)

    def disable_users(self, path):
        '''
        @param {type}
        @return:
        @msg: 将AD域内的使用者不在csv表格中的定義為離職員工
        '''
        result = ad.handle_excel(path)
        newest_list = []        # 全量員工清單
        for person in result['person_list']:
            job_id, name, dn, email, tel, title, sam, cn = person[0:8]
            dd = str(dn).split(',', 1)[1]
            newest_list.append(name)
        # 查詢AD域現有員工
        res = self.get_users(attr=['distinguishedName', 'name', 'cn', 'displayName', 'userAccountControl'])
        for i, ou in enumerate(res):
            ad_user_distinguishedName, ad_user_displayName, ad_user_cn, ad_user_userAccountControl = ou['attributes'][
                'distinguishedName'], ou['attributes']['displayName'], ou['attributes']['cn'], ou['attributes']['userAccountControl']
            rela_dn = "cn=" + str(ad_user_cn)
            # 判斷使用者不在最新的員工表格中 或者 AD域中某使用者為禁用使用者
            if ad_user_displayName not in newest_list or ad_user_userAccountControl in DISABLED_USER_FLAG:
                try:
                    # 禁用使用者
                    self.conn.modify(dn=ad_user_distinguishedName, changes={'userAccountControl': (2, [546])})
                    logging.info("在AD域中發現不在表格中使用者,禁用使用者:" + ad_user_distinguishedName)
                    # 移動到離職組 判斷OU存在性
                    self.conn.search(DISABLED_BASE_DN, OU_SEARCH_FILTER)    # 判斷OU存在性
                    if self.conn.entries == []:                             # 搜不到離職員工OU則需要建立此OU
                        self.create_obj(dn=DISABLED_BASE_DN, type='ou')
                    # 移動到離職組
                    self.conn.modify_dn(dn=ad_user_distinguishedName, relative_dn=rela_dn, new_superior=DISABLED_BASE_DN)
                    logging.info('将禁用使用者【' + ad_user_distinguishedName + '】轉移到【' + DISABLED_BASE_DN + '】')
                except Exception as e:
                    logging.error(e)

    def create_user_by_excel(self, path):
        '''
        @param path{string} 用于新增使用者的表格
        @return:
        @msg:
        '''
        res_dic = self.handle_excel(path)
        for person in res_dic['person_list']:
            user_info = person
            # print(user_info)
            self.create_obj(info=user_info)

    def ad_update(self, path):
        '''AD域的初始化/更新——從表格檔案中繼資料更新AD域:
        判斷使用者是否在AD域中——不在則新增;
        在則判斷該使用者各屬性是否與表格中相同,有不同則修改;
        完全相同的使用者不用作處理;
        '''
        # 準備表格檔案
        result = ad.handle_excel(path)
        ori_data = result['person_list']
        try:
            self.del_ou_right(flag=0)       # 防止對象被意外删除×
            with tqdm(iterable=ori_data, ncols=100, total=len(ori_data), desc='處理進度', unit='人') as tqdm_ori_data:    # 封裝進度條
                for person in tqdm_ori_data:
                    dn, cn = person[2], person[7]
                    user_info = person
                    dd = str(dn).split(',', 1)[1]
                    # 根據cn判斷使用者是否已經存在
                    filter_phrase_by_cn = "(&(objectclass=person)(cn=" + cn + "))"
                    search_by_cn = self.conn.search(search_base=ENABLED_BASE_DN, search_filter=filter_phrase_by_cn, attributes=['distinguishedName'])
                    search_by_cn_json_list = json.loads(self.conn.response_to_json())['entries']
                    search_by_cn_res = self.conn.result
                    if search_by_cn == False:                       # 根據cn搜尋失敗,查無此人則新增
                        self.create_obj(info=user_info)
                    else:
                        old_dn = search_by_cn_json_list[0]['dn']    # 部門改變的使用者的現有部門,從表格拼接出來的是新的dn在user_info中帶過去修改
                        self.update_obj(old_dn=old_dn, info=user_info)
                    # break                     # 可測試一個例子
                self.del_ou_right(flag=1)       # 防止對象被意外删除√
        except KeyboardInterrupt:
            tqdm_ori_data.close()
            raise
        tqdm_ori_data.close()

    def handle_pwd_expire(self, attr=None):
        '''
        @param {type}
        @return:
        @msg: 處理密碼過期 設定密碼不過期 需要補全理論和測試
        參考理論位址:
        https://stackoverflow.com/questions/18615958/ldap-pwdlastset-unable-to-change-without-error-showing
        '''
        attr = ['pwdLastSet']
        self.conn.search(search_base=ENABLED_BASE_DN,
                         search_filter=USER_SEARCH_FILTER,
                         attributes=attr)
        result = self.conn.response_to_json()
        res_list = json.loads(result)['entries']
        for l in res_list:
            pwdLastSet, dn = l['attributes']['pwdLastSet'], l['dn']
            modify_res = self.conn.modify(dn, {'pwdLastSet': (2, [-1])})      # pwdLastSet隻能給-1 或 0
            if modify_res:
                logging.info('密碼不過期-修改使用者: ' + dn)

    def update_pwd_file_line(self, old_dn=None, new_dn=None, new_pwd=None):
        '''
        @param dn{string}
        @return: 修改結果
        @msg: 當使用者的dn或密碼被程式更新,将會在這裡更新對應部分的資訊
        采用臨時檔案替換源檔案的方式,節省記憶體,但占硬碟
        參考文章: https://www.cnblogs.com/wuzhengzheng/p/9692368.html
        '''
        with open(PWD_PATH, mode='rt', encoding='utf-8') as file, \
                open('TEMP.txt', mode='wt', encoding='utf-8') as temp_file:
            for line in file:
                if old_dn and new_dn:                   # dn被修改
                    if old_dn in line:
                        line = line.replace(old_dn, new_dn)
                        temp_file.write(line)
                    else:
                        temp_file.write(line)
                elif new_pwd and old_dn:                # 密碼被修改
                    if old_dn in line:
                        # 需要正則比對舊的密碼
                        pattern = "PWD: (.+?)\\n"       # 惰性比對
                        local = re.findall(pattern, line)
                        old_pwd = local[0]
                        line = line.replace(old_pwd, new_pwd)
                        temp_file.write(line)
                    else:
                        temp_file.write(line)
        os.remove(PWD_PATH)
        os.rename('TEMP.txt', PWD_PATH)

    def modify_pwd(self, cn):
        '''
        @param cn{string} 姓名工号 戴東1325
        @return: 修改結果
        @msg: 修改密碼
        '''
        # 根據cn判斷使用者是否已經存在
        filter_phrase_by_cn = "(&(objectclass=person)(cn=" + cn + "))"
        search_by_cn = self.conn.search(search_base=ENABLED_BASE_DN, search_filter=filter_phrase_by_cn, attributes=['distinguishedName'])
        search_by_cn_json_list = json.loads(self.conn.response_to_json())['entries']
        if search_by_cn:
            new_pwd = self.generate_pwd(8)
            old_pwd = ''
            dn = search_by_cn_json_list[0]['dn']
            modify_password_res = self.conn.extend.microsoft.modify_password(dn, new_pwd, old_pwd)
            if modify_password_res:
                logging.info('更新了對象: ' + dn + ' 的密碼')
                is_exist = os.path.exists(PWD_PATH)
                if not is_exist:        # 校驗密碼檔案存在性
                    info = 'DN: ' + dn + ' PWD: ' + new_pwd
                    save_res = self.write2txt(PWD_PATH, info)                                       # 将賬戶密碼寫入檔案中
                    if save_res:
                        logging.info('儲存初始化賬号密碼成功!')
                    else:
                        logging.error('儲存初始化賬号密碼失敗: ' + info)
                else:
                    # 若密碼修改了需要将密碼檔案這個人的密碼資訊更新下
                    with open(PWD_PATH, mode='rt', encoding='utf-8') as file:
                        if dn in file.read():
                            is_exist_pwd_record = True
                        else:
                            is_exist_pwd_record = False
                    if is_exist_pwd_record:     # 若發現此人資訊在密碼檔案裡則更新,否則需建立
                        self.update_pwd_file_line(old_dn=dn, new_pwd=new_pwd)
                    else:
                        info = 'DN: ' + dn + ' PWD: ' + new_pwd     # 因為是修改密碼,是以dn未修改
                        self.write2txt(PWD_PATH, info)
            else:
                logging.error('更新對象密碼失敗!: ' + dn)
        else:
            logging.error('查無此人!請檢查待修改密碼對象格式是否為【姓名工号】')
        

if __name__ == "__main__":
    # 建立AD域執行個體
    ad = AD()
    # res = ad.get_users()
    # for user in res:
    #     if '24842' in user['attributes']['cn']:
    #         print(user)
    # 修改密碼隻需要給出 姓名工号 組合的cn     通過√
    # ad.modify_pwd("測試23345")
    # 同步更新pwd檔案     通過√
    # ad.update_pwd_file_line(old_dn='CN=戴東1325,OU=董事會,OU=RAN,OU=上海總部,DC=randolph,DC=com',
    #                         new_dn='CN=戴東1325,OU=RAN,OU=上海總部,DC=randolph,DC=com')
    # 更新AD域     通過√
    # ad.ad_update(PERSON_EXCEL)
    # 使用excel新增使用者    通過√
    ad.create_user_by_excel(NEW_PERSON_EXCEL)
    # 處理密碼過期
    # res_list = ad.handle_pwd_expire()
    # ad.get_ous()
    # 處理源資料    通過√
    # result = ad.handle_excel(PERSON_EXCEL)
    # print(result)
    # 添加OU      通過√
    # ad.create_obj(dn='OU=TEST,DC=randolph,DC=com', type='ou')
    # 分頁查詢全部user    通過√
    # res = ad.get_users()
    # print(res)
    # 執行powershell指令   通過√
    # ad.del_ou_right(flag=0)
    # 空OU的掃描與删除    通過√
    # ad.scan_ou()
    # 離職員工邏輯    通過√       【M】将禁用員工的處理內建
    # ad.disable_users(PERSON_EXCEL)
           

3. 如何使用&測試案例

  1. 如何使用

    通常,python代碼寫的測試案例可以在

    if __name__ == "__main__":

    寫,

    ad = AD()

    一行是初始化AD類,

    推薦每次執行CUDA操作前先隻解開這一句的注釋

    ,跑一下代碼,然後打開pro_info.txt看一下日志是否有連接配接AD成功的資訊:

有如上資訊,則可以打開

ad.create_user_by_excel(NEW_PERSON_EXCEL)

這句的注去運作。

  1. 測試案例——新增使用者

步驟一 該使用者是zy的,eip查詢到的架構是

XX科技.一級部門.二級部門.三級部門

,表格裡面如下填寫:

AD域生産代碼使用說明

步驟二 檢查兩個常量,并測試AD域連通性

CUSTOM_SAMA = 'Z'

ENABLED_BASE_DN = "OU=上海總部,OU=XX科技,DC=XXXX,DC=com"

檢視日志資訊檔案看到

2020-07-20 17:41:42,332 INFO proAD.py 73 distinguishedName:CN=Administrator,CN=Users,DC=****,DC=com res: True

連接配接成功,可以解開建立使用者語句并執行了;

步驟三 執行新增使用者,檢視日志檔案和賬号密碼檔案

pro_info.txt

2020-07-20 17:44:19,501  INFO  proAD.py  73  distinguishedName:CN=Administrator,CN=Users,DC=****,DC=com res: True
2020-07-20 17:44:19,658  INFO  proAD.py  276  新增對象【CN=測試23345,OU=三級部門,OU=二級部門,OU=一級部門,OU=上海總部,OU=**科技,DC=****,DC=com】成功!
2020-07-20 17:44:19,707  INFO  proAD.py  285  儲存初始化賬号密碼成功!
           

pro_pwd.txt

SAM: Z023345 PWD: 9P*Lq^v7 DN: CN=測試23345,OU=三級部門,OU=二級部門,OU=一級部門,OU=上海總部,OU=****,DC=****,DC=com

賬号是Z023345,密碼是9P*Lq^v7,如果想讓初始化的密碼立即能用,将代碼中的這一句注釋掉

# 密碼設定為下次登入需要修改密碼
# self.conn.modify(dn, {'pwdLastSet': (2, [0])})   
           

結果

AD域中已經可以看到該使用者

AD域生産代碼使用說明
AD域生産代碼使用說明