感謝朋友支援本部落格,歡迎共同探讨交流,由于能力和時間有限,錯誤之處在所難免,歡迎指正!
如果轉載,請保留作者資訊。
部落格位址:http://blog.csdn.net/gaoxingnengjisuan
郵箱位址:[email protected]
PS:最近沒有登入部落格,很多朋友的留言沒有看見,這裡道歉!還有就是本人較少上QQ,可以郵件交流。
概述部分:
實作複制指定分區(容器)資料到指定節點(用以實作資料副本之間的同步);
這裡定義的once=True,說明系統預設調用守護程序類Daemon中的run_once方法;
進而最終實作調用Replicator類中的run_once方法;
注:容器之間同步資料主要就是對形如object_file = /srv/node/node['device']/containers/partition/suffix/hsh****.db的資料庫檔案執行複制操作;
源碼解析部分:
下面是這部分代碼的主要執行流程,代碼中較重要的部分已經進行了相關的注釋;
from swift.container.replicator import ContainerReplicator
from swift.common.utils import parse_options
from swift.common.daemon import run_daemon
if __name__ == '__main__':
conf_file, options = parse_options(once=True)
run_daemon(ContainerReplicator, conf_file, **options)
from swift.container.backend import ContainerBroker, DATADIR
from swift.common import db_replicator
class ContainerReplicator(db_replicator.Replicator):
server_type = 'container'
brokerclass = ContainerBroker
datadir = DATADIR
default_port = 6001
def report_up_to_date(self, full_info):
for key in ('put_timestamp', 'delete_timestamp', 'object_count',
'bytes_used'):
if full_info['reported_' + key] != full_info[key]:
return False
return True
class Replicator(Daemon)----def run_once(self, *args, **kwargs):
"""
實作複制指定分區資料到指定節點(用以實作資料副本之間的同步);
資料類型可能是account或container或object;
"""
# 初始化若幹參數的操作;
# self.stats = {'attempted': 0, 'success': 0, 'failure': 0, 'ts_repl': 0,
# 'no_change': 0, 'hashmatch': 0, 'rsync': 0, 'diff': 0,
# 'remove': 0, 'empty': 0, 'remote_merge': 0,
# 'start': time.time(), 'diff_capped': 0}
self._zero_stats()
dirs = []
ips = whataremyips()
if not ips:
self.logger.error(_('ERROR Failed to get my own IPs?'))
return
# 擷取環上的裝置資訊;
for node in self.ring.devs:
if (node and node['replication_ip'] in ips and node['replication_port'] == self.port):
if self.mount_check and not ismount(os.path.join(self.root, node['device'])):
self.logger.warn(_('Skipping %(device)s as it is not mounted') % node)
continue
# 删除若幹過期檔案;
unlink_older_than(
os.path.join(self.root, node['device'], 'tmp'),
time.time() - self.reclaim_age)
datadir = os.path.join(self.root, node['device'], self.datadir)
if os.path.isdir(datadir):
dirs.append((datadir, node['id']))
self.logger.info(_('Beginning replication run'))
for part, object_file, node_id in roundrobin_datadirs(dirs):
# _replicate_object:複制指定分區資料到指定節點(用以實作資料副本之間的同步),具體步驟如下;
# 擷取指定分區所在的所有節點nodes(一個分區可能對應多個節點,因為可能有多個副本);
# 判斷node_id是否在nodes的範圍之内(這是合理的);
# 循環實作資料到各個目标節點上(的分區)的複制操作;
# 通過比較同步點和哈希值來判斷複制後的兩個版本是否是同步的,即複制操作是否成功;
self.cpool.spawn_n(self._replicate_object, part, object_file, node_id)
self.cpool.waitall()
self.logger.info(_('Replication run OVER'))
self._report_stats()
1.for node in self.ring.devs:從環上擷取所有裝置,周遊并執行以下操作:
通過IP位址判斷并擷取屬于本機的且已經挂載的裝置,并儲存設備對應的datadir = /srv/node/node['device']/containers和node['id']作為元素儲存在字典dirs中;
注:這裡實際上就是擷取屬于本機的裝置,且明确檔案路徑/srv/node/node['device']/containers(對應于容器);
2.循環周遊node['device']/containers下面的每一個檔案object_file(檔案路徑形如object_file = /srv/node/node['device']/containers/partition/suffix/hsh****.db,為容器中具體分區下的以.db為字尾的檔案),調用方法_replicate_object實作複制本地指定分區資料到指定節點(用以實作資料副本之間的同步);
注:其他部分的内容實作與swift-account-replicator的實作是一緻的,是以這裡不再進行贅述;