感謝朋友支援本部落格,歡迎共同探讨交流,由于能力和時間有限,錯誤之處在所難免,歡迎指正!
如果轉載,請保留作者資訊。
部落格位址:http://blog.csdn.net/gaoxingnengjisuan
郵箱位址:[email protected]
PS:最近沒有登入部落格,很多朋友的留言沒有看見,這裡道歉!還有就是本人較少上QQ,可以郵件交流。
概述部分:
容器資料更新守護程序;
周遊所有裝置下所有容器下的DB檔案,
如果DB檔案指定的容器發生資料更新,
通過HTTP協定應用PUT方法實作報告container資訊給相應的account服務,
在account中實作相應的更新資訊操作;
源碼解析部分:
下面是這部分代碼的主要執行流程,代碼中較重要的部分已經進行了相關的注釋;
from swift.container.updater import ContainerUpdater
from swift.common.utils import parse_options
from swift.common.daemon import run_daemon
if __name__ == '__main__':
conf_file, options = parse_options(once=True)
run_daemon(ContainerUpdater, conf_file, **options)
def run_once(self, *args, **kwargs):
"""
周遊所有裝置下所有容器下的DB檔案,如果DB檔案指定的容器發生資料更新,
通過HTTP協定應用PUT方法實作報告container資訊給相應的account服務,
在account中實作相應的更新資訊操作;
"""
patcher.monkey_patch(all=False, socket=True)
self.logger.info(_('Begin container update single threaded sweep'))
begin = time.time()
self.no_changes = 0
self.successes = 0
self.failures = 0
# get_paths:擷取所有裝置下容器下的所有分區的具體路徑;
# 周遊所有容器相關的所有分區的具體路徑;
for path in self.get_paths():
# 周遊path下所有檔案(樹形周遊),查找DB檔案,并調用方法process_container對檔案進行處理;
# 如果DB檔案指定的容器發生資料更新,通過HTTP協定應用PUT方法實作報告container資訊給相應的account服務;
# 方法process_container實作了對container進行處理,并更新它在對應account中的資訊;
self.container_sweep(path)
elapsed = time.time() - begin
self.logger.info(_(
'Container update single threaded sweep completed: '
'%(elapsed).02fs, %(success)s successes, %(fail)s failures, '
'%(no_change)s with no changes'),
{'elapsed': elapsed, 'success': self.successes,
'fail': self.failures, 'no_change': self.no_changes})
dump_recon_cache({'container_updater_sweep': elapsed},
self.rcache, self.logger)
1.擷取所有裝置下容器(containers)下的所有分區的具體路徑;
2.周遊所有分區的具體路徑,調用方法container_sweep實作擷取分區下的DB檔案,如果DB檔案指定的容器發生資料更新,通過HTTP協定應用PUT方法實作報告container資訊給相應的account服務,在account中實作相應的更新資訊操作;
轉到2,來看方法container_sweep的實作:
def container_sweep(self, path):
"""
周遊path下所有檔案(樹形周遊),查找DB檔案,并調用方法process_container對檔案進行處理;
如果DB檔案指定的容器發生資料更新,通過HTTP協定應用PUT方法實作報告container資訊給相應的account服務;
方法process_container實作了對container進行處理,并更新它在對應account中的資訊;
"""
for root, dirs, files in os.walk(path):
for file in files:
if file.endswith('.db'):
# process_container:對container進行處理,并更新它在對應account中的資訊;
# 如果資料庫檔案指定的容器發生資料更新,通過HTTP協定應用PUT方法實作報告container資訊給相應的account服務;
self.process_container(os.path.join(root, file))
time.sleep(self.slowdown)
查找給定分區目錄下所有的以.db為字尾的檔案,并調用方法process_container實作對container進行處理,并更新它在對應account中的資訊;如果資料庫檔案指定的容器發生資料更新,通過HTTP協定應用PUT方法實作報告container資訊給相應的account服務;
來看方法process_container的實作:
def process_container(self, dbfile):
"""
對container進行處理,并更新它在對應account中的資訊;
如果容器發生資料更新,通過HTTP協定應用PUT方法實作報告container資訊給相應的account服務;
"""
start_time = time.time()
# 容器資料庫的封裝類實作;
broker = ContainerBroker(dbfile, logger=self.logger)
# get_info:擷取container的全局資料;
# 包括account/container/created_at/put_timestamp/delete_timestamp/object_count/bytes_used/reported_put_timestamp/reported_delete_timestamp/reported_object_count/reported_bytes_used/hash/id等資訊;
info = broker.get_info()
# Don't send updates if the container was auto-created since it
# definitely doesn't have up to date statistics.
if float(info['put_timestamp']) <= 0:
return
if self.account_suppressions.get(info['account'], 0) > time.time():
return
# 如果滿足下述條件,說明容器發生資料更新;
# 通過HTTP協定應用PUT方法實作報告container資訊給account服務;
if info['put_timestamp'] > info['reported_put_timestamp'] or info['delete_timestamp'] > info['reported_delete_timestamp'] \
or info['object_count'] != info['reported_object_count'] or info['bytes_used'] != info['reported_bytes_used']:
container = '/%s/%s' % (info['account'], info['container'])
# 擷取容器對應的account的相關分區和節點資訊;
# 傳回元組(分區,節點資訊清單);
# 在節點資訊清單中至少包含id、weight、zone、ip、port、device、meta;
part, nodes = self.get_account_ring().get_nodes(info['account'])
# 在綠色線程中執行方法container_report,實作報告container資訊給account服務;
# container_report:通過HTTP協定應用PUT方法實作報告container資訊給account服務;
events = [spawn(self.container_report, node, part, container,
info['put_timestamp'], info['delete_timestamp'],
info['object_count'], info['bytes_used'])
for node in nodes]
successes = 0
for event in events:
if is_success(event.wait()):
successes += 1
if successes >= quorum_size(len(events)):
self.logger.increment('successes')
self.successes += 1
self.logger.debug(_('Update report sent for %(container)s %(dbfile)s'),
{'container': container, 'dbfile': dbfile})
# reported:更新資料庫中container_stat表的reported_put_timestamp、reported_delete_timestamp、reported_object_count和reported_bytes_used;
broker.reported(info['put_timestamp'],
info['delete_timestamp'], info['object_count'],
info['bytes_used'])
else:
self.logger.increment('failures')
self.failures += 1
self.logger.debug(_('Update report failed for %(container)s %(dbfile)s'), {'container': container, 'dbfile': dbfile})
self.account_suppressions[info['account']] = until = \
time.time() + self.account_suppression_time
if self.new_account_suppressions:
print >>self.new_account_suppressions, info['account'], until
# Only track timing data for attempted updates:
self.logger.timing_since('timing', start_time)
else:
self.logger.increment('no_changes')
self.no_changes += 1
2.1.根據指定的資料庫檔案(字尾為.db的檔案),實作擷取container的全局資料資訊;
2.2.根據container的全局資料資訊中若幹屬性的比較,判斷該容器是否發生了資料的更新;
2.3 如果容器發生了資料更新,調用方法get_nodes擷取容器所屬account的分區号和副本節點;
2.4.針對容器所屬account的每一個副本節點,調用方法container_report實作通過HTTP協定應用PUT方法實作報告container資訊給account服務;
注:這裡就是處理一個分區上所有的容器,首先驗證容器是否發生資料更新,如果容器發生資料更新,則先擷取該容器所屬賬戶的分區,和所有賬戶的副本節點,再周遊每個副本節點,實作把容器資料更新的資訊報告給賬戶的每一個副本節點;
轉到2.4,來看方法container_report的實作:
def container_report(self, node, part, container, put_timestamp, delete_timestamp, count, bytes):
"""
通過HTTP協定應用PUT方法實作報告container資訊給account服務;
"""
with ConnectionTimeout(self.conn_timeout):
try:
headers = {
'X-Put-Timestamp': put_timestamp,
'X-Delete-Timestamp': delete_timestamp,
'X-Object-Count': count,
'X-Bytes-Used': bytes,
'X-Account-Override-Deleted': 'yes',
'user-agent': self.user_agent}
# 通過HTTP協定應用PUT方法實作報告container資訊給account服務;
# 建立一個HTTPConnection類的對象;
# 發出的HTTP請求的方法'PUT';
# 傳回HTTPConnection連接配接對象;
conn = http_connect(node['ip'], node['port'], node['device'], part, 'PUT', container, headers=headers)
except (Exception, Timeout):
self.logger.exception(_(
'ERROR account update failed with '
'%(ip)s:%(port)s/%(device)s (will retry later): '), node)
return HTTP_INTERNAL_SERVER_ERROR
with Timeout(self.node_timeout):
try:
# 擷取來自伺服器的響應;
resp = conn.getresponse()
resp.read()
return resp.status
except (Exception, Timeout):
if self.logger.getEffectiveLevel() <= logging.DEBUG:
self.logger.exception(_('Exception with %(ip)s:%(port)s/%(device)s'), node)
return HTTP_INTERNAL_SERVER_ERROR
finally:
conn.close()