天天看點

問答機器人代碼封裝和對外提供接口代碼封裝和對外提供接口

代碼封裝和對外提供接口

目标

  1. 能夠完成封裝的代碼
  2. 能夠使用grpc對外提供接口
  3. 能夠使用supervisord完成服務的管理

1. 完成代碼的封裝

代碼封裝過程中,需要注意,在整個結構中,我們有很多的結算結果是dump到本地的,為了防止後續每次的重複計算。是以laod的結果,應該提前加載到内容,而不是每次調用load義詞

1.1 完成意圖識别代碼封裝

完成判斷使用者意圖的代碼,即在使用fasttext的模型,判斷使用者輸入句子的分類

import fastText
import re
from lib import jieba_cut

fc_word_mode = fastText.load_model("./classify/data/ft_classify.model")
fc_word_mode = fastText.load_model("./classify/data/ft_classify_words.model")



def is_QA(sentence_info):
    python_qs_list = [" ".join(sentence_info["cuted_sentence"])]
    result = fc_word_mode.predict(python_qs_list)
	
    python_qs_list = [" ".join(sentence_info["cuted_word_sentence"])]
    words_result = fc_word_mode.predict(python_qs_list)
    for index, (label,acc,word_label,word_acc) in enumerate(zip(*result,*words_result)):
        label = label[0]
        acc = acc[0]
        word_label = word_label[0]
        word_acc = word_acc[0]
        #以label_qa為準,如果預測結果是label_chat,則label_qa的機率=1-labele_chat
        if label == "__label__chat":
            label = "__label__QA"
            acc = 1-acc
        if word_label == "__label__chat":
            word_label = "__label__QA"
            word_acc = 1 - word_acc
        if acc>0.95 or word_acc>0.95:
            #是QA
            return True
        else:
            return False
           

1.2 完成對chatbot代碼的封裝

提供predict的接口

"""
準備閑聊的模型
"""
import pickle
from lib import jieba_cut
import numpy as np
from chatbot import Sequence2Sequence

class Chatbot:
    def __init__(self,ws_path="./chatbot/data/ws.pkl",save_path="./chatbot/model/seq2seq_chatbot.ckpt"):
        self.ws_chatbot = pickle.load(open(ws_path, "rb"))
        self.save_path = save_path
		#TODO .....


    def predict(self,s):
        """
        :param s:沒有分詞的
        :param ws:
        :param ws_words:
        :return:
        """
        #TODO ...
        return ans
           

1.3 完成對問答系統召回的封裝

"""
進行召回的方法
"""
import os
import pickle


class Recall:
    def __init__(self,topk=20):
        # 準備問答的mode等子產品
        self.topk = topk

    def predict(self,sentence):
        """
        :param sentence:
        :param debug:
        :return: [recall list],[entity]
        """
        #TODO recall
        return recall_list

    def get_answer(self,s):
        return self.QA_dict[s]

           

1.4 完成對問答排序模型的封裝

"""
深度學習排序
"""
import tensorflow as tf
import pickle
from DNN2 import SiamsesNetwork
from lib import jieba_cut


class DNNSort():
    def __init__(self):
        #使用詞語和單字兩個模型的均值作為最後的結果
        self.dnn_sort_words = DNNSortWords()
        self.dnn_sort_single_word = DNNSortSingleWord()

    def predict(self,s,c_list):
        sort1 = self.dnn_sort_words.predict(s,c_list)
        sort2 = self.dnn_sort_single_word.predict(s,c_list)
        for i in sort1:
            sort1[i] = (sort1[i]+ sort2[i])/2
        sorts = sorted(sort1.items(),key=lambda x:x[-1],reverse=True)
        return sorts[0][0],sorts[0][1]

class DNNSortWords:
    def __init__(self,ws_path="./DNN2/data/ws_80000.pkl",save_path="./DNN2/model_keras/esim_model_softmax.ckpt"):
        self.ws = pickle.load(open(ws_path, "rb"))
        self.save_path = save_path
		#TOOD ...
        
    def predict(self,s,c_list):
        """
        :param s:沒有分詞的
        :param c_list: 帶比較的清單
        :param ws:
        :param ws_words:
        :return:
        """
        #TOOD ...
        return sim_dict

class DNNSortSingleWord:
    def __init__(self,ws_path="./DNN2/data/ws_word.pkl",save_path="./DNN2/data/esim_word_model_softmax.ckpt"):
        self.ws = pickle.load(open(ws_path, "rb"))
        self.save_path = save_path
        #TOOD ...

    def predict(self,s,c_list):
        """
        :param s:沒有分詞的
        :param c_list: 帶比較的清單
        :param ws:
        :param ws_words:
        :return:
        """
		#TOOD ...
        return sim_dict
           

1.5 實作對聊天記錄的儲存

不同的使用者,連續10分鐘内的對話認為是一輪對話,如果10分還沒有下一次對話,認為該輪對話結束,如果10分鐘後開始對話,認為是下一輪對話。是要是為了儲存不同輪中的聊天主題,後續可以實作基本的對話管理。比如使用者剛問了python相關的問題,後續如果問題中不帶主體,那麼就把redis中的python作為其主體

主要實作邏輯為:

  1. 使用redis存儲使用者基本的資料
  2. 使用mongodb存儲對話記錄

具體思路如下:

  1. 根據使用者id,擷取對話id,根據對話id判斷目前的對話是否存在
  2. 如果對話id存在:
    1. 更新對話的entity,上一次對話的時間,設定對話id的過期時間
    2. 儲存資料到mongodb
  3. 如果對話id不存在:
    1. 建立使用者的基礎資訊(user_id,entity,對話時間)
    2. 把使用者的基礎資訊存入redis,同時設定對話id和過期時間
    3. 儲存資料到mongodb中
"""
擷取,更新使用者的資訊
"""
from pymongo import MongoClient
import redis
from uuid import uuid1
import time
import json

"""
### redis
{
user_id:"id",
user_background:{}
last_entity:[]
last_conversation_time:int(time):
}

userid_conversation_id:""

### monodb 存儲對話記錄
{user_id:,conversion_id:,from:user/bot,message:"",create_time,entity:[],attention:[]}
"""

HOST = "localhost"
CNVERSION_EXPERID_TIME = 60 * 10  # 10分鐘,連續10分鐘沒有通信,意味着會話結束


class MessageManager:
    def __init__(self):
        self.client = MongoClient(host=HOST)
        self.m = self.client["toutiao"]["dialogue"]
        self.r = redis.Redis(host=HOST, port=6379, db=10)

    def last_entity(self, user_id):
        """最近一次的entity"""
        return json.loads(self.r.hget(user_id, "entity"))

    def gen_conversation_id(self):
        return uuid1().hex

    def bot_message_pipeline(self, user_id, message):
        """儲存機器人的回複記錄"""
        conversation_id_key = "{}_conversion_id".format(user_id)
        conversation_id = self.user_exist(conversation_id_key)
        if conversation_id:
            # 更新conversation_id的過期時間
            self.r.expire(conversation_id_key, CNVERSION_EXPERID_TIME)
            data = {"user_id": user_id,
                    "conversation_id": conversation_id,
                    "from": "bot",
                    "message": message,
                    "create_time": int(time.time()),
                    }
            self.m.save(data)

        else:
            raise ValueError("沒有會話id,但是機器人嘗試回複....")

    def user_message_pipeline(self, user_id, message, create_time, attention, entity=[]):
        # 确定使用者相關的資訊
        # 1. 使用者是否存在
        # 2.1 使用者存在,傳回使用者的最近的entity,存入最近的對話
        # 3.1 判斷是否為新的對話,如果是新對話,開啟新的回話,update使用者的對話資訊
        # 3.2 如果不是新的對話,update使用者的對話資訊
        # 3. 更新使用者的基本資訊
        # 4  傳回使用者相關資訊
        # 5. 調用預測接口,發來對話的結構

        # 要儲存的data資料,缺少conversation_id
        data = {
            "user_id": user_id,
            "from": "user",
            "message": message,
            "create_time": create_time,
            "entity": json.dumps(entity),
            "attention": attention,
        }

        conversation_id_key = "{}_conversion_id".format(user_id)
        conversation_id = self.user_exist(conversation_id_key)
        print("conversation_id",conversation_id)
        if conversation_id:
            if entity:
                # 更新目前使用者的 last_entity
                self.r.hset(user_id, "last_entity", json.dumps(entity))
            # 更新最後的對話時間
            self.r.hset(user_id, "last_conversion_time", create_time)
            # 設定conversation id的過期時間
            self.r.expire(conversation_id_key, CNVERSION_EXPERID_TIME)

            # 儲存聊天記錄到mongodb中
            data["conversation_id"] = conversation_id

            self.m.save(data)
            print("mongodb 儲存資料成功")

        else:
            # 不存在
            user_basic_info = {
                "user_id": user_id,
                "last_conversion_time": create_time,
                "last_entity": json.dumps(entity)
            }
            self.r.hmset(user_id, user_basic_info)
            print("redis存入 user_basic_info success")
            conversation_id = self.gen_conversation_id()
            print("生成conversation_id",conversation_id)

            # 設定會話的id
            self.r.set(conversation_id_key, conversation_id, ex=CNVERSION_EXPERID_TIME)
            # 儲存聊天記錄到mongodb中
            data["conversation_id"] = conversation_id
            self.m.save(data)
            print("mongodb 儲存資料成功")


    def user_exist(self, conversation_id_key):
        """
        判斷使用者是否存在
        :param user_id:使用者id
        :return:
        """
        conversation_id = self.r.get(conversation_id_key)
        if conversation_id:
            conversation_id = conversation_id.decode()
        print("load conversation_id",conversation_id)
        return conversation_id

           

2. 使用GRPC對外提供服務

2.1 安裝grpc相關環境

gRPC 的安裝:`pip install grpcio`
安裝 ProtoBuf 相關的 python 依賴庫:`pip install protobuf`
安裝 python grpc 的 protobuf 編譯工具:`pip install grpcio-tools`
           

2.2 定義GRPC的接口

//chatbot.proto 檔案
syntax = "proto3";

message ReceivedMessage {
    string user_id = 1; //使用者id
    string user_message = 2; //目前使用者傳遞的消息
    int32 create_time = 3; //目前消息發送的時間
}

message ResponsedMessage {
    string user_response = 1; //傳回給使用者的消息
    int32 create_time = 2; //傳回給使用者的時間
}

service ChatBotService {
  rpc Chatbot (ReceivedMessage) returns (ResponsedMessage);
}
           

2.3 編譯生成protobuf檔案

使用下面的指令編譯,得到

chatbot_pb2.py

chatbot_pb2_grpc.py

檔案

python -m grpc_tools.protoc -I. –python_out=. –grpc_python_out=. ./chatbot.proto
           

2.4 使用grpc提供服務

import dialogue
from classify import is_QA
from dialogue.process_sentence import process_user_sentence

from chatbot_grpc import chatbot_pb2_grpc
from chatbot_grpc import chatbot_pb2
import time



class chatServicer(chatbot_pb2_grpc.ChatBotServiceServicer):

    def __init__(self):
        #提前加載各種模型
        self.recall = dialogue.Recall(topk=20)
        self.dnnsort = dialogue.DNNSort()
        self.chatbot = dialogue.Chatbot()
        self.message_manager = dialogue.MessageManager()

    def Chatbot(self, request, context):
        user_id = request.user_id
        message = request.user_message
        create_time = request.create_time
        #對使用者的輸出進行基礎的處理,如分詞
        message_info = process_user_sentence(message)
        if is_QA(message_info):
            attention = "QA"
            #實作對對話資料的儲存
            self.message_manager.user_message_pipeline(user_id, message, create_time, attention, entity=message_info["entity"])
            recall_list,entity = self.recall.predict(message_info)
            line, score = self.dnnsort.predict(message,recall_list)
            if score > 0.7:
                ans = self.recall.get_answer(line)
                user_response = ans["ans"]

            else:
                user_response = "不好意思,這個問題我還沒學習到..."
        else:
            attention = "chat"
            # 實作對對話資料的儲存
            self.message_manager.user_message_pipeline(user_id,message,create_time,attention,entity=message_info["entity"])
            user_response = self.chatbot.predict(message)

        self.message_manager.bot_message_pipeline(user_id,user_response)

        user_response = user_response
        create_time = int(time.time())
        return chatbot_pb2.ResponsedMessage(user_response=user_response,create_time=create_time)

def serve():
    import grpc
    from concurrent import futures
    # 多線程伺服器
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    # 注冊本地服務
    chatbot_pb2_grpc.add_ChatBotServiceServicer_to_server(chatServicer(), server)
    # 監聽端口
    server.add_insecure_port("[::]:9999")
    # 開始接收請求進行服務
    server.start()
    # 使用 ctrl+c 可以退出服務
    try:
        time.sleep(1000)
    except KeyboardInterrupt:
        server.stop(0)


if __name__ == '__main__':
    serve()
           

3. 使用supervisor完成對服務的管理

3.1 編寫簡單的執行腳本

#!/bin/bash

cd `$dirname`|exit 0
#source activate ds
python grpc_predict.py
           

添加可執行權限:

chmod +x 檔案名

3.2 安裝、配置supervisor

supervisor現在的官方版本還是python2的,但是可以使用下面的指令安裝python3版本

pip3 install git+https://github.com/Supervisor/supervisor    
           
  1. 完成supervisor的配置檔案的編寫,conf中使用分号作為注釋符号
    ;conf.d
    [program:chat_service]
    
    command=/root/chat_service/run.sh  ;執行的指令
    
    stdout_logfile=/root/chat_service/log/out.log ;log的位置
    
    stderr_logfile=/root/chat_service/log/error.log  ;錯誤log的位置
    
    directory=/root/chat_service  ;路徑
    
    autostart=true  ;是否自動啟動
    
    autorestart=true  ;是否自動重新開機
    
    startretries=10 ;失敗的最大嘗試次數
               
  2. 在supervisor的基礎配置中添加上述配置檔案
    ;/etc/supervisord/supervisor.conf 
    [include]
    files=/root/chat_service/conf.d
               
  3. 運作supervisord
    supervisord -c /etc/supervisord/supervisor.conf
               

繼續閱讀