天天看點

python連接配接資料庫:pymsql子產品--增删查改操作類化dbDemon項目目錄結構:和shell互動操作格式化輸出内容(第二個測試檔案)

dbDemon項目目錄結構:

__init__.py初始化檔案 #一個子產品必須要的檔案
config.py 配置檔案
model.py  具體實作操作的檔案
dbDemonTest.py  測試db子產品的檔案
           

config.py 配置檔案

#定義資料庫資訊
host='localhost'
user='root'
password=''
dbname='mydb'
port=3306
           

model.py 具體實作操作的檔案

import pymysql,os,sys
from dbDemon import config

class Model:
    link=""
    cursor=""
    primarykey='id'
    table=''
    field=[]  #定義表格字段

    def __init__(self,tablename,config=config):  #config預設使用以及定義好的py檔案,如果初始化時自動傳入也可以
        try:
            self.link=pymysql.connect(host=config.host,user=config.user,password=config.password,port=config.port,db=config.dbname)
            self.cursor=self.link.cursor()  #傳回一個cursor遊标對象
            #self.cursor=self.link.cursor(pymysql.cursors.DictCursor)  #傳回的是字典格式的cursor對象

            self.table=tablename
            self.__loadfield()
        except Exception:
            print("初始化model類失敗:%s"%Exception)

    def __del__(self):   ####定義析構函數,結束調用時自動關閉
        if self.cursor:
            self.cursor.close()
        if self.link:
            self.link.close()

    def findAll(self):
        try:
            sql='select * from %s' % self.table
            print('執行:'+sql)
            self.cursor.execute(sql)
            data=self.cursor.fetchall()
            print('目前表格有%s條資料' % self.cursor.rowcount + "\n" + "已有資料為:")
            #print(data)

            return  data
        except Exception:
            print('sql 查詢所有資料失敗')
    def findone(self,id=0):
        try:
            sql='select * from %s where %s="%s"' % (self.table,self.primarykey,id)
            print('執行:'+sql)
            self.cursor.execute(sql)
            data=self.cursor.fetchall()
            print('目前表格有%s條資料'%self.cursor.rowcount+"\n"+"已有資料為:")
            print(data)
        except Exception:
            print('sql 查詢所有資料失敗')

    def select(self,where=[],order='',limit=''):
        try:
            sql = 'select * from %s ' %self.table
            if isinstance(where,list) and len(where)>0:#判斷輸入的條件為list格式且不為空
                wheres=' and ' .join(where)
                print('查詢滿足條件:%s的資料'%wheres)
                sql=sql+' where %s' % wheres
            if order:
                sql=sql+" order by "+order
            if limit:
                sql=sql+' limit '+limit
            print('執行:' + sql)
            self.cursor.execute(sql)
            data = self.cursor.fetchall()
            print(data)
        except Exception:
            print('條件錯誤!')

    def save(self,data={}):  #增加方法 寫入資料庫 字典中key為資料庫字段,value為值
        try:
            print(data)
            keys=[]
            values=[]
            for key,value in data.items():
                keys.append(key)
                print(keys)
                values.append(value)
                print(values)
            #print('寫入的字段為%s,資料為%S'%(keys,values))
            #print(','.join(values))
            #sql = 'insert into %s(%s) values(%s)' % (self.table,','.join(keys),','.join(values))  報錯  因為values裡面有str和int型
            sql = 'insert into %s(%s) values(%s)' % (self.table, ','.join(keys), ','.join(['%s']*len(values)))  #valuse是放的占位符%s,執行時才傳入
            #['%s']  必須用中括号
            print(sql)
            self.cursor.execute(sql,tuple(values))
            self.link.commit()
            print(self.cursor.lastrowid)
            self.findAll()
        except Exception as err:
            print('寫入失敗 %s' %err)

    def updata(self,data={}):  #增加方法 寫入資料庫 字典中key為資料庫字段,value為值
        try:
            print(data)
            values=[]
            for key,value in data.items():
                if key != self.primarykey:
                    #values.append(key+'='+value)  #這種寫法不能自動轉換int為str類型
                    values.append("%s='%s'"%(key,value))
                    print(values)
            sql = 'update %s set %s where %s =%s ' % (self.table,','.join(values),self.primarykey,data.get('id'))  #valuse是放的占位符%s,執行時才傳入
            #['%s']  必須用中括号
            print(sql)
            self.cursor.execute(sql)
            self.link.commit()
            print(self.cursor.lastrowid)
            self.findAll()
        except Exception as err:
            print('寫入失敗 %s' %err)

    def delete(self,id=0):
        try:
            sql='delete from %s where %s = "%s" ' % (self.table,self.primarykey,id)
            print(sql)
            self.cursor.execute(sql)
            self.link.commit()
            self.findAll()
        except Exception as err:
            print('删除失敗 %s' % err)
    def __loadfield(self):#根據表格名擷取他的字段和主鍵  這個是内部私有方法 不能外部調用
        try:
            sql="desc %s" % self.table
            self.cursor.execute(sql)
            data=self.cursor.fetchall()
            #print(data)
            for v in data:
               # print(v)
                #if v["Key"] == "PRI":
                #    self.primarykey=v["Key"]
                if "PRI" in v:
                    self.primarykey=v[0]
                    #print(self.primarykey)
                self.field.append(v[0])
            print("目前表格主鍵為:")
            print(self.field)
        except Exception as err:
            print('擷取字段失敗 %s' % err)
           

dbDemonTest.py 測試db子產品的檔案(第一個測試檔案)

from dbDemon.model import Model  #導入dbdemone目錄下的model檔案内的Model類
import os,sys
if __name__=="__main__":
    test=Model('mytable')
    #test.findAll()
    #test.findone(4)
    print("歡迎來到mytable表格進行查詢")
    #{0:<5}表示第1個位置 占位5個;
    print("|{0:<12} |{1:<12} |{2:<12} |{3:<10}".format('1.查詢所有資料','2.查詢單個資料','3.插入資料','4.删除資料'))
    while True:
        type=input("輸入你要進行的操作:")
        if type == '1':
            print("開始查詢所有資料")
            data=test.findAll()
            print("|{0:<5} |{1:<5} |{2:<5} |{3:<5}".format('id', 'name', 'class', 'age'))
            for da in data:
                print("|{0:<5} |{1:<5} |{2:<5} |{3:<5}".format(da[0], da[1], da[2], da[3]))
        elif type =='2':
            ids=input("請輸入要查詢的人員id号:")
            test.findone(int(ids))
        elif type =='3':
            name=input("請輸入建立的人員名稱")
            id=input("請輸入建立的id")
            classic=input("請輸入班級")
            age=input("請輸入年齡")
            test.save({'id': id, 'name': name, 'class': classic, 'age': age})  # 增加資料
        elif type =='4':
            id=input("請輸入需要删除的人員的id值")
            test.delete(id)
        else:
            print("輸入錯誤!!退出")
            break


    #lists=['id=7','name="newname"']
    lists=['name="newname"']
    #test.select(lists,'id desc','2')
    #test.save({'id':11,'name':'addn2','class':'shell1','age':18}) #增加資料
    #test.updata({'id':11,'name':'updatename','class':'shell','age':18})
    #test.delete(10)
           

和shell互動操作格式化輸出内容(第二個測試檔案)

from dbDemon.model import Model  #導入dbdemone目錄下的model檔案内的Model類
import os,sys
if __name__=="__main__":
    test=Model('mytable')
    #test.findAll()
    #test.findone(4)
    print("歡迎來到mytable表格進行查詢")
    #{0:<5}表示第1個位置 占位5個;
    print("|{0:<12} |{1:<12} |{2:<12} |{3:<10}".format('1.查詢所有資料','2.查詢單個資料','3.插入資料','4.删除資料'))
    while True:
        type=input("輸入你要進行的操作:")
        if type == '1':
            print("開始查詢所有資料")
            data=test.findAll()
            print("|{0:<5} |{1:<5} |{2:<5} |{3:<5}".format('id', 'name', 'class', 'age'))
            for da in data:
                print("|{0:<5} |{1:<5} |{2:<5} |{3:<5}".format(da[0], da[1], da[2], da[3]))
        elif type =='2':
            ids=input("請輸入要查詢的人員id号:")
            test.findone(int(ids))
        elif type =='3':
            name=input("請輸入建立的人員名稱")
            id=input("請輸入建立的id")
            classic=input("請輸入班級")
            age=input("請輸入年齡")
            test.save({'id': id, 'name': name, 'class': classic, 'age': age})  # 增加資料
        elif type =='4':
            id=input("請輸入需要删除的人員的id值")
            test.delete(id)
        else:
            print("輸入錯誤!!退出")
            break


    #lists=['id=7','name="newname"']
    lists=['name="newname"']
    #test.select(lists,'id desc','2')
    #test.save({'id':11,'name':'addn2','class':'shell1','age':18}) #增加資料
    #test.updata({'id':11,'name':'updatename','class':'shell','age':18})
    #test.delete(10)

           

繼續閱讀