1 mysql 導出資料
導出需求以及mysql導出資料代碼:
# 需要導出的資料:image_id, url, first, second, definition, is_skip, 沒有的置換為空字元串
# {"url": "http://114.112.107.58:8000/fashionai_attribute_images/train_round1/Images/pant_length_labels/a70477e45965920304934bbd122d5f7c.jpg",
# "definition": "Y型", "first": "", "second": "",
# "imgae_id": "a70477e45965920304934bbd122d5f7c"}
# 原始資料 mysql
# {'id': 11122132343234, 'task_id': '5c233454f71ec32e1d0716e5',
# 'image_id': '61624acb91611eef11d8ffadd378f2bb', 'subtask_id': '5c23347df71ec32e1d0716fa',
# 'user_id': '5c19e1c7f71ec393a62fe79b', 'updated_at': datetime.datetime(2018, 12, 26, 7, 58, 3, 57000),
# 'definition': '長袖', 'first': None, 'is_skip': False, 'second': None, 'timestamp': 1545811083.054819,
# 'user_worked': True}
# coding=utf-8
# 提前修改好标題,根據任務标題來導出相關資料
# ssh [email protected]
# pwd: xxxxxxxxxxxxx
# cd /home/home/exportjson/hello
# python3 exportdb.py
import json
import pymysql
def convert(fields):
if isinstance(fields, bytes):
return str(fields, encoding='utf-8')
else:
return fields
def get_pid(project_name):
try:
conn = pymysql.connect(host='127.0.0.1', user='root', passwd='root', db='annosys', port=3306, charset='utf8')
cur = conn.cursor()
sql = "select id from DjangoRESTImage_project where title = '{project_name}';".format(project_name=project_name)
cur.execute(sql)
data = cur.fetchall()
print('fetchall()傳回的資料', data)
cur.close()
conn.close()
except:
print('MySQL connect fail...')
return "failed"
return data[0][0]
def TableToJson(project_name):
pid = get_pid(project_name)
if pid:
try:
# connection對象支援的方法有cursor(),commit(),rollback(),close()
conn = pymysql.connect(host='127.0.0.1', user='root', passwd='root', db='annosys', port=3306, charset='utf8')
# cursor對象支援的方法有execute(sql語句),fetchone(),fetchmany(size),fetchall(),rowcount,close()
cur = conn.cursor()
sql = "select DjangoRESTImage_image.title as title, DjangoRESTImage_image.image_src as url, confirm_tag, first_tag, second_tag, project_id, user_id from DjangoRESTImage_relation left join DjangoRESTImage_image on DjangoRESTImage_relation.image_id = DjangoRESTImage_image.id where project_id={id};".format(id=pid)
# execute可執行資料庫查詢select和指令insert,delete,update三種指令(這三種指令需要commit()或rollback())
cur.execute(sql)
# 傳回的資料類型是元組類型,每個條資料元素為元組類型:(('第一條資料的字段1的值','第一條資料的字段2的值',...,'第一條資料的字段N的值'),(第二條資料),...,(第N條資料))
data = cur.fetchall()
# ('14653a9e6d106511b9a34b04d166e419', '跳過', None, None, 77, 12)
print('fetchall()傳回的資料', data)
cur.close()
conn.close()
except:
print('MySQL connect fail...')
return "failed"
# 開始處理字段
jsonData = []
for line in data:
print(line)
dict_one = {}
dict_one['imgae_id'] = line[0]
dict_one['url'] = line[1]
confirm = line[2]
if confirm is None:
dict_one['confirm'] = ""
else:
dict_one['confirm'] = convert(confirm)
first = line[3]
if first is None:
dict_one['first'] = ""
else:
dict_one['first'] = convert(first)
second = line[4]
if second is None:
dict_one['second'] = ""
else:
dict_one['second'] = convert(second)
print("xxxxx", dict_one)
jsonData.append(dict_one)
return jsonData
else:
print("project id dont exist!")
return "error project id dont exist!"
if __name__ == '__main__':
project_name = "上衣廓形-20181122-10000張-01"
jsonData = TableToJson(project_name)
with open(file="{project_name}.json".format(project_name=project_name), mode="w", encoding="utf-8") as f1:
for line in jsonData:
print("per:", line)
f1.write(json.dumps(line, ensure_ascii=False) + "\n")
print("len jsonData:", len(jsonData))
2 mongodb 導出資料
子產品:pymongo 3.7.0 以及pymongo腳本導出代碼
1 # 正常下載下傳資料檔案
2 # filename="哈喽.json"
3 # dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
4 # directory = os.path.join(dir, "static/downloads")
5 # print(directory)
6 # return send_from_directory(directory, filename, as_attachment=True)
7 # # return send_from_directory(directory, filename.encode('utf-8').decode('utf-8'), as_attachment=True)
8
flask 下載下傳代碼
from flask import send_file, send_from_directory
def export_file(*args, **kwargs):
"""
:param args:
:param kwargs:
:return:
"""
# 需要導出的資料:image_id, url, first, second, definition, is_skip, 沒有的置換為空字元串
# {"url": "http://114.112.107.58:8000/fashionai_attribute_images/train_round1/Images/pant_length_labels/a70477e45965920304934bbd122d5f7c.jpg",
# "definition": "Y型", "first": "", "second": "",
# "imgae_id": "a70477e45965920304934bbd122d5f7c"}
# 原始資料 mongodb
# {'_id': ObjectId('5c233454f71ec32e1d0716e7'), 'task_id': '5c233454f71ec32e1d0716e5',
# 'image_id': '61624acb91611eef11d8ffadd378f2bb', 'subtask_id': '5c23347df71ec32e1d0716fa',
# 'user_id': '5c19e1c7f71ec393a62fe79b', 'updated_at': datetime.datetime(2018, 12, 26, 7, 58, 3, 57000),
# 'definition': '長袖', 'first': None, 'is_skip': False, 'second': None, 'timestamp': 1545811083.054819,
# 'user_worked': True}
client = MongoClient('mongodb://localhost:27017/')
db = client.annosys
task_query = {"task_id": "5c233454f71ec32e1d0716e5"}
annos = db.image_annotation.find(task_query)
filename = "export.json"
dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
filepath = os.path.join(dir, "static/downloads", filename)
print(filepath)
with open(filepath, "w", encoding="utf-8") as f:
for anno in annos:
print(anno)
anno_item = {}
image_id = anno.get("image_id")
definition = anno.get("definition")
first = anno.get("first")
second = anno.get("second")
is_skip = anno.get("is_skip")
image_query = {"image_id": image_id}
image = db.image.find_one(image_query)
url = image.get("url")
anno_item["image_id"] = image_id
anno_item["url"] = url
anno_item["is_skip"] = is_skip
if definition:
anno_item["definition"] = definition
else:
anno_item["definition"] = ""
if first:
anno_item["first"] = first
else:
anno_item["first"] = ""
if second:
anno_item["second"] = second
else:
anno_item["second"] = ""
f.write(json.dumps(anno_item, ensure_ascii=False) + "\n")
try:
directory, filename = os.path.split(filepath)
return send_from_directory(directory, filename, as_attachment=True)
except Exception as e:
return jsonify({"code": 40004, "message": "download failed!", "data": str(e)})
mongoengine 導出代碼:
def export_file_by_mongoengine(*args, **kwargs):
"""
:param args:
:param kwargs:
:return:
"""
image_annotations = ImageAnnotation.objects.filter(task_id="5c233454f71ec32e1d0716e5")
filename = "export.json"
dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
filepath = os.path.join(dir, "static/downloads", filename)
print(filepath)
with open(filepath, "w", encoding="utf-8") as f:
for anno in image_annotations:
print(anno)
anno_item = {}
image_id = anno.image_id
definition = anno.definition
first = anno.first
second = anno.second
is_skip = anno.is_skip
image = Image.objects.filter(image_id=image_id).first()
url = image.url
anno_item["image_id"] = image_id
anno_item["url"] = url
anno_item["is_skip"] = is_skip
if definition:
anno_item["definition"] = definition
else:
anno_item["definition"] = ""
if first:
anno_item["first"] = first
else:
anno_item["first"] = ""
if second:
anno_item["second"] = second
else:
anno_item["second"] = ""
f.write(json.dumps(anno_item, ensure_ascii=False) + "\n")
try:
directory, filename = os.path.split(filepath)
return send_from_directory(directory, filename, as_attachment=True)
except Exception as e:
return jsonify({"code": 40004, "message": "download failed!", "data": str(e)})
pymongo導出mongodb腳本:
import os
import json
from pymongo import MongoClient
task_id = "5c32b74ff71ec31435f8e1c9"
client = MongoClient("mongodb://localhost:27017/")
db = client.annosys
task_query = {"task_id": task_id}
annos = db.image_annotation.find(task_query)
filename = "export_tag_{task_id}.json".format(task_id=task_id)
filepath = os.path.join("/app", filename)
# filepath = os.path.join("/users/alex/desktop/static", filename)
print(filepath)
print("start")
with open(filepath, "w", encoding="utf-8") as f:
for anno in annos:
print(anno)
anno_item = {}
image_id = anno.get("image_id")
definition = anno.get("definition")
first = anno.get("first")
second = anno.get("second")
is_skip = anno.get("is_skip")
image_query = {"image_id": image_id}
image = db.image.find_one(image_query)
url = image.get("url")
anno_item["image_id"] = image_id
anno_item["url"] = url
anno_item["is_skip"] = is_skip
if definition:
anno_item["definition"] = definition
else:
anno_item["definition"] = ""
if first:
anno_item["first"] = first
else:
anno_item["first"] = ""
if second:
anno_item["second"] = second
else:
anno_item["second"] = ""
print(anno_item)
f.write(json.dumps(anno_item, ensure_ascii=False) + "\n")
print("finished!!!!!")
mongoengine導出腳本;
def export_file_for_tag(*args, **kwargs):
"""
:param args:
:param kwargs:
:return:
"""
task_id = request.args.get("task_id")
image_annotations = ImageAnnotation.objects.filter(task_id=task_id)
task = Task.objects.filter(id=str(task_id)).first()
task_title = task.task_title
filename = "export_pick_{task_id}_{task_title}.json".format(task_id=task_id, task_title=task_title)
download_path = DOWNLOAD_PATH
if not os.path.exists(download_path):
os.makedirs(download_path, 0o777)
filepath = os.path.join(download_path, filename)
print(filepath)
with open(filepath, "w", encoding="utf-8") as f:
for anno in image_annotations:
# print(anno)
anno_item = {}
image_id = anno.image_id
definition = anno.definition
first = anno.first
second = anno.second
is_skip = anno.is_skip
image = Image.objects.filter(image_id=image_id).first()
url = image.url
anno_item["image_id"] = image_id
anno_item["url"] = url
anno_item["is_skip"] = is_skip
if definition:
anno_item["definition"] = definition
else:
anno_item["definition"] = ""
if first:
anno_item["first"] = first
else:
anno_item["first"] = ""
if second:
anno_item["second"] = second
else:
anno_item["second"] = ""
f.write(json.dumps(anno_item, ensure_ascii=False) + "\n")
try:
directory, filename = os.path.split(filepath)
# return send_from_directory(directory, filename, as_attachment=True) # 直接傳回
result = send_from_directory(directory, filename, as_attachment=True)
response = make_response(result)
mime_type = mimetypes.guess_type(filename)[0]
print("mime_type: ", mime_type)
# response.headers['Content-Type'] = mime_type
response.headers['Content-Type'] = "application/x-download; charset=utf-8"
response.headers["Content-Disposition"] = "attachment; filename={}".format(filename.encode("utf-8").decode('utf-8'))
return response
except Exception as e:
result = {"code": 40004, "message": "download failed!", "data": str(e)}
errlog.logger.error("Exception: {result}".format(result=result))
return jsonify(result)
def export_file_for_pick(*args, **kwargs):
"""
:param args:
:param kwargs:
:return:
"""
task_id = request.args.get("task_id")
image_picks = ImagePick.objects.filter(task_id=task_id)
task = Task.objects.filter(id=str(task_id)).first()
task_title = task.task_title
filename = "export_pick_{task_id}_{task_title}.json".format(task_id=task_id, task_title=task_title)
download_path = DOWNLOAD_PATH
if not os.path.exists(download_path):
os.makedirs(download_path, 0o777)
filepath = os.path.join(download_path, filename)
print(filepath)
with open(filepath, "w", encoding="utf-8") as f:
for pick in image_picks:
pick_item = {}
image_id = pick.image_id
pick_tag = pick.pick_tag
status = pick.status
image = Image.objects.filter(image_id=image_id).first()
url = image.url
pick_item["image_id"] = image_id
pick_item["url"] = url
if pick_tag:
pick_item["pick_tag"] = pick_tag
else:
pick_item["pick_tag"] = ""
pick_item["status"] = status
f.write(json.dumps(pick_item, ensure_ascii=False) + "\n")
try:
directory, filename = os.path.split(filepath)
# return send_from_directory(directory, filename, as_attachment=True)
result = send_from_directory(directory, filename, as_attachment=True)
response = make_response(result)
mime_type = mimetypes.guess_type(filename)[0]
print("mime_type: ", mime_type)
# response.headers['Content-Type'] = mime_type
response.headers['Content-Type'] = "application/octet-stream";
# response.headers['Content-Type'] = "application/x-download; charset=utf-8"
response.headers["Content-Disposition"] = "attachment; filename={}".format(
filename.encode("utf-8").decode('utf-8'))
return response
except Exception as e:
result = {"code": 40004, "message": "download failed!", "data": str(e)}
errlog.logger.error("Exception: {result}".format(result=result))
return jsonify(result)
轉載于:https://www.cnblogs.com/adamans/articles/10180062.html