Python Notes 11: pymongo usage examples

  • 1 Introduction
  • 2 Basic mongo commands
    • 2.1 Installing mongo with docker
    • 2.2 Common mongo commands
  • 3 Basic pymongo usage
  • 4 Caveats
  • 5 Notes

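1 Introduction

  I recently needed to use mongo at work, so I implemented some basic functionality while reading up on it. I am taking this opportunity to write down the common mongo commands, common pymongo usage, and the related caveats for later reference. New tips and caveats that come up later will also be added here.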

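2 Basic mongo commands

2.1 Installing mongo with docker

This example focuses on basic mongo commands and the pymongo API, so neither authentication nor a distributed cluster is configured.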

1. Install mongodb
docker run -d --name=mongo-4.0.9 -p 27017:27017 mongo:4.0.9
2. Enter the container
docker exec -it mongo-4.0.9 bash
3. Start the mongo command-line shell
root@<container-id>:/# mongo
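
Before running any pymongo code, it can help to confirm from the host that the published port is reachable. The following is a minimal sketch using only the standard library; `is_port_open` is a hypothetical helper name, and a successful TCP connection only shows that something is listening on the port, not that it is mongo.

```python
import socket


def is_port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to (host, port) succeeds within timeout."""
    try:
        # create_connection performs the TCP handshake; the socket is closed on exit.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False
```

For the container started above, the check would be is_port_open("127.0.0.1", 27017), since 27017 is the host port published by the -p option.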
           

2.2 Common mongo commands

1. List all databases
show dbs

2. List all collections in the current database
show collections

3. Query all documents in a collection
db.collection.find()
db.cluster.find()

4. Delete one document
db.collection.deleteOne({json info})
db.cluster.deleteOne({"_id": ObjectId("60331fa6d9cecd828cd14c33")})

5. Delete multiple documents
db.collection.deleteMany({json info})
db.cluster.deleteMany({az:"th"})

6. Drop a collection
db.collection.drop()
db.service.drop()

7. List all users
use admin
db.system.users.find().pretty()
To list the users of the current db, simply run show users.
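
Several of the shell commands above have direct pymongo counterparts. The sketch below is illustrative only: `show_overview` and `delete_by_az` are hypothetical helper names, and `client` is assumed to be a `pymongo.MongoClient` (or any object exposing the same methods).

```python
def show_overview(client, db_name, coll_name):
    """Rough equivalent of `show dbs`, `show collections`, and `db.<coll>.find()`."""
    db = client[db_name]
    return {
        "dbs": client.list_database_names(),
        "collections": db.list_collection_names(),
        # find() returns a cursor; list() reads every document into memory.
        "documents": list(db[coll_name].find()),
    }


def delete_by_az(client, db_name, coll_name, az):
    """Rough equivalent of `db.<coll>.deleteMany({az: ...})`; returns the count deleted."""
    result = client[db_name][coll_name].delete_many({"az": az})
    return result.deleted_count


# Example usage against a local instance (requires pymongo and a running server):
#   from pymongo import MongoClient
#   client = MongoClient("mongodb://127.0.0.1:27017/")
#   print(show_overview(client, "apps", "cluster"))
#   print(delete_by_az(client, "apps", "cluster", "th"))
```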
           

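3 Basic pymongo usage

The following uses mongo_helper.py and flask_server.py to show basic pymongo usage. mongo_helper wraps several mongo operations, and flask_server provides a simple web service whose responses are the contents stored in mongo.

1 mongo_helper.py source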

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from pymongo import MongoClient
from bson.json_util import dumps


class MongoHelper:
    def __init__(self, url, port):
        self.url = url
        self.port = port
        self.client = MongoClient('mongodb://{0}:{1}/'.format(self.url, self.port))

    def getDbInfo(self, dbname):
        db_list = self.client.list_database_names()
        print(db_list)
        if dbname not in db_list:
            print("{} is not in db".format(dbname))
            db = self.client[dbname]
            col = db["cluster"]
            post = {
                "az": "th",
                "cluster": "gz0001",
                "region": "gz",
                "auth_region": "gz_gz0001",
                "project": "pj01"
            }
            data_id = col.insert_one(post)
            print(data_id.inserted_id)
            print("db_list: {}, collection_list: {}".format(self.client.list_database_names(), self.client[dbname].list_collection_names()))
            # "dbstats" is the server command behind the shell's db.stats();
            # passing the string "db.stats()" would raise OperationFailure.
            print("db.status: {}".format(self.client[dbname].command("dbstats")))
            print("db.cluster.status: {}".format(self.client[dbname].command("collstats", "cluster")))
        else:
            print("{} is in db".format(dbname))
            print("db_list: {}, collection_list: {}".format(self.client.list_database_names(), self.client[dbname].list_collection_names()))
            print("db.status: {}".format(self.client[dbname].command("dbstats")))
            print("db.cluster.status: {}".format(self.client[dbname].command("collstats", "cluster")))

    def getItem(self, dbname, collectionName, body={}):
        # print("func: get item")
        db = self.client[dbname]
        col = db[collectionName]
        data_dict = col.find_one(body)
        # data_dict = col.find(body)
        # for doc in data_dict:
        #     print(type(doc), dumps(doc))
        # print('qqq', type(data_dict), dumps(data_dict), len(data_dict))
        return data_dict

    def getAllItems(self, dbname, collectionName, body={}):
        # print("func: get item")
        db = self.client[dbname]
        col = db[collectionName]
        data_dict = col.find(body)
        return data_dict

    def judgeItemExist(self, dbname, collectionName, body={}):
        # print("func: judge item exist")
        ret = self.getItem(dbname, collectionName, body)
        return ret is not None

    def insertItem(self, dbname, collectionName, body=None):
        # Avoid a mutable default: insert_one() adds an "_id" key to the dict it is given.
        body = {} if body is None else body
        if self.judgeItemExist(dbname, collectionName, body):
            print("insert, item exists, please do not repeat")
            return None
        col = self.client[dbname][collectionName]
        # insert_one() raises on failure, so reaching the next line means success.
        data_id = col.insert_one(body)
        print('insert item success')
        return data_id


def getClusterBody(cluster, project):
    '''
    Build the query body used to look up cluster info
    :param cluster: gz001
    :param project: apps
    :return:
    '''
    body = {
        "project": project,
        "cluster": cluster,
    }
    return body


def getServiceBody(cluster, project, namespace, service_name):
    '''
    Build the query body used to look up service info
    :param cluster: gz001
    :param project: apps
    :param namespace: test-online
    :param service_name: test
    :return: {json body}
    '''
    body = {
        "project": project,
        "cluster": cluster,
        "namespace": namespace,
        "service_name": service_name
    }
    return body


def generateClusterItem(az, cluster, region, auth_region, project):
    '''
    Build the body used to insert cluster info
    :param az: th
    :param cluster: gz001
    :param region: gz
    :param auth_region: gz-gz001_sculptor
    :param project: apps
    :return: {json body}
    '''
    body = {
        "az": az,
        "cluster": cluster,
        "region": region,
        "auth_region": auth_region,
        "project": project
    }
    return body


def generateServiceItem(cluster, namespace, project, service_name, service_port, lb_name, lb_port):
    '''
    Build the body used to insert service info
    :param cluster: gz001
    :param namespace: test-online
    :param project: apps
    :param service_name: test
    :param service_port: 8080
    :param lb_name: lbc-apps-test-flask
    :param lb_port: 80
    :return: {json body}
    '''
    body = {
        "cluster": cluster,
        "namespace": namespace,
        "project": project,
        "service_name": service_name,
        "service_port": service_port,
        "lb_name": lb_name,
        "lb_port": lb_port
    }
    return body


if __name__ == "__main__":
    mongo = MongoHelper('127.0.0.1', '27017')
    # mongo.getDbInfo("apps")

    cluster_item = generateClusterItem('gz', 'gz001', 'gz', 'gz-gz001', 'apps')
    print('insert item: {}'.format(mongo.insertItem('apps', 'cluster', cluster_item)))
    cluster_body = getClusterBody("gz001", "apps")
    print('find item: {}'.format(dumps(mongo.getItem('apps', 'cluster', cluster_body))))

    service_item = generateServiceItem('gz001', "test-online", "apps", "test-flask", 5000, "lbc-apps-test-flask", 80)
    print('insert item: {}'.format(mongo.insertItem('apps', 'service', service_item)))
    service_body = getServiceBody('gz001', "apps", "test-online", "test-flask")
    print('find item: {}'.format(dumps(mongo.getItem('apps', 'service', service_body))))

    print('find all item: {}'.format(dumps(mongo.getAllItems('apps', 'service', {}))))
           

Output:

insert item success
insert item: <pymongo.results.InsertOneResult object at 0x1056c2440>
find item: {"az": "gz", "region": "gz", "project": "apps", "cluster": "gz001", "auth_region": "gz-gz001", "_id": {"$oid": "603a3fb5891644022c7a7d0e"}}
insert item success
insert item: <pymongo.results.InsertOneResult object at 0x1056c2440>
find item: {"service_port": 5000, "lb_name": "lbc-apps-test-flask", "namespace": "test-online", "project": "apps", "cluster": "gz001", "service_name": "test-flask", "lb_port": 80, "_id": {"$oid": "603a3fb5891644022c7a7d0f"}}
find all item: [{"service_port": 5000, "lb_name": "lbc-apps-test-flask", "namespace": "test-online", "project": "apps", "cluster": "gz001", "service_name": "test-flask", "lb_port": 80, "_id": {"$oid": "603a3fb5891644022c7a7d0f"}}]
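
insertItem above first checks whether a matching document exists and then inserts it, which takes two round trips and leaves a small window between them. A possible alternative, sketched here under assumptions, is a single upsert with `$setOnInsert`. The name `insert_if_absent` is hypothetical, `collection` is assumed to be a pymongo `Collection`, and without a unique index on the relevant fields two concurrent callers could in principle still both insert.

```python
def insert_if_absent(collection, doc):
    """Insert doc unless a document matching all of its fields already exists.

    Returns the new _id when a document was inserted, or None when one existed.
    """
    # With upsert=True, $setOnInsert is applied only when no document matches
    # the filter, so an existing document is left untouched.
    result = collection.update_one(doc, {"$setOnInsert": doc}, upsert=True)
    # upserted_id is None when the filter matched an existing document.
    return result.upserted_id


# Example usage (requires pymongo and a running server):
#   from pymongo import MongoClient
#   col = MongoClient("mongodb://127.0.0.1:27017/")["apps"]["cluster"]
#   print(insert_if_absent(col, {"project": "apps", "cluster": "gz001"}))
```

Unlike insert_one, update_one does not add an "_id" key to the dict passed in, so the caller's document is not modified.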
           

2 flask_server.py source

from flask import Flask
from mongo_helper import MongoHelper, getServiceBody, getClusterBody
from bson.json_util import dumps

url = '127.0.0.1'
port = '27017'
app = Flask(__name__)


@app.route('/')
def hello_root():
    print('Hello Root!')
    return 'Hello Root!\n More, please use /help'


@app.route('/help')
def help():
    print('usage!')
    return {
        "usage": {
            "help": "/help",
            "cluster": "/api/cluster_info/<cluster> ,gz001",
            "service": "/api/service_info/<cluster>/<namespace>/<service_name> ,gz001/test-online/test-flask",
            "all_cluster": "/api/cluster_info",
            "all_service": "/api/service_info"
        }
    }


@app.route('/api/cluster_info/<cluster>')
def cluster_info(cluster):
    print('Hello cluster_info!')
    mongo = MongoHelper(url, port)
    findBody = getClusterBody(cluster, 'apps')
    clusterInfo = dumps(mongo.getItem('apps', 'cluster', findBody))
    return clusterInfo


@app.route('/api/cluster_info')
def all_cluster_info():
    print('Hello all_cluster_info!')
    mongo = MongoHelper(url, port)
    clusterInfo = dumps(mongo.getAllItems('apps', 'cluster', {}))
    return clusterInfo


@app.route('/api/service_info/<cluster>/<namespace>/<service_name>')
def service_info(cluster, namespace, service_name):
    print('Hello service_info!')
    mongo = MongoHelper(url, port)
    findBody = getServiceBody(cluster, 'apps', namespace, service_name)
    serviceInfo = dumps(mongo.getItem('apps', 'service', findBody))
    return serviceInfo


@app.route('/api/service_info')
def all_service_info():
    print('Hello all_service_info!')
    mongo = MongoHelper(url, port)
    serviceInfo = dumps(mongo.getAllItems('apps', 'service', {}))
    return serviceInfo


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8081)
    '''
    test case:
    1)  http://127.0.0.1:8081/api/cluster_info/gz001
    2)  http://127.0.0.1:8081/api/service_info/gz001/test-online/test-flask
    '''
           

Output:

http://127.0.0.1:8081/help
{
	"usage": {
		"all_cluster": "/api/cluster_info",
		"all_service": "/api/service_info",
		"cluster": "/api/cluster_info/<cluster> ,gz001",
		"help": "/help",
		"service": "/api/service_info/<cluster>/<namespace>/<service_name> ,gz001/test-online/test-flask"
	}
}
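
Each route in flask_server.py builds a new MongoHelper, and therefore a new MongoClient, on every request. A MongoClient manages its own connection pool, and the PyMongo documentation recommends creating it once per process and reusing it. One way to do that is sketched below; `get_helper` and the module-level `_helpers` cache are hypothetical names, and `factory` stands in for the MongoHelper class.

```python
_helpers = {}


def get_helper(url, port, factory):
    """Return one shared helper per (url, port), creating it on first use."""
    key = (url, port)
    if key not in _helpers:
        # The factory is called only the first time a given address is requested.
        _helpers[key] = factory(url, port)
    return _helpers[key]


# Inside a route handler, assuming MongoHelper from mongo_helper.py:
#   mongo = get_helper(url, port, MongoHelper)
```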
           

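4 Caveats

  1. Fixing the bson import error
    I installed pymongo first and then installed bson. The code uses dumps via from bson.json_util import dumps, and running it failed with the following error:
    ImportError: cannot import name 'abc' from 'bson.py3compat' (/usr/local/lib/python3.7/dist-packages/bson/py3compat.py)
    Fix:
    Uninstall both bson and pymongo, then install bson first and pymongo afterwards; this restored normal behavior. Note that pymongo ships its own bson package, so the standalone bson package from PyPI is not needed for this code; uninstalling it and reinstalling pymongo alone should also work.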
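
To see whether the standalone bson distribution is installed alongside pymongo, the installed distributions can be queried from Python. This is a minimal sketch, assuming Python 3.8 or newer (for `importlib.metadata`); `installed_version` is a hypothetical helper name.

```python
from importlib import metadata


def installed_version(dist_name):
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


# A non-None result for "bson" means the standalone package is installed
# in addition to the bson module that pymongo bundles.
for name in ("pymongo", "bson"):
    print(name, installed_version(name))
```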
               

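5 Notes

  1. Software environment

    I ran a test mongo:4.0.9 instance directly with docker.

  2. Reference documents

    1 mongo official docs: manual/reference/method

    2 mongo official docs: manual/reference/command

    3 Common pymongo APIs

    4 flask-doc.readthedocs

    5 Flask series 03: Flask routing, app.route parameters, and dynamic route parameters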