天天看點

mysql, mongodb, sqlite3導出資料庫

1 mysql 導出資料

導出需求以及mysql導出資料代碼:

# 需要導出的資料:image_id, url, first, second, definition, is_skip, 沒有的置換為空字元串
    # {"url": "http://114.112.107.58:8000/fashionai_attribute_images/train_round1/Images/pant_length_labels/a70477e45965920304934bbd122d5f7c.jpg",
    # "definition": "Y型", "first": "", "second": "",
    # "imgae_id": "a70477e45965920304934bbd122d5f7c"}

    # 原始資料 mysql
    # {'id': 11122132343234, 'task_id': '5c233454f71ec32e1d0716e5',
    #  'image_id': '61624acb91611eef11d8ffadd378f2bb', 'subtask_id': '5c23347df71ec32e1d0716fa',
    #  'user_id': '5c19e1c7f71ec393a62fe79b', 'updated_at': datetime.datetime(2018, 12, 26, 7, 58, 3, 57000),
    #  'definition': '長袖', 'first': None, 'is_skip': False, 'second': None, 'timestamp': 1545811083.054819,
    #  'user_worked': True}


           
# coding=utf-8

# 提前修改好标題,根據任務标題來導出相關資料
# ssh [email protected]
# pwd: xxxxxxxxxxxxx
# cd /home/home/exportjson/hello
# python3 exportdb.py
import json
import pymysql

def convert(fields):
    if isinstance(fields, bytes):
        return str(fields, encoding='utf-8')
    else:
        return fields


def get_pid(project_name):
    try:
        conn = pymysql.connect(host='127.0.0.1', user='root', passwd='root', db='annosys', port=3306, charset='utf8')
        cur = conn.cursor()
        sql = "select id from DjangoRESTImage_project where title = '{project_name}';".format(project_name=project_name)
        cur.execute(sql)
        data = cur.fetchall()
        print('fetchall()傳回的資料', data)
        cur.close()
        conn.close()
    except:
        print('MySQL connect fail...')
        return "failed"

    return data[0][0]


def TableToJson(project_name):

    pid = get_pid(project_name)
    if pid:
        try:
            # connection對象支援的方法有cursor(),commit(),rollback(),close()
            conn = pymysql.connect(host='127.0.0.1', user='root', passwd='root', db='annosys', port=3306, charset='utf8')

            # cursor對象支援的方法有execute(sql語句),fetchone(),fetchmany(size),fetchall(),rowcount,close()
            cur = conn.cursor()
            sql = "select DjangoRESTImage_image.title as title, DjangoRESTImage_image.image_src as url, confirm_tag, first_tag, second_tag, project_id, user_id from DjangoRESTImage_relation left join DjangoRESTImage_image on DjangoRESTImage_relation.image_id = DjangoRESTImage_image.id where project_id={id};".format(id=pid)
            # execute可執行資料庫查詢select和指令insert,delete,update三種指令(這三種指令需要commit()或rollback())
            cur.execute(sql)

            # 傳回的資料類型是元組類型,每個條資料元素為元組類型:(('第一條資料的字段1的值','第一條資料的字段2的值',...,'第一條資料的字段N的值'),(第二條資料),...,(第N條資料))
            data = cur.fetchall()
            # ('14653a9e6d106511b9a34b04d166e419', '跳過', None, None, 77, 12)
            print('fetchall()傳回的資料', data)

            cur.close()
            conn.close()

        except:
            print('MySQL connect fail...')
            return "failed"

        # 開始處理字段
        jsonData = []
        for line in data:
            print(line)
            dict_one = {}
            dict_one['imgae_id'] = line[0]
            dict_one['url'] = line[1]

            confirm = line[2]
            if confirm is None:
                dict_one['confirm'] = ""
            else:
                dict_one['confirm'] = convert(confirm)

            first = line[3]
            if first is None:
                dict_one['first'] = ""
            else:
                dict_one['first'] = convert(first)

            second = line[4]
            if second is None:
                dict_one['second'] = ""
            else:
                dict_one['second'] = convert(second)

            print("xxxxx", dict_one)
            jsonData.append(dict_one)

        return jsonData

    else:
        print("project id  dont exist!")
        return "error project id  dont exist!"



if __name__ == '__main__':


    project_name = "上衣廓形-20181122-10000張-01"
    jsonData = TableToJson(project_name)

    with open(file="{project_name}.json".format(project_name=project_name), mode="w", encoding="utf-8") as f1:
        for line in jsonData:
            print("per:", line)
            f1.write(json.dumps(line, ensure_ascii=False) + "\n")

    print("len jsonData:", len(jsonData))
           

  

2 mongodb 導出資料

子產品:pymongo 3.7.0 以及pymongo腳本導出代碼

1         # 正常下載下傳資料檔案
2         # filename="哈喽.json"
3         # dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
4         # directory = os.path.join(dir, "static/downloads")
5         # print(directory)
6         # return send_from_directory(directory, filename, as_attachment=True)
7         # # return send_from_directory(directory, filename.encode('utf-8').decode('utf-8'), as_attachment=True)
8           

flask 下載下傳代碼

from flask import send_file, send_from_directory
def export_file(*args, **kwargs):

    """
    
    :param args: 
    :param kwargs: 
    :return: 
    """

    # 需要導出的資料:image_id, url, first, second, definition, is_skip, 沒有的置換為空字元串
    # {"url": "http://114.112.107.58:8000/fashionai_attribute_images/train_round1/Images/pant_length_labels/a70477e45965920304934bbd122d5f7c.jpg",
    # "definition": "Y型", "first": "", "second": "",
    # "imgae_id": "a70477e45965920304934bbd122d5f7c"}

    # 原始資料 mongodb
    # {'_id': ObjectId('5c233454f71ec32e1d0716e7'), 'task_id': '5c233454f71ec32e1d0716e5',
    #  'image_id': '61624acb91611eef11d8ffadd378f2bb', 'subtask_id': '5c23347df71ec32e1d0716fa',
    #  'user_id': '5c19e1c7f71ec393a62fe79b', 'updated_at': datetime.datetime(2018, 12, 26, 7, 58, 3, 57000),
    #  'definition': '長袖', 'first': None, 'is_skip': False, 'second': None, 'timestamp': 1545811083.054819,
    #  'user_worked': True}

    client = MongoClient('mongodb://localhost:27017/')
    db = client.annosys

    task_query = {"task_id": "5c233454f71ec32e1d0716e5"}
    annos = db.image_annotation.find(task_query)

    filename = "export.json"
    dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    filepath = os.path.join(dir, "static/downloads", filename)
    print(filepath)


    with open(filepath, "w", encoding="utf-8") as f:
        for anno in annos:
            print(anno)
            anno_item = {}

            image_id = anno.get("image_id")
            definition = anno.get("definition")
            first = anno.get("first")
            second = anno.get("second")
            is_skip = anno.get("is_skip")
            image_query = {"image_id": image_id}
            image = db.image.find_one(image_query)
            url = image.get("url")


            anno_item["image_id"] = image_id
            anno_item["url"] = url
            anno_item["is_skip"] = is_skip
            
            if definition:
                anno_item["definition"] = definition
            else:
                anno_item["definition"] = ""

            if first:
                anno_item["first"] = first
            else:
                anno_item["first"] = ""

            if second:
                anno_item["second"] = second
            else:
                anno_item["second"] = ""

            f.write(json.dumps(anno_item, ensure_ascii=False) + "\n")

    try:
        directory, filename = os.path.split(filepath)
        return send_from_directory(directory, filename, as_attachment=True)
    except Exception as e:
        return jsonify({"code": 40004, "message": "download failed!", "data": str(e)})
           

  

mongoengine 導出代碼:

def export_file_by_mongoengine(*args, **kwargs):

    """
    
    :param args: 
    :param kwargs: 
    :return: 
    """

    image_annotations = ImageAnnotation.objects.filter(task_id="5c233454f71ec32e1d0716e5")

    filename = "export.json"
    dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    filepath = os.path.join(dir, "static/downloads", filename)
    print(filepath)


    with open(filepath, "w", encoding="utf-8") as f:
        for anno in image_annotations:
            print(anno)
            anno_item = {}

            image_id = anno.image_id
            definition = anno.definition
            first = anno.first
            second = anno.second
            is_skip = anno.is_skip

            image = Image.objects.filter(image_id=image_id).first()
            url = image.url


            anno_item["image_id"] = image_id
            anno_item["url"] = url
            anno_item["is_skip"] = is_skip

            if definition:
                anno_item["definition"] = definition
            else:
                anno_item["definition"] = ""

            if first:
                anno_item["first"] = first
            else:
                anno_item["first"] = ""

            if second:
                anno_item["second"] = second
            else:
                anno_item["second"] = ""

            f.write(json.dumps(anno_item, ensure_ascii=False) + "\n")

    try:
        directory, filename = os.path.split(filepath)
        return send_from_directory(directory, filename, as_attachment=True)
    except Exception as e:
        return jsonify({"code": 40004, "message": "download failed!", "data": str(e)})
           

  

pymongo導出mongodb腳本:

import os
import json
from pymongo import MongoClient

task_id = "5c32b74ff71ec31435f8e1c9"

client = MongoClient("mongodb://localhost:27017/")
db = client.annosys

task_query = {"task_id": task_id}
annos = db.image_annotation.find(task_query)

filename = "export_tag_{task_id}.json".format(task_id=task_id)
filepath = os.path.join("/app", filename)
# filepath = os.path.join("/users/alex/desktop/static", filename)
print(filepath)

print("start")

with open(filepath, "w", encoding="utf-8") as f:
    for anno in annos:
        print(anno)
        anno_item = {}

        image_id = anno.get("image_id")
        definition = anno.get("definition")
        first = anno.get("first")
        second = anno.get("second")
        is_skip = anno.get("is_skip")
        image_query = {"image_id": image_id}
        image = db.image.find_one(image_query)
        url = image.get("url")

        anno_item["image_id"] = image_id
        anno_item["url"] = url
        anno_item["is_skip"] = is_skip

        if definition:
            anno_item["definition"] = definition
        else:
            anno_item["definition"] = ""

        if first:
            anno_item["first"] = first
        else:
            anno_item["first"] = ""

        if second:
            anno_item["second"] = second
        else:
            anno_item["second"] = ""

        print(anno_item)
        f.write(json.dumps(anno_item, ensure_ascii=False) + "\n")

print("finished!!!!!")
           

  

mongoengine導出腳本;

def export_file_for_tag(*args, **kwargs):
    """

    :param args: 
    :param kwargs: 
    :return: 
    """
    task_id = request.args.get("task_id")

    image_annotations = ImageAnnotation.objects.filter(task_id=task_id)
    task = Task.objects.filter(id=str(task_id)).first()
    task_title = task.task_title

    filename = "export_pick_{task_id}_{task_title}.json".format(task_id=task_id, task_title=task_title)
    download_path = DOWNLOAD_PATH

    if not os.path.exists(download_path):
        os.makedirs(download_path, 0o777)

    filepath = os.path.join(download_path, filename)
    print(filepath)


    with open(filepath, "w", encoding="utf-8") as f:
        for anno in image_annotations:
            # print(anno)
            anno_item = {}

            image_id = anno.image_id
            definition = anno.definition
            first = anno.first
            second = anno.second
            is_skip = anno.is_skip

            image = Image.objects.filter(image_id=image_id).first()
            url = image.url

            anno_item["image_id"] = image_id
            anno_item["url"] = url
            anno_item["is_skip"] = is_skip

            if definition:
                anno_item["definition"] = definition
            else:
                anno_item["definition"] = ""

            if first:
                anno_item["first"] = first
            else:
                anno_item["first"] = ""

            if second:
                anno_item["second"] = second
            else:
                anno_item["second"] = ""

            f.write(json.dumps(anno_item, ensure_ascii=False) + "\n")

    try:
        directory, filename = os.path.split(filepath)
        # return send_from_directory(directory, filename, as_attachment=True) # 直接傳回
        result = send_from_directory(directory, filename, as_attachment=True)

        response = make_response(result)
        mime_type = mimetypes.guess_type(filename)[0]
        print("mime_type: ", mime_type)
        # response.headers['Content-Type'] = mime_type
        response.headers['Content-Type'] = "application/x-download; charset=utf-8"
        response.headers["Content-Disposition"] = "attachment; filename={}".format(filename.encode("utf-8").decode('utf-8'))
        return response
    except Exception as e:
        result = {"code": 40004, "message": "download failed!", "data": str(e)}
        errlog.logger.error("Exception: {result}".format(result=result))
        return jsonify(result)


def export_file_for_pick(*args, **kwargs):
    """

    :param args: 
    :param kwargs: 
    :return: 
    """
    task_id = request.args.get("task_id")
    image_picks = ImagePick.objects.filter(task_id=task_id)
    task = Task.objects.filter(id=str(task_id)).first()
    task_title = task.task_title

    filename = "export_pick_{task_id}_{task_title}.json".format(task_id=task_id, task_title=task_title)
    download_path = DOWNLOAD_PATH
    if not os.path.exists(download_path):
        os.makedirs(download_path, 0o777)

    filepath = os.path.join(download_path, filename)
    print(filepath)

    with open(filepath, "w", encoding="utf-8") as f:
        for pick in image_picks:
            pick_item = {}

            image_id = pick.image_id
            pick_tag = pick.pick_tag
            status = pick.status

            image = Image.objects.filter(image_id=image_id).first()
            url = image.url

            pick_item["image_id"] = image_id
            pick_item["url"] = url

            if pick_tag:
                pick_item["pick_tag"] = pick_tag
            else:
                pick_item["pick_tag"] = ""

            pick_item["status"] = status

            f.write(json.dumps(pick_item, ensure_ascii=False) + "\n")

    try:
        directory, filename = os.path.split(filepath)
        # return send_from_directory(directory, filename, as_attachment=True)
        result = send_from_directory(directory, filename, as_attachment=True)
        response = make_response(result)
        mime_type = mimetypes.guess_type(filename)[0]
        print("mime_type: ", mime_type)
        # response.headers['Content-Type'] = mime_type
        response.headers['Content-Type'] = "application/octet-stream";

        # response.headers['Content-Type'] = "application/x-download; charset=utf-8"

        response.headers["Content-Disposition"] = "attachment; filename={}".format(
            filename.encode("utf-8").decode('utf-8'))
        return response
    except Exception as e:
        result = {"code": 40004, "message": "download failed!", "data": str(e)}
        errlog.logger.error("Exception: {result}".format(result=result))
        return jsonify(result)
           

  

轉載于:https://www.cnblogs.com/adamans/articles/10180062.html