感謝朋友支援本部落格,歡迎共同探讨交流,由于能力和時間有限,錯誤之處在所難免,歡迎指正!
如果轉載,請保留作者資訊。
部落格位址:http://blog.csdn.net/gaoxingnengjisuan
郵箱位址:[email protected]
PS:最近沒有登入部落格,很多朋友的留言沒有看見,這裡道歉!還有就是本人較少上QQ,可以郵件交流。
def object_audit(self, location):
"""
Audits the given object location.
對partition進行一些檢查 如果有問題抛出相應的異常;
如果抛AuditException這樣的異常,說明partition出現問題,需要隔離,然後同步;
檢查檔案的完整性,該方法封裝了obj server的DiskFile類;
該類有一個_handle_close_quarantine方法,用來檢測檔案是否需要被隔離;
如果發現損壞,則直接将檔案移動到隔離目錄下;
1 對于location确定的對象資料進行檢測,來判斷檔案是否損壞,檢測方法包括:
檢測檔案長度和讀取檔案長度值是否相同,以及通過檢測etag值是否相同;
2 在檔案損壞的情況下,設定損壞對象檔案的哈希值為空;
3 移動損壞對象檔案到隔離區域;
"""
def raise_dfq(msg):
raise DiskFileQuarantined(msg)
try:
# 管理磁盤上的object檔案類;
# 用來初始化object,其中包括了一切對于一個object的操作;
# get_diskfile_from_audit_location:擷取磁盤檔案的具體路徑;
# 擷取由audit_location =(path, device, partition)所确定的具體路徑;
# 應該就是指定對象的具體路徑;
df = self.diskfile_mgr.get_diskfile_from_audit_location(location)
with df.open():
# 擷取對象中繼資料字典;
metadata = df.get_metadata()
# 擷取對象的大小;
obj_size = int(metadata['Content-Length'])
if self.stats_sizes:
self.record_stats(obj_size)
# 對象資料沒有被損壞;
if self.zero_byte_only_at_fps and obj_size:
self.passes += 1
return
# df.reader方法在後面for循環中會調用類class DiskFileReader下的方法__iter__,
# 在這個方法中會進一步調用方法close,而在close方法中會實作以下步驟:
# 通過檢測檔案長度和讀取檔案長度值是否相同,以及通過檢測etag值是否相同,
# 來判斷檔案是否損壞,是否需要被隔離;
# 在檔案損壞的情況下,設定損壞對象檔案的哈希值為空;
# 并移動損壞對象檔案到隔離區域,以便後續通過複制操作實作損壞檔案的恢複;
# 關閉打開的檔案fp;
reader = df.reader(_quarantine_hook=raise_dfq)
with closing(reader):
for chunk in reader:
chunk_len = len(chunk)
self.bytes_running_time = ratelimit_sleep(
self.bytes_running_time,
self.max_bytes_per_second,
incr_by=chunk_len)
self.bytes_processed += chunk_len
self.total_bytes_processed += chunk_len
except DiskFileNotExist:
return
except DiskFileQuarantined as err:
self.quarantines += 1
self.logger.error(_('ERROR Object %(obj)s failed audit and was'
' quarantined: %(err)s'),
{'obj': location, 'err': err})
self.passes += 1
1.調用方法get_diskfile_from_audit_location擷取指定對象的具體路徑;
2.打開指定對象檔案,擷取其中繼資料資訊,并且從中繼資料中擷取指定對象的大小;
3.通過變量zero_byte_only_at_fps和obj_size判斷指定對象資料是否被損壞,如果沒有被損壞直接傳回;
4.對打開的對象調用reader方法,reader方法在後面for循環中會調用類class DiskFileReader下的方法
__iter__,在這個方法中會進一步調用方法close,而在close方法中會實作以下步驟:
4.1 通過檢測檔案長度和讀取檔案長度值是否相同,以及通過檢測etag值是否相同,來判斷檔案是否損壞,是否需要被隔離;
4.2 在檔案損壞的情況下,設定損壞對象檔案的哈希值為空;
4.3 并移動損壞對象檔案到隔離區域,以便後續通過複制操作實作損壞檔案的恢複;
4.4 關閉打開的指定對象檔案fp;
def reader(self, keep_cache=False, _quarantine_hook=lambda m: None):
dr = DiskFileReader(
self._fp, self._data_file, int(self._metadata['Content-Length']),
self._metadata['ETag'], self._threadpool, self._disk_chunk_size,
self._mgr.keep_cache_size, self._device_path, self._logger,
quarantine_hook=_quarantine_hook, keep_cache=keep_cache)
self._fp = None
return dr
def __iter__(self):
"""Returns an iterator over the data file."""
try:
......
finally:
# 這裡的close方法:
# 通過檢測檔案長度和讀取檔案長度值是否相同,以及通過檢測etag值是否相同,
# 來判斷檔案是否損壞,是否需要被隔離;
# 在檔案損壞的情況下,設定損壞對象檔案的哈希值為空;
# 并移動損壞對象檔案到隔離區域,以便後續通過複制操作實作損壞檔案的恢複;
# 關閉打開的檔案fp;
if not self._suppress_file_closing:
self.close()
def close(self):
"""
Close the open file handle if present.
For this specific implementation, this method will handle quarantining
the file if necessary.
通過檢測檔案長度和讀取檔案長度值是否相同,以及通過檢測etag值是否相同,
來判斷檔案是否損壞,是否需要被隔離;
在檔案損壞的情況下,設定損壞對象檔案的哈希值為空;
并移動損壞對象檔案到隔離區域,以便後續通過複制操作實作損壞檔案的恢複;
關閉打開的檔案fp;
"""
if self._fp:
# 通過檢測檔案長度和讀取檔案長度值是否相同,以及通過檢測etag值是否相同,
# 來判斷檔案是否損壞,是否需要被隔離;
# 在檔案損壞的情況下,設定損壞對象檔案的哈希值為空;
# 并移動損壞對象檔案到隔離區域,以便後續通過複制操作實作損壞檔案的恢複;
try:
if self._started_at_0 and self._read_to_eof:
self._handle_close_quarantine()
except DiskFileQuarantined:
raise
except (Exception, Timeout) as e:
self._logger.error(_(
'ERROR DiskFile %(data_file)s'
' close failure: %(exc)s : %(stack)s'),
{'exc': e, 'stack': ''.join(traceback.format_stack()),
'data_file': self._data_file})
finally:
fp, self._fp = self._fp, None
fp.close()
def _handle_close_quarantine(self):
"""
Check if file needs to be quarantined
通過檢測檔案長度和讀取檔案長度值是否相同,以及通過檢測etag值是否相同,
來判斷檔案是否損壞,是否需要被隔離;
在檔案損壞的情況下,設定損壞對象檔案的哈希值為空;
并移動損壞對象檔案到隔離區域,以便後續通過複制操作實作損壞檔案的恢複;
"""
# 如果檔案的長度和讀取檔案長度不相同,則為_quarantine方法傳入的是讀取的長度不比對;
# 如果etag不相同,則為_quarantine方法傳入的是md5值不比對;
if self._bytes_read != self._obj_size:
self._quarantine(
"Bytes read: %s, does not match metadata: %s" % (
self._bytes_read, self._obj_size))
elif self._iter_etag and self._etag != self._iter_etag.hexdigest():
self._quarantine(
"ETag %s and file's md5 %s do not match" % (
self._etag, self._iter_etag.hexdigest()))
def _quarantine(self, msg):
"""
在檔案損壞的情況下,設定損壞對象檔案的哈希值為空;
并移動損壞對象檔案到隔離區域,以便後續通過複制操作實作損壞檔案的恢複;
"""
# 在檔案損壞的情況下,設定損壞對象檔案的哈希值為空;
# 并移動損壞對象檔案到隔離區域,以便後續通過複制操作實作損壞檔案的恢複;
self._quarantined_dir = self._threadpool.run_in_thread(
quarantine_renamer, self._device_path, self._data_file)
self._logger.warn("Quarantined object %s: %s" % (self._data_file, msg))
self._logger.increment('quarantines')
self._quarantine_hook(msg)
def quarantine_renamer(device_path, corrupted_file_path):
"""
在檔案損壞的情況下,設定損壞對象檔案的哈希值為空;
并移動損壞對象檔案到隔離區域,以便後續通過複制操作實作損壞檔案的恢複;
"""
# 損壞檔案的路徑;
from_dir = dirname(corrupted_file_path)
# 檔案隔離區域的路徑;
to_dir = join(device_path, 'quarantined', 'objects', basename(from_dir))
# 設定損壞對象檔案的哈希值為空;
invalidate_hash(dirname(from_dir))
# 實作複制損壞對象檔案到隔離區域;
try:
renamer(from_dir, to_dir)
except OSError as e:
if e.errno not in (errno.EEXIST, errno.ENOTEMPTY):
raise
to_dir = "%s-%s" % (to_dir, uuid.uuid4().hex)
renamer(from_dir, to_dir)
return to_dir
def invalidate_hash(suffix_dir):
"""
Invalidates the hash for a suffix_dir in the partition's hashes file.
設定suffix_dir的哈希值為空;
"""
suffix = basename(suffix_dir)
partition_dir = dirname(suffix_dir)
hashes_file = join(partition_dir, HASH_FILE)
with lock_path(partition_dir):
try:
with open(hashes_file, 'rb') as fp:
hashes = pickle.load(fp)
if suffix in hashes and not hashes[suffix]:
return
except Exception:
return
hashes[suffix] = None
write_pickle(hashes, hashes_file, partition_dir, PICKLE_PROTOCOL)
def renamer(old, new):
"""
實作複制損壞對象檔案到隔離區域;
"""
try:
mkdirs(os.path.dirname(new))
os.rename(old, new)
except OSError:
mkdirs(os.path.dirname(new))
os.rename(old, new)