天天看點

k8s 叢集中,rbd 和 cephfs 資料的備份腳本

cat >> k8s-ceph-bak.py <<EOF
#!/usr/bin/env python
#coding:utf-8

from kubernetes import client, config
from executor import execute
import datetime, sys, os, shutil
import argparse
import base64
import json

if sys.version_info[0] < 3:
    import ConfigParser as configparser
else:
    import configparser

ceph_config_file = '/etc/ceph/ceph.conf' if os.getenv('SRC_CEPH_CONFIG_FILE') is None else os.getenv('SRC_CEPH_CONFIG_FILE')
ceph_keyring = '/etc/ceph/ceph.client.admin.keyring' if os.getenv('SRC_CEPH_KEYRING') is None else os.getenv('SRC_CEPH_KEYRING')
ceph_pool = 'kube' if os.getenv('CEPH_POOL') is None else os.getenv('CEPH_POOL')

restic_data_dir = '/data/ceph/restic'
backy2_data_dir = '/data/ceph/backy2'
restic_password = 'enhjYXNkMTIz'

def gen_backy2_conf():
    if not os.path.exists(backy2_data_dir):
        os.makedirs(backy2_data_dir)
    if not os.path.isfile(backy2_data_dir + '/backy.sqlite'):
        with open('/etc/backy.cfg','w') as f:
            f.write('''
[DEFAULTS]
logfile: /var/log/backy.log
block_size: 4194304
hash_function: sha512
lock_dir: /run
process_name: backy2
disallow_rm_when_younger_than_days: 6
[MetaBackend]
type: backy2.meta_backends.sql
engine: sqlite:///{backy2_data_dir}/backy.sqlite
[DataBackend]
type: backy2.data_backends.file
path: {backy2_data_dir}/data
simultaneous_writes: 5
simultaneous_reads: 5
[NBD]
cachedir: /tmp
[io_file]
simultaneous_reads: 5
[io_rbd]
ceph_conffile: /etc/ceph/ceph.conf
simultaneous_reads: 10
new_image_features:
    RBD_FEATURE_LAYERING
'''.format(backy2_data_dir=backy2_data_dir))
        # 初始化backy2 倉庫
        execute('backy2 initdb', capture=True, check=False, sudo=True)

def kube_v1():
    k8s_api = None

    if "KUBERNETES_SERVICE_HOST" in os.environ:
        config.load_incluster_config()
        k8s_api = client.CoreV1Api()
    else:
        try:
            config.load_kube_config()
            k8s_api = client.CoreV1Api()
        except FileNotFoundError:
            logger.info("No K8s")
            pass

    return k8s_api

def get_all_pv():
    k8s_api = kube_v1()
    all_pv = k8s_api.list_persistent_volume()
    return all_pv

def get_rbd_pv_from_namespace(namespace):
    all_pv = get_all_pv()
    rbd_pv = []
    for pv in all_pv.items:
        if pv.spec.claim_ref.namespace == namespace and pv.spec.storage_class_name == 'ceph-rbd':
            rbd_pv.append(pv)
    return rbd_pv

def get_cephfs_pv_from_namespace(namespace):
    all_pv = get_all_pv()
    cephfs_pv = []
    for pv in all_pv.items:
        if pv.spec.claim_ref.namespace == namespace and pv.spec.storage_class_name == 'cephfs':
            cephfs_pv.append(pv)
    return cephfs_pv

# Create rbd image snapshot
def create_rbd_image_snap(namespace):
    rbd_pv = get_rbd_pv_from_namespace(namespace)
    for pv in rbd_pv:
        rbd_pool = pv.spec.rbd.pool
        rbd_image_name = pv.spec.rbd.image
        snap_name = pv.spec.claim_ref.namespace + '-' + pv.spec.claim_ref.name + '-' + datetime.datetime.now().strftime('%Y%m%d%H%M%S')
        execute('rbd snap create {rbd_pool}/{rbd_image_name}@{snap_name}'.format(rbd_pool=rbd_pool, rbd_image_name=rbd_image_name, snap_name=snap_name), check=False, sudo=True)

# from snapshot restore
def restore_from_rbd_image_snap(namespace):
    rbd_pv = get_rbd_pv_from_namespace(namespace)
    for pv in rbd_pv:
        rbd_pool = pv.spec.rbd.pool
        rbd_image_name = pv.spec.rbd.image
        execute('rbd snap ls {rbd_pool}/{rbd_image_name}'.format(rbd_pool=rbd_pool, rbd_image_name=rbd_image_name), check=False, sudo=True)

# backup rbd image that use backy2
def backup_rbd_image(namespace):
    rbd_pv = get_rbd_pv_from_namespace(namespace)
    for pv in rbd_pv:
        rbd_pool = pv.spec.rbd.pool
        rbd_image_name = pv.spec.rbd.image
        bak_rbd_image_name = pv.spec.claim_ref.namespace + '-' + pv.spec.claim_ref.name
        execute('backy2 backup -t {rbd_pool}/{rbd_image_name} rbd://{rbd_pool}/{rbd_image_name} {bak_rbd_image_name}'.format(rbd_pool=rbd_pool, rbd_image_name=rbd_image_name, bak_rbd_image_name=bak_rbd_image_name), check=False, sudo=True)
    execute('backy2 ls', check=False, sudo=True)

def get_ceph_mon(ceph_conf_file):
    mon_str = execute('ceph mon_status', capture=True, check=False, sudo=True)
    mon_json = json.loads(mon_str)
    return mon_json['monmap']['mons'][0]['public_addr'].split(':')[0]

def get_ceph_admin_secret(ceph_keyring):
    ceph_admin_secret = execute('ceph auth get-key client.admin', capture=True, check=False, sudo=True)
    return ceph_admin_secret

# backup cephfs that use restic
def backup_cephfs(namespace):
    cephfs_pv = get_cephfs_pv_from_namespace(namespace)
    bak_repo = ''
    for pv in cephfs_pv:
        # 備份的tag
        bak_tag = pv.spec.claim_ref.namespace + '-' + pv.spec.claim_ref.name + '-' + datetime.datetime.now().strftime('%Y%m%d%H%M%S')
        # 備份的repo,根據命名空間來區分
        bak_repo = restic_data_dir + '/' + pv.spec.claim_ref.namespace

        # 資料存儲的具體路徑
        cephfs_dir_path = pv.spec.cephfs.path
        cephfs_mountpoint = '/mycephfs'
        bak_path = cephfs_mountpoint + cephfs_dir_path
        if not os.path.exists(cephfs_mountpoint):
            os.makedirs(cephfs_mountpoint)

        # 擷取一個ceph 叢集的monitor 位址
        ceph_mon_addr = get_ceph_mon(ceph_config_file)
        # 擷取通路ceph 叢集的admin secret
        ceph_admin_secret = get_ceph_admin_secret(ceph_keyring)
        # 挂載cephfs檔案系統
        #if execute('mount.ceph {ceph_mon_addr}:/ {cephfs_mountpoint} -o name=admin,secret={ceph_admin_secret}'.format(ceph_mon_addr=ceph_mon_addr, cephfs_mountpoint=cephfs_mountpoint, ceph_admin_secret=ceph_admin_secret), check=False, sudo=True):
        if execute('ceph-fuse -m {ceph_mon_addr}:6789 {cephfs_mountpoint}'.format(ceph_mon_addr=ceph_mon_addr, cephfs_mountpoint=cephfs_mountpoint), check=False, sudo=True):
            # 備份前先建立一個軟連接配接,以友善查找
            link_path = pv.spec.claim_ref.namespace + '-' + pv.spec.claim_ref.name
            os.chdir(cephfs_mountpoint)
            if os.path.exists(link_path):
                os.unlink(link_path)
            os.symlink(cephfs_dir_path.lstrip('/'), link_path)
            os.chdir('/')
            # 設定 restic 的repo 密碼
            os.environ['RESTIC_PASSWORD'] = base64.b64decode(restic_password)
            # 初始化restic 的repo, 如果repo 不存在的話
            if not os.path.exists(bak_repo):
                execute('restic init --repo {bak_repo}'.format(bak_repo=bak_repo), check=False, sudo=True)
            # 對指定命名空間的cephfs 形式的pvc 進行資料備份
            execute('restic -r {bak_repo} backup --tag {bak_tag} {bak_path}'.format(bak_repo=bak_repo, bak_tag=bak_tag, bak_path=bak_path), check=False, sudo=True)
            # 解除安裝cephfs 檔案系統
            execute('umount {cephfs_mountpoint}'.format(cephfs_mountpoint=cephfs_mountpoint), check=False, sudo=True)
    # 檢視備份的資料
    if os.path.exists(bak_repo):
        execute('restic -r {bak_repo} snapshots'.format(bak_repo=bak_repo), check=False, sudo=True)

# How many copies are kept up to date
def keep_last_backup_counts(num=3):
    # for backy2
    bak_diff_types = execute('backy2 -m ls | grep ^version | cut -d"|" -f3 | sort -u', capture=True, check=False, sudo=True).split('\n')[:-1]
    tail_num = num + 1
    backy2_uuid_list = []
    for bak_type_name in bak_diff_types:
        backy2_uuid_list.append(execute('backy2 -m ls |grep {bak_type_name} | sort -t"|" -k2r,2 | cut -d"|" -f7 | tail -n +{tail_num}'.format(bak_type_name=bak_type_name, tail_num=tail_num), capture=True, check=False, sudo=True))
    for uuid in backy2_uuid_list:
        if uuid != '':
            execute('backy2 rm -f {uuid} '.format(uuid=uuid), check=False, sudo=True)
            execute('backy2 cleanup -f', check=False, sudo=True)
    execute('backy2 ls', check=False, sudo=True)
    # for restic
    os.environ['RESTIC_PASSWORD'] = base64.b64decode(restic_password)
    for ns in os.listdir(restic_data_dir):
        execute('restic -r {bak_repo} forget --keep-last={num} --prune'.format(bak_repo=restic_data_dir + '/' + ns, num=num), check=False, sudo=True)

def parse_arguments(args=None):
    parser = argparse.ArgumentParser(description='Used to back up persistent data in the namespace specified in k8s')
    parser.add_argument('-a', '--all', dest='namespace', action="store", help='Specify the namespace in which persistent data needs to be backed up from cephfs and rbd image')
    parser.add_argument('-b', '--rbd', dest='rbd', action="store", help='Specifies the namespace in which persistent data needs to be backed up from rbd image')
    parser.add_argument('-f', '--cephfs', dest='cephfs', action="store", help='Specify the namespace in which persistent data needs to be backed up from cephfs')
    parser.add_argument('-c', '--counts',  dest='counts', action="store", type=int, help='Specifies how many copies to keep up to date')
    parser.add_argument('-s', '--snapshot',  dest='snapshot', action="store", help='Specifies the namespace in which persistent data needs to create snapshot')
    parser.add_argument('-r', '--restore-snap',  dest='restoresnap', action="store", help='Restore rbd image from snapshot')

    try:
        args = parser.parse_args(args=args)
    except IOError as msg:
        parser.error(str(msg))
    if len(sys.argv) == 1:
        parser.print_help(sys.stderr)
        sys.exit(1)
    return args

def main(arg):
    args = parse_arguments(arg)
    if args.rbd:
        backup_rbd_image(args.rbd)
    elif args.counts:
        keep_last_backup_counts(args.counts)
    elif args.snapshot:
        create_rbd_image_snap(args.snapshot)
    elif args.cephfs:
        backup_cephfs(args.cephfs)
    elif args.namespace:
        backup_cephfs(args.namespace)
        backup_rbd_image(args.namespace)
    elif args.restoresnap:
        restore_from_rbd_image_snap(args.restoresnap)

if __name__ == '__main__':
    gen_backy2_conf()
    main(sys.argv[1:])
EOF           

繼續閱讀