最近樓主一直在加班,好久沒有更新我的部落格了,哎,一言難盡。廢話我就不說了,直接開始上精華。
背景:目前市面上有很多bug管理工具,各有各的特點,其中最著名、最流行的就是禪道。一個偶然的機會接觸到了python,學到tornado後,就想着怎麼把它用到實處,後來發現自己公司除了用禪道記錄bug之外,沒有什麼其他可用的工具了。
語言:python3 第三方庫:tornado,qiniu(用于雲存儲檔案),資料庫用sqlite
why use tornado?很多人其實會這麼問我,我感覺tornado可以實作異步,雖然現在代碼還沒有用到異步,我感覺還是很不錯的架構,值得學習,現在很多公司都在用,個人感覺這是一個不錯的,值得我們大家去學習的架構。
來看看我的需求文檔

大體是這樣的輪廓。拿到這個需求的時候,我會先進行需求分析:
需要什麼樣的資料庫、資料模型之間的關係是什麼。雖然現在還有很多地方是寫死的,搜尋功能也還沒有實作,但是我相信,有了現在這個demo,後面可以慢慢完善。
那麼我先來設計一下專案的主要結構:
handlsers 存放類似flask的views
models存資料庫相關的,
static存放靜態檔案
template存放模板
untils存放公共庫
setting 配置檔案
urls url位址映射
run 運作檔案。
那麼我來開始設計我的資料,其實我的資料模型也是一波三折的。
選擇用了sqlalchemy,之前用過flask的sqlalchemy感覺不錯
from models.dataconfig import db_session,Base,create_all
from sqlalchemy import Column,Integer,DateTime,Boolean,String,ForeignKey,desc,asc,Text
from sqlalchemy.orm import relationship,backref
from untils.common import encrypt
import datetime
class User(Base):
    """Account row in the ``users`` table, with lookup and creation helpers.

    Related rows from the other models are reachable through the
    relationship attributes declared below; each of them adds a ``users``
    backref on the related model.
    """
    __tablename__ = 'users'
    id = Column(Integer(), primary_key=True)
    username = Column(String(64), unique=True, index=True)
    email = Column(String(64))
    # Holds the output of ``encrypt`` (see ``add_new``), not the raw password.
    password = Column(String(64))
    last_logtime = Column(DateTime())
    # ``add_new`` initialises this to 0; other values are not defined here.
    status = Column(Integer())
    # NOTE(review): presumably the permission level of the user -- confirm.
    leves = Column(Integer())
    # NOTE(review): presumably a phone number stored as an integer -- confirm.
    iphone = Column(Integer())
    Projects = relationship('Project', backref='users')
    shebei = relationship('Shebei', backref='users')
    file = relationship('FilePan', backref='users')
    banben = relationship('BanbenWrite', backref='users')
    testresult = relationship('TestResult', backref='users')
    testcase = relationship('TestCase', backref='users')
    buglog = relationship('BugLog', backref='users')

    def __repr__(self):
        return self.username

    @classmethod
    def get_by_id(cls, id):
        """Return the user with primary key ``id``, or ``None``."""
        item = db_session.query(User).filter(User.id == id).first()
        return item

    @classmethod
    def get_by_username(cls, username):
        """Return the user named ``username``, or ``None``."""
        item = db_session.query(User).filter(User.username == username).first()
        return item

    @classmethod
    def get_count(cls):
        """Return the number of rows in the ``users`` table."""
        # Fixed: this previously counted ``Shebei`` rows instead of users.
        return db_session.query(User).count()

    @classmethod
    def add_new(cls, username, password, iphone, email, leves):
        """Create and commit a new user; roll back if the commit fails.

        The password is passed through ``encrypt`` before being stored and
        ``status`` is initialised to 0. Returns ``None`` in every case.
        """
        new = User(username=username, iphone=iphone, email=email, leves=leves)
        new.password = encrypt(password)
        new.status = 0
        db_session.add(new)
        try:
            db_session.commit()
        except Exception:
            # Best-effort insert: keep the session usable after a failed
            # commit, but let KeyboardInterrupt/SystemExit propagate.
            db_session.rollback()
class Shebei(Base):
    """Row of the ``shebeis`` table plus simple lookup helpers.

    NOTE(review): "shebei" is presumably pinyin for "device" -- confirm.
    """
    __tablename__ = 'shebeis'

    id = Column(Integer(), primary_key=True)
    shebei_id = Column(String(32), unique=True)
    shebei_name = Column(String(64))
    shebei_xitong = Column(String(64))
    shebei_xinghao = Column(String(255))
    shebei_jiage = Column(Integer())
    shebei_fapiaobianhao = Column(String(64))
    shebei_quanxian = Column(Boolean())
    shebei_jie = Column(Boolean())
    shebei_shuyu = Column(String())
    shebei_date = Column(DateTime())
    shebei_user = Column(String())
    gou_date = Column(DateTime())
    shebei_status = Column(String(16))
    she_sta = Column(Integer(), default=0)
    # References the ``users`` row linked to this record.
    ruku_user = Column(Integer(), ForeignKey('users.id'))

    def __repr__(self):
        return self.shebei_name

    @classmethod
    def get_by_name(cls, name):
        """Return the first row whose ``shebei_name`` equals ``name``, or ``None``."""
        return db_session.query(Shebei).filter_by(shebei_name=name).first()

    @classmethod
    def get_by_id(cls, id):
        """Return the row with primary key ``id``, or ``None``."""
        return db_session.query(Shebei).filter_by(id=id).first()

    @classmethod
    def get_count(cls):
        """Return the number of rows in ``shebeis``."""
        total = db_session.query(Shebei).count()
        return total
class TestResult(Base):
    """Row of the ``testresults`` table, linked to a project and a user."""
    __tablename__ = 'testresults'
    id = Column(Integer(), primary_key=True)
    porject_id = Column(Integer(), ForeignKey('projects.id'))
    creat_time = Column(DateTime())
    bug_first = Column(Integer())
    ceshirenyuan = Column(String(255))
    is_send = Column(Boolean(), default=True)
    filepath = Column(String(64))
    status = Column(Integer(), default=0)
    user_id = Column(Integer(), ForeignKey('users.id'))

    def __repr__(self):
        # Fixed: the model has no ``porject_name`` column, so the old
        # ``return self.porject_name`` always raised AttributeError.
        # ``projects`` is the backref created by ``Project.testresult``.
        project = self.projects
        if project is not None and project.name:
            return project.name
        return '<TestResult id=%s>' % self.id

    @classmethod
    def get_by_name(cls, name):
        """Return the first result whose project is named ``name``, or ``None``.

        Fixed: the previous filter used the non-existent
        ``TestResult.porject_name``; the project name lives on ``Project``,
        so the query joins through ``porject_id``.
        """
        item = (
            db_session.query(TestResult)
            .join(Project, TestResult.porject_id == Project.id)
            .filter(Project.name == name)
            .first()
        )
        return item

    @classmethod
    def get_by_id(cls, id):
        """Return the result with primary key ``id``, or ``None``."""
        item = db_session.query(TestResult).filter(TestResult.id == id).first()
        return item

    @classmethod
    def get_by_user_id(cls, user_id):
        """Return the first result with the given ``user_id``, or ``None``."""
        item = db_session.query(TestResult).filter(TestResult.user_id == user_id).first()
        return item

    @classmethod
    def get_count(cls):
        """Return the number of rows in ``testresults``."""
        return db_session.query(TestResult).count()
class BanbenWrite(Base):
    """Row of the ``banbens`` table, linked to a project and a user.

    NOTE(review): "banben" is presumably pinyin for "version" -- confirm.
    """
    __tablename__ = 'banbens'
    id = Column(Integer(), primary_key=True)
    porject_id = Column(Integer(), ForeignKey('projects.id'))
    # Fixed: pass the callable, not its result. ``datetime.datetime.now()``
    # was evaluated once at import, stamping every row with that one time.
    creat_time = Column(DateTime(), default=datetime.datetime.now)
    banbenhao = Column(String(32))
    is_xian = Column(Boolean(), default=False)
    is_test = Column(Boolean(), default=False)
    status = Column(Integer())
    user_id = Column(Integer(), ForeignKey('users.id'))
    bugadmin = relationship('BugAdmin', backref='banbens')

    def __repr__(self):
        return self.banbenhao

    @classmethod
    def get_by_name(cls, name):
        """Return the first row whose project is named ``name``, or ``None``.

        Fixed: the previous filter used the non-existent
        ``BanbenWrite.porject_name`` and raised AttributeError; the project
        name lives on ``Project``, so the query joins through ``porject_id``.
        """
        item = (
            db_session.query(BanbenWrite)
            .join(Project, BanbenWrite.porject_id == Project.id)
            .filter(Project.name == name)
            .first()
        )
        return item

    @classmethod
    def get_by_id(cls, id):
        """Return the row with primary key ``id``, or ``None``."""
        item = db_session.query(BanbenWrite).filter(BanbenWrite.id == id).first()
        return item

    @classmethod
    def get_by_user_id(cls, user_id):
        """Return the first row with the given ``user_id``, or ``None``."""
        item = db_session.query(BanbenWrite).filter(BanbenWrite.user_id == user_id).first()
        return item

    @classmethod
    def get_count(cls):
        """Return the number of rows in ``banbens``."""
        return db_session.query(BanbenWrite).count()
class FilePan(Base):
    """Row of the ``files`` table describing a file linked to a user."""
    __tablename__ = 'files'
    id = Column(Integer(), primary_key=True)
    file_fenlei = Column(String(64))
    file_name = Column(String(64))
    down_count = Column(Integer(), default=0)
    # Fixed: pass the callable, not its result. ``datetime.datetime.now()``
    # was evaluated once at import, stamping every row with that one time.
    creat_time = Column(DateTime(), default=datetime.datetime.now)
    status = Column(Integer(), default=0)
    down_url = Column(String(64))
    is_tui = Column(Boolean(), default=False)
    user_id = Column(Integer(), ForeignKey('users.id'))

    def __repr__(self):
        return self.file_name

    @classmethod
    def get_by_file_name(cls, name):
        """Return the first file whose ``file_name`` equals ``name``, or ``None``."""
        item = db_session.query(FilePan).filter(FilePan.file_name == name).first()
        return item

    @classmethod
    def get_by_id(cls, id):
        """Return the file with primary key ``id``, or ``None``."""
        item = db_session.query(FilePan).filter(FilePan.id == id).first()
        return item

    @classmethod
    def get_by_user_id(cls, user_id):
        """Return the first file with the given ``user_id``, or ``None``."""
        item = db_session.query(FilePan).filter(FilePan.user_id == user_id).first()
        return item

    @classmethod
    def get_count(cls):
        """Return the number of rows in ``files``."""
        return db_session.query(FilePan).count()
class BugAdmin(Base):
    """Bug row in ``bugadmins``, linked to a project, a ``banbens`` row and a user."""
    __tablename__ = 'bugadmins'
    id = Column(Integer(), primary_key=True)
    porject_id = Column(Integer(), ForeignKey('projects.id'))
    bugname = Column(String(64))
    bugdengji = Column(String(64))
    # Fixed: pass the callable, not its result. ``datetime.datetime.now()``
    # was evaluated once at import, stamping every bug with that one time.
    bugtime = Column(DateTime(), default=datetime.datetime.now)
    bug_miaoshu = Column(String(255))
    ban_id = Column(Integer(), ForeignKey('banbens.id'))
    fujian = Column(String(64))
    is_que = Column(Boolean())
    bug_status = Column(String(64))
    bug_jiejuefangan = Column(String(64))
    bug_send = Column(String(64))
    status = Column(Integer(), default=0)
    bug_log = relationship('BugLog', backref='bugadmins')
    user_id = Column(Integer(), ForeignKey('users.id'))

    def __repr__(self):
        return self.bugname

    @classmethod
    def get_by_bugname(cls, bugname):
        """Return the first bug whose ``bugname`` equals ``bugname``, or ``None``."""
        item = db_session.query(BugAdmin).filter(BugAdmin.bugname == bugname).first()
        return item

    @classmethod
    def get_by_id(cls, id):
        """Return the bug with primary key ``id``, or ``None``."""
        item = db_session.query(BugAdmin).filter(BugAdmin.id == id).first()
        return item

    @classmethod
    def get_by_porject_name(cls, porject_name):
        """Return the first bug whose project is named ``porject_name``, or ``None``.

        Fixed: the previous filter used the non-existent
        ``BugAdmin.porject_name`` and raised AttributeError; the project
        name lives on ``Project``, so the query joins through ``porject_id``.
        """
        item = (
            db_session.query(BugAdmin)
            .join(Project, BugAdmin.porject_id == Project.id)
            .filter(Project.name == porject_name)
            .first()
        )
        return item

    @classmethod
    def get_count(cls):
        """Return the number of rows in ``bugadmins``."""
        return db_session.query(BugAdmin).count()
class TestCase(Base):
    """Test-case row in ``testcases``, linked to a project and a user."""
    __tablename__ = 'testcases'
    id = Column(Integer(), primary_key=True)
    porject_id = Column(Integer(), ForeignKey('projects.id'))
    casename = Column(String(64))
    case_qianzhi = Column(String())
    case_buzhou = Column(String())
    case_yuqi = Column(String())
    status = Column(Integer(), default=0)
    # Fixed: pass the callable, not its result. ``datetime.datetime.now()``
    # was evaluated once at import, stamping every case with that one time.
    case_crea_time = Column(DateTime(), default=datetime.datetime.now)
    user_id = Column(Integer(), ForeignKey('users.id'))

    def __repr__(self):
        return self.casename

    @classmethod
    def get_by_project_name(cls, project_name):
        """Return the first case whose project is named ``project_name``, or ``None``.

        Fixed: the previous filter used the non-existent
        ``TestCase.project_name`` and raised AttributeError; the project
        name lives on ``Project``, so the query joins through ``porject_id``.
        """
        item = (
            db_session.query(TestCase)
            .join(Project, TestCase.porject_id == Project.id)
            .filter(Project.name == project_name)
            .first()
        )
        return item

    @classmethod
    def get_by_casename(cls, casename):
        """Return the first case whose ``casename`` equals ``casename``, or ``None``."""
        item = db_session.query(TestCase).filter(TestCase.casename == casename).first()
        return item

    @classmethod
    def get_by_id(cls, id):
        """Return the case with primary key ``id``, or ``None``."""
        item = db_session.query(TestCase).filter(TestCase.id == id).first()
        return item

    @classmethod
    def get_count(cls):
        """Return the number of rows in ``testcases``."""
        return db_session.query(TestCase).count()
class BugLog(Base):
    """Row of the ``buglogs`` table, tied to one ``bugadmins`` row and one user."""
    __tablename__ = 'buglogs'

    id = Column(Integer(), primary_key=True)
    bug_id = Column(Integer(), ForeignKey('bugadmins.id'))
    caozuo = Column(String())
    caozuo_time = Column(DateTime())
    user_id = Column(Integer(), ForeignKey('users.id'))

    def __repr__(self):
        return self.caozuo

    @classmethod
    def get_by_id(cls, id):
        """Return the log row with primary key ``id``, or ``None``."""
        return db_session.query(BugLog).filter_by(id=id).first()

    @classmethod
    def get_by_user_id(cls, user_id):
        """Return the first log row with the given ``user_id``, or ``None``."""
        return db_session.query(BugLog).filter_by(user_id=user_id).first()

    @classmethod
    def get_by_bug_id(cls, bug_id):
        """Return the first log row with the given ``bug_id``, or ``None``."""
        return db_session.query(BugLog).filter_by(bug_id=bug_id).first()
class Project(Base):
    """Row of the ``projects`` table.

    Each relationship below adds a ``projects`` backref on the related model.
    """
    __tablename__ = 'projects'

    id = Column(Integer(), primary_key=True)
    name = Column(String(64))
    user_id = Column(Integer(), ForeignKey('users.id'))
    # Despite its name, this relationship targets ``BugAdmin`` rows.
    bug_log = relationship('BugAdmin', backref='projects')
    banben = relationship('BanbenWrite', backref='projects')
    testresult = relationship('TestResult', backref='projects')
    testcase = relationship('TestCase', backref='projects')

    def __repr__(self):
        return self.name

    @classmethod
    def get_by_id(cls, id):
        """Return the project with primary key ``id``, or ``None``."""
        return db_session.query(Project).filter_by(id=id).first()

    @classmethod
    def get_by_name(cls, name):
        """Return the first project whose ``name`` equals ``name``, or ``None``."""
        return db_session.query(Project).filter_by(name=name).first()
這是資料庫相關的,
# Database configuration: engine, declarative base and the shared session.
# (The listing line numbers that were fused into this code have been removed;
# they made the module syntactically invalid.)

from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.ext.declarative import declarative_base

# NOTE(review): ``convert_unicode`` is deprecated in newer SQLAlchemy releases
# and rejected by 2.0 -- confirm against the installed version before upgrading.
engine = create_engine('sqlite:///shebei.db', convert_unicode=True)
Base = declarative_base()
db_session = scoped_session(sessionmaker(bind=engine))


def create_all():
    """Create every table registered on ``Base.metadata`` using ``engine``."""
    Base.metadata.create_all(engine)


def drop_all():
    """Drop every table registered on ``Base.metadata`` using ``engine``."""
    Base.metadata.drop_all(engine)
其實在開發的過程中,也遇到了很多阻力,比如下載功能一直實作不好;又比如分頁,也是參照別人的實作寫的。
分頁公共模組
class Pagination:
    """Compute slice bounds and render prev/next/last links for a paged list.

    Args:
        current_page: Requested page number. Anything that cannot be
            converted with ``int()`` or is below 1 falls back to page 1.
        all_item: Total number of items being paged.
        per_page: Items per page (default 10, the former hard-coded size).

    Attributes:
        current_page: Sanitised page number, always >= 1.
        all_pager: Total number of pages (0 when there are no items).
        per_page: Page size used by ``start`` and ``end``.

    Raises:
        ValueError: If ``per_page`` is smaller than 1.
    """

    def __init__(self, current_page, all_item, per_page=10):
        if per_page < 1:
            raise ValueError('per_page must be at least 1')
        try:
            page = int(current_page)
        except (TypeError, ValueError, OverflowError):
            # Only conversion failures fall back to the first page.
            page = 1
        if page < 1:
            page = 1
        all_pager, c = divmod(all_item, per_page)
        if int(c) > 0:
            # A partial final page still counts as a page.
            all_pager += 1
        self.current_page = page
        self.all_pager = all_pager
        self.per_page = per_page

    @property
    def start(self):
        """Index of the first item on the current page (slice start)."""
        return (self.current_page - 1) * self.per_page

    @property
    def end(self):
        """Index one past the last item on the current page (slice stop)."""
        return self.current_page * self.per_page

    def string_pager(self, base_url="/index/"):
        """Return the ``<li>`` markup for the previous, next and last links.

        Each link target is ``base_url`` followed by the page number. The
        previous link is inert on page 1, and the next link is inert on or
        past the last page.
        """
        # With zero items ``all_pager`` is 0; treat page 1 as the last page
        # so the "last" link never points at a non-existent page 0.
        last_page = max(self.all_pager, 1)
        if self.current_page == 1:
            prev = '<li><a href="javascript:void(0);">上一頁</a></li>'
        else:
            prev = '<li><a href="%s%s">上一頁</a></li>' % (base_url, self.current_page - 1,)
        # ``>=`` rather than ``==``: a page number past the end must not
        # produce a "next" link to an even further page.
        if self.current_page >= last_page:
            nex = '<li><a href="javascript:void(0);">下一頁</a></li>'
        else:
            nex = '<li><a href="%s%s">下一頁</a></li>' % (base_url, self.current_page + 1,)
        last = '<li><a href="%s%s">尾頁</a></li>' % (base_url, last_page,)
        str_page = "".join((prev, nex, last))
        return str_page
在上傳檔案的時候,原來是存放在本地,結果下載處理不好,于是選擇了七牛,需要到七牛的官網去註冊自己的帳號
from qiniu import Auth,put_file,etag,urlsafe_base64_encode
import qiniu.config
# NOTE(review): credentials are hard-coded in source. Values are left
# unchanged here; consider loading them from configuration or the
# environment and rotating these keys.
access_key='uVxowDUcYx641ivtUb111WBEI4112L3D117JHNM_AOtskRh4'
secret_key='PdXU9XrXTLtp1N21bhU1Frm1FDZqE1qhjkEaE9d1xVLZ5C'


def sendfile(key, file, bucket_name='leilei22'):
    """Upload a local file to Qiniu and verify it by hash.

    Args:
        key: Object key to store the file under in the bucket.
        file: Path of the local file to upload.
        bucket_name: Target bucket; defaults to the previously hard-coded one.

    Returns:
        True when the hash reported by the upload equals the locally
        computed ``etag`` of the file, otherwise False. Also False when the
        upload returns no usable result.
    """
    q = Auth(access_key, secret_key)
    token = q.upload_token(bucket_name, key)
    ret, info = put_file(token, key, file)
    # A failed upload yields no result body; indexing it used to raise
    # TypeError instead of reporting failure.
    if not ret or 'hash' not in ret:
        return False
    return ret['hash'] == etag(file)
解析Excel,主要用于上傳測試用例
import xlrd,xlwt
from xlutils.copy import copy
def datacel(filepath):
    """Read the first sheet of an Excel workbook into five column lists.

    Row 0 is skipped (presumably a header row -- confirm against the
    template). For every remaining row the cell values are collected
    per column.

    Returns:
        A 5-tuple of lists, in this order: project ids (column 0), case
        names (column 2), preconditions (column 3), steps (column 4) and
        expected results (column 1).
    """
    workbook = xlrd.open_workbook(filepath)
    sheet = workbook.sheets()[0]
    data_rows = range(1, sheet.nrows)

    def column(index):
        # Values of one spreadsheet column for every data row.
        return [sheet.cell(row, index).value for row in data_rows]

    return column(0), column(2), column(3), column(4), column(1)
其實到這裡公共模組就完成了,現在可以着手開始寫我們的主要代碼和靜態界面了。因為前後端都是我自己一個人做,我的前端其實是從網上找來的模板。
學習的道路是痛苦的,但是我相信我是可以成功的。
from tornado.web import RequestHandler
from models.model_py import User
class BaseHandler(RequestHandler):
    """Common base for request handlers: DB access and current-user lookup."""

    @property
    def db(self):
        """Return the ``db`` attribute of the owning application."""
        application = self.application
        return application.db

    def get_current_user(self):
        """Return the ``User`` named by the ``user_id`` secure cookie.

        Returns ``None`` when the cookie is missing or empty.
        """
        cookie_value = self.get_secure_cookie('user_id')
        if cookie_value:
            return User.get_by_id(int(cookie_value))
        return None