這個問題相當有趣,也給了我一個研究Guppy / Heapy的理由,為此我要感謝你。
我花了大約2個小時,試圖讓Heapy在不修改源代碼的情況下監控一個函數調用/進程,但毫無進展。
不過,我确實找到了一種使用Python内置的resource庫來完成任務的方式。請注意,該文檔并未說明RU_MAXRSS傳回值的單位。另一位SO使用者指出,它的單位是kB。我在Mac OSX 7.3上運行下面的測試代碼并觀察系統資源的增長,我相信傳回的值是以位元組(Bytes)為單位,而不是kBytes。
關于如何使用resource庫監視庫調用,從宏觀(10000ft)的角度來說就是:在單獨的(可監視的)線程中啟動該函數,并在主線程中跟蹤該進程的系統資源。下面是測試時需要運行的兩個檔案。
import resource
import time
from stoppable_thread import StoppableThread
class MyLibrarySniffingClass(StoppableThread):
    '''Thread wrapper that runs one two-argument call and keeps its result.

    The callable is invoked as ``target_lib_call(arg1, arg2)`` from
    ``mainloop``; its return value is stored on ``self.results`` (``None``
    until the call has returned) and the thread is then asked to stop.
    '''

    def __init__(self, target_lib_call, arg1, arg2):
        '''Remember the callable and the two arguments to pass to it.'''
        super(MyLibrarySniffingClass, self).__init__()
        self.results = None
        self.target_function = target_lib_call
        self.arg1, self.arg2 = arg1, arg2

    def startup(self):
        '''Overrides the base-class startup hook: announce the call.'''
        # A parenthesised single-argument print emits the same text under
        # the Python 2 print statement and the Python 3 print function.
        print("Calling the Target Library Function...")

    def cleanup(self):
        '''Overrides the base-class cleanup hook: announce completion.'''
        print("Library Call Complete")

    def mainloop(self):
        '''Run the target call once, store its result, then request a stop.'''
        call = self.target_function
        self.results = call(self.arg1, self.arg2)
        # One call is all we want, so stop instead of looping again.
        self.stop()
def SomeLongRunningLibraryCall(arg1, arg2, max_dict_entries=2500,
                               delay_per_entry=.005):
    '''Simulate a slow, memory-hungry library call.

    Repeatedly sleeps and adds a 10000-element list to a dictionary until
    the dictionary holds more than ``max_dict_entries`` entries, then prints
    ``arg1`` and ``arg2`` separated by a space.

    Args:
        arg1: First string to print.
        arg2: Second string to print.
        max_dict_entries: The loop stops once the dictionary has more than
            this many entries (at least one entry is always added).
        delay_per_entry: Seconds to sleep before adding each entry.

    Returns:
        The string "Good Bye World".
    '''
    some_large_dictionary = {}
    dict_entry_count = 0
    while True:
        time.sleep(delay_per_entry)
        dict_entry_count += 1
        # list(...) forces a real 10000-element list on Python 3 as well,
        # where a bare range() is a tiny lazy object and would not grow the
        # process memory this demo is meant to exercise.
        some_large_dictionary[dict_entry_count] = list(range(10000))
        if len(some_large_dictionary) > max_dict_entries:
            break
    print(arg1 + " " + arg2)
    return "Good Bye World"
if __name__ == "__main__":
    # Lib Testing Code: run the sample library call in a monitorable thread
    # and poll this process's peak resident set size from the main thread.
    import sys

    # getrusage() reports ru_maxrss in bytes on macOS but in kilobytes on
    # Linux, so the divisor that converts to MB depends on the platform.
    rss_units_per_mb = 1000000.0 if sys.platform == "darwin" else 1000.0

    mythread = MyLibrarySniffingClass(SomeLongRunningLibraryCall, "Hello", "World")

    # Take the baseline BEFORE the worker starts so that memory it allocates
    # right after start-up is counted in the delta.
    start_mem = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    mythread.start()

    max_memory = 0
    memory_usage_refresh = .005  # Seconds

    while True:
        time.sleep(memory_usage_refresh)
        delta_mem = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss - start_mem
        max_memory = max(max_memory, delta_mem)

        # Uncomment this line to see the memory usage during run-time
        # print("Memory Usage During Call: %d MB" % (delta_mem / rss_units_per_mb))

        # Check to see if the library call is complete. Also leave the loop
        # if the worker died without setting its shutdown flag (e.g. the
        # target raised), otherwise this loop would spin forever.
        if mythread.isShutdown() or not mythread.is_alive():
            print(mythread.results)
            break

    print("\nMAX Memory Usage in MB: " + str(round(max_memory / rss_units_per_mb, 3)))
import threading
import time
class StoppableThread(threading.Thread):
    '''Daemon thread that repeats ``mainloop`` until ``stop`` is called.

    ``run`` calls ``startup`` once, then ``mainloop`` repeatedly while
    ``isRunning()`` is true, then ``cleanup`` once. Subclasses override
    those three hooks; the defaults do nothing.
    '''

    def __init__(self):
        super(StoppableThread, self).__init__()
        self.daemon = True
        # Event is set while the loop should keep going; stop() clears it.
        self.__monitor = threading.Event()
        self.__monitor.set()
        # Becomes True only after cleanup() has been attempted.
        self.__has_shutdown = False

    def run(self):
        '''Overloads the threading.Thread.run'''
        try:
            # Call the User's Startup functions
            self.startup()

            # Loop until the thread is stopped
            while self.isRunning():
                self.mainloop()
        finally:
            # Reached on a normal stop AND when a hook raised: without this,
            # an exception would skip cleanup() and leave isShutdown() False
            # forever, hanging any caller that polls it. The exception itself
            # is not swallowed; it propagates after the cleanup below.
            self.__monitor.clear()
            try:
                # Clean up
                self.cleanup()
            finally:
                # Flag to the outside world that the thread has exited
                # AND that the cleanup has been attempted
                self.__has_shutdown = True

    def stop(self):
        '''Ask the loop in ``run`` to exit after the current ``mainloop``.'''
        self.__monitor.clear()

    def isRunning(self):
        '''Return True until ``stop`` has been called or ``run`` has ended.'''
        return self.__monitor.is_set()

    def isShutdown(self):
        '''Return True once ``run`` has finished, including ``cleanup``.'''
        return self.__has_shutdown

    ###############################
    ### User Defined Functions ####
    ###############################

    def mainloop(self):
        '''
        Expected to be overwritten in a subclass!!
        Note that Stoppable while(1) is handled in the built in "run".
        '''
        pass

    def startup(self):
        '''Expected to be overwritten in a subclass!!'''
        pass

    def cleanup(self):
        '''Expected to be overwritten in a subclass!!'''
        pass