感謝朋友支援本部落格,歡迎共同探讨交流,由于能力和時間有限,錯誤之處在所難免,歡迎指正!
如果轉載,請保留作者資訊。
部落格位址:http://blog.csdn.net/gaoxingnengjisuan
郵箱位址:[email protected]
PS:最近沒有登入部落格,很多朋友的留言沒有看見,這裡道歉!還有就是本人較少上QQ,可以郵件交流。
接續上一篇部落格:
def reap_container(self, account, account_partition, account_nodes, container):
"""
實作收割container操作;
實作删除容器container下資料和容器container本身;
當執行删除一個單獨的object出現異常的時候,程序将會繼續執行删除container中其它object的操作,
删除失敗的object将會在下一次這個方法被調用的時候繼續嘗試删除操作;
當擷取要删除的object清單出現異常的時候,程序将會停止(但是也将會在下一次這個方法被調用的時候繼續嘗試這個操作);
如果所有object已經被删除,将會通過發送一個删除請求到所有的container節點,來實作删除container本身;
每個container服務的删除将會更新對應的account服務,且從account清單删除container;
"""
account_nodes = list(account_nodes)
# get_container_ring:擷取swift.common.ring.Ring對象,名稱為'container';
# 為account/container/object擷取分區和節點資訊;
# 傳回元組(分區,節點資訊清單);
# 在節點資訊清單中至少包含id、weight、zone、ip、port、device、meta;
part, nodes = self.get_container_ring().get_nodes(account, container)
node = nodes[-1]
# 綠色線程的連接配接池類;
pool = GreenPool(size=self.object_concurrency)
marker = ''
# 删除從container中擷取的所有object;
while True:
objects = None
try:
# direct_get_container:發送調用'GET'方法的請求,實作從容器服務(器)直接擷取容器中内容的清單;
# objects擷取所喲的object;
objects = direct_get_container(
node, part, account, container,
marker=marker,
conn_timeout=self.conn_timeout,
response_timeout=self.node_timeout)[1]
self.stats_return_codes[2] = self.stats_return_codes.get(2, 0) + 1
self.logger.increment('return_codes.2')
except ClientException as err:
if self.logger.getEffectiveLevel() <= DEBUG:
self.logger.exception(_('Exception with %(ip)s:%(port)s/%(device)s'), node)
self.stats_return_codes[err.http_status / 100] = self.stats_return_codes.get(err.http_status / 100, 0) + 1
self.logger.increment('return_codes.%d' % (err.http_status / 100,))
if not objects:
break
# 删除從container中擷取的所有object;
try:
for obj in objects:
if isinstance(obj['name'], unicode):
obj['name'] = obj['name'].encode('utf8')
# 在綠色線程中執行方法reap_object;
# reap_object:通過一個發送到每個節點的删除object請求,執行删除給定的obj['name'];
# 執行删除請求,每個object服務将會更新删除相應的容器伺服器;
# 并從container清單删除這個object;
pool.spawn(self.reap_object, account, container, part, nodes, obj['name'])
pool.waitall()
except (Exception, Timeout):
self.logger.exception(_('Exception with objects for container '
'%(container)s for account %(account)s'),
{'container': container, 'account': account})
marker = objects[-1]['name']
if marker == '':
break
successes = 0
failures = 0
# 在所有相關節點中,删除container的相關資訊;
for node in nodes:
anode = account_nodes.pop()
try:
# direct_delete_container:發送調用'DELETE'方法的請求,實作從account服務(器)直接删除container;
direct_delete_container(
node, part, account, container,
conn_timeout=self.conn_timeout,
response_timeout=self.node_timeout,
headers={'X-Account-Host': '%(ip)s:%(port)s' % anode,
'X-Account-Partition': str(account_partition),
'X-Account-Device': anode['device'],
'X-Account-Override-Deleted': 'yes'})
successes += 1
self.stats_return_codes[2] = self.stats_return_codes.get(2, 0) + 1
self.logger.increment('return_codes.2')
except ClientException as err:
if self.logger.getEffectiveLevel() <= DEBUG:
self.logger.exception(_('Exception with %(ip)s:%(port)s/%(device)s'), node)
failures += 1
self.logger.increment('containers_failures')
self.stats_return_codes[err.http_status / 100] = self.stats_return_codes.get(err.http_status / 100, 0) + 1
self.logger.increment('return_codes.%d' % (err.http_status / 100,))
if successes > failures:
self.stats_containers_deleted += 1
self.logger.increment('containers_deleted')
elif not successes:
self.stats_containers_remaining += 1
self.logger.increment('containers_remaining')
else:
self.stats_containers_possibly_remaining += 1
self.logger.increment('containers_possibly_remaining')
1.擷取指定account指定container的分區号和所有副本所在節點資訊,從節點清單中擷取一個節點(因為所有節點上的副本資訊都是一緻的);
2.調用方法direct_get_container,實作發送調用'GET'方法的請求,實作從容器服務(器)直接擷取指定容器下的所有對象清單;
3.周遊所有的對象,針對每一個對象調用方法reap_object實作從對象伺服器删除指定對象的資料資訊;
4.周遊container的所有副本所在節點,針對每一個節點調用方法direct_delete_container實作從服務(器)直接删除container相關資料資訊(比如中繼資料資訊資料庫資訊等等);
轉到3,來看方法reap_object的實作:
def reap_object(self, account, container, container_partition, container_nodes, obj):
"""
實作收割object操作;
通過一個發送到每個節點的删除object請求,執行删除給定的object;
執行删除請求,每個object服務将會更新删除相應的容器伺服器;
并從container清單删除這個object;
"""
# container副本相關節點;
container_nodes = list(container_nodes)
# 為account/container/object擷取分區和節點資訊;
# 傳回元組(分區,節點資訊清單);
# 在節點資訊清單中至少包含id、weight、zone、ip、port、device、meta;
# get_object_ring:擷取swift.common.ring.Ring對象,名稱為'object';
part, nodes = self.get_object_ring().get_nodes(account, container, obj)
successes = 0
failures = 0
for node in nodes:
cnode = container_nodes.pop()
try:
# 建立一個HTTPConnection類的對象;
# 發出的HTTP請求的方法'DELETE'到伺服器;
# 直接從對象服務(器)删除對象;
# 擷取來自伺服器的響應;
direct_delete_object(
node, part, account, container, obj,
conn_timeout=self.conn_timeout,
response_timeout=self.node_timeout,
headers={'X-Container-Host': '%(ip)s:%(port)s' % cnode,
'X-Container-Partition': str(container_partition),
'X-Container-Device': cnode['device']})
successes += 1
self.stats_return_codes[2] = self.stats_return_codes.get(2, 0) + 1
self.logger.increment('return_codes.2')
except ClientException as err:
if self.logger.getEffectiveLevel() <= DEBUG:
self.logger.exception(_('Exception with %(ip)s:%(port)s/%(device)s'), node)
failures += 1
self.logger.increment('objects_failures')
self.stats_return_codes[err.http_status / 100] = self.stats_return_codes.get(err.http_status / 100, 0) + 1
self.logger.increment('return_codes.%d' % (err.http_status / 100,))
if successes > failures:
self.stats_objects_deleted += 1
self.logger.increment('objects_deleted')
elif not successes:
self.stats_objects_remaining += 1
self.logger.increment('objects_remaining')
else:
self.stats_objects_possibly_remaining += 1
self.logger.increment('objects_possibly_remaining')
3.1.擷取指定account指定container指定object的分區号和所有副本所在節點資訊;
3.2.周遊所有副本所在節點,針對每一個節點調用方法direct_delete_object實作發送調用'DELETE'方法的請求,實作從對象服務(器)直接删除指定對象資料;
轉到3.2,來看方法direct_delete_object的實作:
def direct_delete_object(node, part, account, container, obj, conn_timeout=5, response_timeout=15, headers=None):
"""
直接從對象服務(器)删除對象;
建立一個HTTPConnection類的對象;
發出的HTTP請求的方法'DELETE'到伺服器;
直接從對象服務(器)删除對象;
擷取來自伺服器的響應;
"""
if headers is None:
headers = {}
path = '/%s/%s/%s' % (account, container, obj)
with Timeout(conn_timeout):
# 建立一個HTTPConnection類的對象;
# 發出的HTTP請求的方法'DELETE';
# 傳回HTTPConnection連接配接對象;
conn = http_connect(node['ip'], node['port'], node['device'], part,
'DELETE', path, headers=gen_headers(headers, True))
with Timeout(response_timeout):
# getresponse:擷取來自伺服器的響應;
resp = conn.getresponse()
resp.read()
if not is_success(resp.status):
raise ClientException(
'Object server %s:%s direct DELETE %s gave status %s' %
(node['ip'], node['port'],
repr('/%s/%s%s' % (node['device'], part, path)),
resp.status),
http_host=node['ip'], http_port=node['port'],
http_device=node['device'], http_status=resp.status,
http_reason=resp.reason)
轉到4,來看方法direct_delete_container的實作:
def direct_delete_container(node, part, account, container, conn_timeout=5, response_timeout=15, headers=None):
"""
發送調用'DELETE'方法的請求,實作從容器服務(器)直接删除container相關資料;
"""
if headers is None:
headers = {}
path = '/%s/%s' % (account, container)
with Timeout(conn_timeout):
conn = http_connect(node['ip'], node['port'], node['device'], part,
'DELETE', path, headers=gen_headers(headers, True))
with Timeout(response_timeout):
resp = conn.getresponse()
resp.read()
if not is_success(resp.status):
raise ClientException(
'Container server %s:%s direct DELETE %s gave status %s' %
(node['ip'], node['port'],
repr('/%s/%s%s' % (node['device'], part, path)), resp.status),
http_host=node['ip'], http_port=node['port'],
http_device=node['device'], http_status=resp.status,
http_reason=resp.reason)