天天看點

mysqldume.py代碼分析

 可以根據實際需求修改,先來看看

#!/usr/bin/env python 

# -*- coding: utf-8 -*- 

__title__ = "mysqlpdump" 

__version__ = "0.5" 

__author__= "Carles Amigo" 

__email__= "fr3nd at fr3nd dot net" 

__website__= "http://www.fr3nd.net/projects/mysqlpdump" 

import threading, Queue 

import MySQLdb 

from optparse import OptionParser 

import commands 

import sys 

import os 

import gzip 

class Log: 

    """Simple class for logging""" 

    def __init__(self, verbose): 

        self.verbose = verbose 

    def log(self, line): 

        """Logs an especified line""" 

        if self.verbose: 

            sys.stderr.write (" - " + str(line) + "\n") 

class Database: 

    """Class to handle database connection""" 

    def __init__(self, log, mysqluser, mysqlpass, mysqlhost): 

        self.user = mysqluser 

        self.password = mysqlpass 

        self.host = mysqlhost 

        self.log = log 

        self.log.log("Connecting to database") 

        self.db=MySQLdb.connect(user=mysqluser,passwd=mysqlpass,host=mysqlhost) 

        self.cursor = self.db.cursor() 

    def close(self): 

        self.log.log("Closing database connection") 

        self.db.close() 

    def lock(self): 

        """Locks all tables for read/write""" 

        self.log.log("Locking all tables") 

        self.cursor.execute("FLUSH TABLES WITH READ LOCK;") 

    def unlock(self): 

        """Unlocks all tables in the database""" 

        self.log.log("Unlocking all tables") 

        self.cursor.execute("UNLOCK TABLES") 

    def get_databases(self, included, excluded): 

        """Return all the databases. Included and excluded databases can be specified""" 

        self.cursor.execute("show databases;") 

        result = self.cursor.fetchall() 

        databases = [] 

        for database in result: 

            if len(included) == 0: 

                if database[0] not in excluded: 

                    databases.append(database[0]) 

            else: 

                if (database[0] in included) and (database[0] not in excluded): 

        return databases 

    def get_tables(self, database): 

        """Return all tables for a given database""" 

        self.cursor.execute("show tables from " + str(database) + ";") 

        tables = [] 

        for table in result: 

            tables.append(table[0]) 

        return tables 

    def get_slave_status(self): 

        """Return slave status""" 

        self.cursor.execute("show slave status;") 

        return result 

    def get_change_master_to(self, slave_status): 

        try: 

            return "CHANGE MASTER TO MASTER_HOST=\'" + slave_status[0][1] + "\', MASTER_LOG_FILE=\'" + slave_status[0][5] + "\', MASTER_LOG_POS=" + str(slave_status[0][6]) + ";" 

        except: 

            return "" 

    def mysqldump(self, database, table, destination, custom_parameters="", stdout=False, gzip=False, mysqldump="/usr/bin/mysqldump"): 

        """Dumps a specified table. 

        It can dump it to a file or just return all the dumped data. 

        It can waste a lot of memory if its returning a big table.""" 

        default_parameters = "--skip-lock-tables" 

        cmd=mysqldump + " " + default_parameters 

        if custom_parameters != "": 

            cmd = cmd + " " + custom_parameters 

        cmd = cmd + " -u" + self.user + " -p" + self.password + " -h" + self.host + " " + database + " " + table 

        if stdout: 

            return commands.getstatusoutput(cmd) 

        else: 

            file = destination + "/" + database + "-" + table + ".sql" 

            if gzip: 

                cmd = cmd + " | gzip -c > " + file + ".gz" 

                cmd = cmd + " > " + file 

            os.system(cmd) 

            return (None, None) 

class Worker(threading.Thread): 

    def __init__(self, queue, log, db, event_dict, destination, custom_parameters="", stdout=False, gzip=False, ): 

        threading.Thread.__init__(self) 

        self.queue = queue 

        self.db = db 

        self.event_dict = event_dict 

        self.stdout = stdout 

        self.gzip = gzip 

        self.destination = destination 

        self.custom_parameters = custom_parameters 

    def run(self): 

        self.log.log("Worker " + self.getName() + " started") 

        while True: 

            try: 

                num, database, table = self.queue.get(True, 1) 

            except Queue.Empty: 

                break 

            self.event_dict[num] = threading.Event() 

            self.event_dict[num].clear() 

            self.log.log(self.getName() + " dumping " + database + " " + table) 

            status, output = self.db.mysqldump(database, table, custom_parameters=self.custom_parameters, stdout=self.stdout, gzip=self.gzip, destination=self.destination) 

            if self.stdout: 

                if num > 0: 

                    while not self.event_dict[num-1].isSet(): 

                        self.event_dict[num-1].wait() 

            self.log.log(self.getName() + " dumped " + database + " " + table) 

            if output: 

                print output 

            self.event_dict[num].set() 

def main(): 

    try: 

        current_user = os.getlogin() 

    except: 

        current_user = "nobody" 

    usage = "usage: %prog [options]\n Run mysqldump in paralel" 

    parser = OptionParser(usage, version=__version__) 

    parser.add_option("-v", "--verbose", action="store_true", dest="verbose", default=False, help="verbose output.") 

    parser.add_option("-u", "--user", action="store", dest="user", type="string", default=current_user, help="User for login.") 

    parser.add_option("-p", "--password", action="store", dest="password", type="string", default='', help="Password for login.") 

    parser.add_option("-H", "--host", action="store", dest="host", type="string", default='localhost', help="Connect to host.") 

    parser.add_option("-t", "--threads", action="store", dest="threads", type="int", default=5, help="Threads used. Default = 5") 

    parser.add_option("-s", "--stdout", action="store_true", dest="stdout", default=False, help="Output dumps to stdout instead to files. WARNING: It can exaust all your memory!") 

    parser.add_option("-g", "--gzip", action="store_true", dest="gzip", default=False, help="Add gzip compression to files.") 

    parser.add_option("-m", "--master-data", action="store_true", dest="master_data", default=False, help="This causes the binary log position and filename to be written to the file 00_master_data.sql.") 

    parser.add_option("-d", "--destination", action="store", dest="destination", type="string", default=".", help="Path where to store generated dumps.") 

    parser.add_option("-P", "--parameters", action="store", dest="parameters", type="string", default="", help="Pass parameters directly to mysqldump.") 

    parser.add_option("-i", "--include_database", action="append", dest="included_databases", default=[], help="Databases to be dumped. By default, all databases are dumped. Can be called more than one time.") 

    parser.add_option("-e", "--exclude_database", action="append", dest="excluded_databases", default=[], help="Databases to be excluded from the dump. No database is excluded by default. Can be called more than one time.") 

    (options, args) = parser.parse_args() 

    log = Log(options.verbose) 

        db = Database(log, options.user, options.password, options.host) 

        parser.error("Cannot connect to database") 

    db.lock() 

    queue = Queue.Queue() 

    x = 0 

    if options.master_data: 

        if options.gzip: 

            f=gzip.open(options.destination + '/00_master_data.sql.gz', 'w') 

            f=open(options.destination + '/00_master_data.sql', 'w') 

        f.write(db.get_change_master_to(db.get_slave_status())) 

        f.write('\n') 

        f.close() 

    for database in db.get_databases(options.included_databases, options.excluded_databases): 

        for table in db.get_tables(database): 

            queue.put([x,database,table]) 

            x = x + 1 

    event_dict = {} 

    threads = [] 

    for i in range(options.threads): 

        threads.append(Worker(queue, log, db, event_dict, custom_parameters=options.parameters, stdout=options.stdout, gzip=options.gzip, destination=options.destination)) 

        threads[x].setDaemon(True) 

        threads[x].start() 

        x = x + 1 

    # Wait for all threads to finish 

    for thread in threads: 

        thread.join() 

    db.unlock() 

    db.close() 

if __name__ == "__main__": 

    main() 

     本文轉自 珏石頭 51CTO部落格,原文連結:http://blog.51cto.com/gavinshaw/574638,如需轉載請自行聯系原作者

上一篇: 一鍵停服務