天天看點

Swift源碼分析----swift-container-updater

感謝朋友支援本部落格,歡迎共同探讨交流,由于能力和時間有限,錯誤之處在所難免,歡迎指正!

如果轉載,請保留作者資訊。

部落格位址:http://blog.csdn.net/gaoxingnengjisuan

郵箱位址:[email protected]

PS:最近沒有登入部落格,很多朋友的留言沒有看見,這裡道歉!還有就是本人較少上QQ,可以郵件交流。

概述部分:

容器資料更新守護程序;

周遊所有裝置下所有容器下的DB檔案,

如果DB檔案指定的容器發生資料更新,

通過HTTP協定應用PUT方法實作報告container資訊給相應的account服務,

在account中實作相應的更新資訊操作;

源碼解析部分:

下面是這部分代碼的主要執行流程,代碼中較重要的部分已經進行了相關的注釋;

from swift.container.updater import ContainerUpdater
from swift.common.utils import parse_options
from swift.common.daemon import run_daemon

if __name__ == '__main__':
    conf_file, options = parse_options(once=True)
    run_daemon(ContainerUpdater, conf_file, **options)
           
def run_once(self, *args, **kwargs):
     """
     周遊所有裝置下所有容器下的DB檔案,如果DB檔案指定的容器發生資料更新,
     通過HTTP協定應用PUT方法實作報告container資訊給相應的account服務,
     在account中實作相應的更新資訊操作;
     """
    patcher.monkey_patch(all=False, socket=True)
    self.logger.info(_('Begin container update single threaded sweep'))
    begin = time.time()
    self.no_changes = 0
    self.successes = 0
    self.failures = 0
        
    # get_paths:擷取所有裝置下容器下的所有分區的具體路徑;
      # 周遊所有容器相關的所有分區的具體路徑;
    for path in self.get_paths():
           # 周遊path下所有檔案(樹形周遊),查找DB檔案,并調用方法process_container對檔案進行處理;
           # 如果DB檔案指定的容器發生資料更新,通過HTTP協定應用PUT方法實作報告container資訊給相應的account服務;
           # 方法process_container實作了對container進行處理,并更新它在對應account中的資訊;
        self.container_sweep(path)
    elapsed = time.time() - begin
    self.logger.info(_(
        'Container update single threaded sweep completed: '
        '%(elapsed).02fs, %(success)s successes, %(fail)s failures, '
        '%(no_change)s with no changes'),
        {'elapsed': elapsed, 'success': self.successes,
         'fail': self.failures, 'no_change': self.no_changes})
    dump_recon_cache({'container_updater_sweep': elapsed},
                     self.rcache, self.logger)
           

1.擷取所有裝置下容器(containers)下的所有分區的具體路徑;

2.周遊所有分區的具體路徑,調用方法container_sweep實作擷取分區下的DB檔案,如果DB檔案指定的容器發生資料更新,通過HTTP協定應用PUT方法實作報告container資訊給相應的account服務,在account中實作相應的更新資訊操作;

轉到2,來看方法container_sweep的實作:

def container_sweep(self, path):
    """
    周遊path下所有檔案(樹形周遊),查找DB檔案,并調用方法process_container對檔案進行處理;
    如果DB檔案指定的容器發生資料更新,通過HTTP協定應用PUT方法實作報告container資訊給相應的account服務;
    方法process_container實作了對container進行處理,并更新它在對應account中的資訊;
    """
    for root, dirs, files in os.walk(path):
        for file in files:
            if file.endswith('.db'):
                # process_container:對container進行處理,并更新它在對應account中的資訊;
                # 如果資料庫檔案指定的容器發生資料更新,通過HTTP協定應用PUT方法實作報告container資訊給相應的account服務;
                self.process_container(os.path.join(root, file))
                time.sleep(self.slowdown)
           

查找給定分區目錄下所有的以.db為字尾的檔案,并調用方法process_container實作對container進行處理,并更新它在對應account中的資訊;如果資料庫檔案指定的容器發生資料更新,通過HTTP協定應用PUT方法實作報告container資訊給相應的account服務;

來看方法process_container的實作:

def process_container(self, dbfile):
    """
    對container進行處理,并更新它在對應account中的資訊;
    如果容器發生資料更新,通過HTTP協定應用PUT方法實作報告container資訊給相應的account服務;
    """
    start_time = time.time()
        
    # 容器資料庫的封裝類實作;
    broker = ContainerBroker(dbfile, logger=self.logger)
    # get_info:擷取container的全局資料;
    # 包括account/container/created_at/put_timestamp/delete_timestamp/object_count/bytes_used/reported_put_timestamp/reported_delete_timestamp/reported_object_count/reported_bytes_used/hash/id等資訊;
    info = broker.get_info()
    # Don't send updates if the container was auto-created since it
    # definitely doesn't have up to date statistics.
    if float(info['put_timestamp']) <= 0:
        return
        
    if self.account_suppressions.get(info['account'], 0) > time.time():
        return
        
    # 如果滿足下述條件,說明容器發生資料更新;
    # 通過HTTP協定應用PUT方法實作報告container資訊給account服務;
    if info['put_timestamp'] > info['reported_put_timestamp'] or info['delete_timestamp'] > info['reported_delete_timestamp'] \
       or info['object_count'] != info['reported_object_count'] or info['bytes_used'] != info['reported_bytes_used']:
        container = '/%s/%s' % (info['account'], info['container'])
            
        # 擷取容器對應的account的相關分區和節點資訊;
        # 傳回元組(分區,節點資訊清單);
        # 在節點資訊清單中至少包含id、weight、zone、ip、port、device、meta;
        part, nodes = self.get_account_ring().get_nodes(info['account'])
            
        # 在綠色線程中執行方法container_report,實作報告container資訊給account服務;
        # container_report:通過HTTP協定應用PUT方法實作報告container資訊給account服務;
        events = [spawn(self.container_report, node, part, container,
                        info['put_timestamp'], info['delete_timestamp'],
                        info['object_count'], info['bytes_used'])
                  for node in nodes]
            
        successes = 0
        for event in events:
            if is_success(event.wait()):
                successes += 1
        if successes >= quorum_size(len(events)):
            self.logger.increment('successes')
            self.successes += 1
            self.logger.debug(_('Update report sent for %(container)s %(dbfile)s'),
                               {'container': container, 'dbfile': dbfile})
            # reported:更新資料庫中container_stat表的reported_put_timestamp、reported_delete_timestamp、reported_object_count和reported_bytes_used;
            broker.reported(info['put_timestamp'],
                            info['delete_timestamp'], info['object_count'],
                            info['bytes_used'])
        else:
            self.logger.increment('failures')
            self.failures += 1
            self.logger.debug(_('Update report failed for %(container)s %(dbfile)s'), {'container': container, 'dbfile': dbfile})
            self.account_suppressions[info['account']] = until = \
                time.time() + self.account_suppression_time
            if self.new_account_suppressions:
                print >>self.new_account_suppressions, info['account'], until
        # Only track timing data for attempted updates:
        self.logger.timing_since('timing', start_time)
    else:
        self.logger.increment('no_changes')
        self.no_changes += 1
           

2.1.根據指定的資料庫檔案(字尾為.db的檔案),實作擷取container的全局資料資訊;

2.2.根據container的全局資料資訊中若幹屬性的比較,判斷該容器是否發生了資料的更新;

2.3 如果容器發生了資料更新,調用方法get_nodes擷取容器所屬account的分區号和副本節點;

2.4.針對容器所屬account的每一個副本節點,調用方法container_report實作通過HTTP協定應用PUT方法實作報告container資訊給account服務;

注:這裡就是處理一個分區上所有的容器,首先驗證容器是否發生資料更新,如果容器發生資料更新,則先擷取該容器所屬賬戶的分區,和所有賬戶的副本節點,再周遊每個副本節點,實作把容器資料更新的資訊報告給賬戶的每一個副本節點;

轉到2.4,來看方法container_report的實作:

def container_report(self, node, part, container, put_timestamp, delete_timestamp, count, bytes):
    """
    通過HTTP協定應用PUT方法實作報告container資訊給account服務;
    """
    with ConnectionTimeout(self.conn_timeout):
        try:
            headers = {
                'X-Put-Timestamp': put_timestamp,
                'X-Delete-Timestamp': delete_timestamp,
                'X-Object-Count': count,
                'X-Bytes-Used': bytes,
                'X-Account-Override-Deleted': 'yes',
                'user-agent': self.user_agent}
                
            # 通過HTTP協定應用PUT方法實作報告container資訊給account服務;
            # 建立一個HTTPConnection類的對象;
            # 發出的HTTP請求的方法'PUT';
            # 傳回HTTPConnection連接配接對象;
            conn = http_connect(node['ip'], node['port'], node['device'], part, 'PUT', container, headers=headers)
        except (Exception, Timeout):
            self.logger.exception(_(
                'ERROR account update failed with '
                '%(ip)s:%(port)s/%(device)s (will retry later): '), node)
            return HTTP_INTERNAL_SERVER_ERROR
    with Timeout(self.node_timeout):
        try:
            # 擷取來自伺服器的響應;
            resp = conn.getresponse()
            resp.read()
            return resp.status
        except (Exception, Timeout):
            if self.logger.getEffectiveLevel() <= logging.DEBUG:
                self.logger.exception(_('Exception with %(ip)s:%(port)s/%(device)s'), node)
            return HTTP_INTERNAL_SERVER_ERROR
        finally:
            conn.close()