python tornado解析項目核心原理
同學們要跑下圖代碼,需要建立資料庫和下圖資料庫對應的位置。
該下圖代碼是從tornado項目中連接配接資料庫的代碼,一共3個部分,這就是tornado的核心,掌握了下述原理,應付工作完全可以了。
不懂得直接留言問我。
from functools import wraps
import threading
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Column, String, Integer, func
from sqlalchemy.ext.declarative import declarative_base
# 第一步:連接配接mysql的準備工作
thread_local = threading.local()
DB_CONF = {
'host': 'localhost',
'port': 3306,
'database': 'test_20191125',
'user': 'root',
'passwd': 'mysql',
'pool_size': 500
}
def transactional(func):
"""
service中的函數裝飾器,用來開啟事務和送出事務以及遇到錯誤時的事務復原
:param func: 需要被裝飾的函數
:return:
"""
@wraps(func)
def wrapper(*args, **kwargs):
closeable = False
is_commit = True
if not hasattr(thread_local, 'session'):
print('start new transaction')
thread_local.session = DBUtil.session()
closeable = True
try:
return func(*args, **kwargs)
except Exception as e:
is_commit = False
print('%s', e)
raise
finally:
if closeable:
print('close transaction, is_commit=%s', str(is_commit))
if is_commit:
thread_local.session.commit()
else:
thread_local.session.rollback()
thread_local.session.close()
del thread_local.session
return wrapper
def synchronized(func):
func.__lock__ = threading.Lock()
def lock_func(*args, **kwargs):
with func.__lock__:
return func(*args, **kwargs)
return lock_func
class BaseService:
"""
*Service的基類,單例模式
"""
_instance = None
@synchronized
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = super(BaseService, cls).__new__(cls)
return cls._instance
def __getattr__(self, item):
if item == 'session':
return thread_local.session
class DBUtil:
db_config = 'mysql+mysqlconnector://' \
+ DB_CONF['user'] + ':'\
+ DB_CONF['passwd'] + '@'\
+ DB_CONF['host'] + ':'\
+ str(DB_CONF['port']) + '/'\
+ DB_CONF['database']\
+ '?charset=utf8'
engine = create_engine(db_config, encoding='utf-8', convert_unicode=True, pool_size=DB_CONF['pool_size'],
pool_pre_ping=True)
DBSession = sessionmaker(bind=engine, autocommit=False, autoflush=False)
@staticmethod
def session():
return DBUtil.DBSession()
@staticmethod
def conn():
return DBUtil.engine.connect()
# 第二步: 資料庫表模型
Base = declarative_base()
class TceVariableMapper(Base):
__tablename__ = 'tce_variable_mapper'
variable_id = Column(Integer, primary_key=True, autoincrement=True)
software_id = Column(Integer)
product_name = Column(String)
variable_name = Column(String)
variable_code = Column(String)
variable_type = Column(Integer)
input_limit = Column(String)
multiple_limit = Column(Integer, default=1)
unit = Column(String)
num = Column(Integer)
version_id = Column(Integer)
comment = Column(String)
state = Column(Integer, default=1)
update_time = Column(server_default=func.now(), onupdate=func.now())
class HH(BaseService):
"""
第三步: 連接配接資料庫,輸出結果
"""
@transactional
def func(self):
software_client_mapper = {}
client_software_list = self.session.query(TceVariableMapper) \
.join().filter(TceVariableMapper.variable_id > 300).all()
for client_software in client_software_list:
software_client_mapper[client_software.variable_id] = client_software.product_name
print(software_client_mapper)
if __name__ == '__main__':
hah = HH()
hah.func()
列印結果
{512: ‘Web應用防火牆(WAF)’, 513: ‘Web應用防火牆(WAF)’}