天天看點

用騰訊雲函數做 openai 代理,python 的實作方式

作者:哈哈老師的日記
用騰訊雲函數做 openai 代理,python 的實作方式

對于一個程式猿來說,如果對八苦排序,求不得苦應該排第一

雲函數配置

用雲函數不是好方案,但絕對是最簡單的方案。

這是騰訊雲函數的通路位址:

https://console.cloud.tencent.com/scf/list?rid=18&ns=default

配置步驟:

  • 選模版建立
  • Flask 架構模版
  • 按預設走,一直到部署完成
  • 在函數配置裡,将逾時設定為 60 秒 [蠟燭]
  • 在函數代碼裡,修改 app.py (将下面貼出來的代碼直接覆寫即可)
  • 再部署,搞定
  • 部署完成後,下面有生成的通路路徑,那個就是用戶端要通路的位址

網上已經有不少講其他語言如何做的了,我這兒貼一個 python 的簡單實作,支援 SSE。

用騰訊雲函數做 openai 代理,python 的實作方式

app.py

python 代理程式

from flask import Flask, request, abort, Response
import requests
import os
import json

IS_SERVERLESS = bool(os.environ.get('SERVERLESS'))
# print(IS_SERVERLESS)

app = Flask(__name__)

timeout = 60  # 請求逾時時間(秒)

@app.route('/', defaults={'path': ''})
@app.route('/<path:path>', methods=['GET', 'POST', 'PUT', 'DELETE'])
def proxy(path):
    url = f"https://api.openai.com/{path}"

    token = request.headers.get('Authorization', None)
    if token is None:
        abort(403,'No Token')

    openai_key = token.split(' ')[1]
    if openai_key is None:
        abort(403,'No API Key')

    options = {
        'headers': {
            'Content-Type': 'application/json; charset=utf-8',
            'Authorization': f"Bearer {openai_key}"
        }
    }

    # 如果請求資料類型是json
    if request.is_json:
        options['json'] = request.json
    else:
        options['data'] = request.get_data()

    try:
        # 如果是 chat completion 和 text completion 請求,使用 SSE
        if options['json'].get('stream',False):
            response = requests.request(request.method,url,**options, timeout=timeout, stream=True)
            if response.ok:
                def generate():
                    for line in response.iter_lines():
                        if line:
                            yield f"{line.decode('utf-8')}\n\n"
                    yield "data: [DONE]\n\n"

                return Response(generate(), mimetype='text/event-stream')
        else:
            response = requests.request(request.method,url,**options, timeout=timeout)
            return Response(response.content, content_type=response.headers['Content-Type'])

    except requests.exceptions.RequestException as e:
        print(e)
        return Response(json.dumps({"error": str(e)}), status=500, content_type='application/json')

# 啟動服務,監聽 9000 端口,監聽位址為 0.0.0.0
app.run(debug=IS_SERVERLESS != True, port=9000, host='0.0.0.0',threaded=True)
           

幾個注意事項:

  • 據傳,有用香港伺服器後 Key 被拒,是以小心點,選個東京或者首爾
  • 兒童節之後就沒免費午餐了
  • 為了支援 SSE,用戶端就别用 openai 的官方庫通路了
  • 不是長久之計,用來做測試還是可以的

python 用戶端

api 調用參考源碼:

https://github.com/acheong08/ChatGPT/blob/main/src/revChatGPT/V3.py

我為了調試友善,将代碼拷貝過來改了一些内容,不記得改了什麼,這裡貼出我改過的,用這個代碼就不用裝官方庫(openai)了,直接是通過 requests 通路的,主要是支援 SSE 模式。

官方api 替代(支援SSE):chatapi.py

import argparse
import json
import os
import sys
from typing import NoReturn

import requests
import tiktoken

from utils import create_completer
from utils import create_session
from utils import get_input

ENGINE = os.environ.get("GPT_ENGINE") or "gpt-3.5-turbo"


class Chatbot:
    """
    Official ChatGPT API
    """

    def __init__(
        self,
        api_key: str,
        engine: str = None,
        proxy: str = None,
        max_tokens: int = 3000,
        temperature: float = 0.5,
        top_p: float = 1.0,
        presence_penalty: float = 0.0,
        frequency_penalty: float = 0.0,
        reply_count: int = 1,
        system_prompt: str = "You are ChatGPT, a large language model trained by OpenAI. Respond conversationally",
        api_base = 'https://api.openai.com/v1/chat/completions'
    ) -> None:
        """
        Initialize Chatbot with API key (from https://platform.openai.com/account/api-keys)
        """
        self.engine = engine or ENGINE
        self.session = requests.Session()
        self.api_key = api_key
        self.proxy = proxy
        if self.proxy:
            proxies = {
                "http": self.proxy,
                "https": self.proxy,
            }
            self.session.proxies = proxies
        self.conversation: dict = {
            "default": [
                {
                    "role": "system",
                    "content": system_prompt,
                },
            ],
        }
        self.system_prompt = system_prompt
        self.max_tokens = max_tokens
        self.temperature = temperature
        self.top_p = top_p
        self.presence_penalty = presence_penalty
        self.frequency_penalty = frequency_penalty
        self.reply_count = reply_count
        self.api_base = api_base

        if self.get_token_count("default") > self.max_tokens:
            raise Exception("System prompt is too long")

    def add_to_conversation(
        self,
        message: str,
        role: str,
        convo_id: str = "default",
    ) -> None:
        """
        Add a message to the conversation
        """
        self.conversation[convo_id].append({"role": role, "content": message})

    def __truncate_conversation(self, convo_id: str = "default") -> None:
        """
        Truncate the conversation
        """
        while True:
            if (
                self.get_token_count(convo_id) > self.max_tokens
                and len(self.conversation[convo_id]) > 1
            ):
                # Don't remove the first message
                self.conversation[convo_id].pop(1)
            else:
                break

    # https://github.com/openai/openai-cookbook/blob/main/examples/How_to_count_tokens_with_tiktoken.ipynb
    def get_token_count(self, convo_id: str = "default") -> int:
        """
        Get token count
        """
        if self.engine not in ["gpt-3.5-turbo", "gpt-3.5-turbo-0301"]:
            raise NotImplementedError("Unsupported engine {self.engine}")

        encoding = tiktoken.encoding_for_model(self.engine)

        num_tokens = 0
        for message in self.conversation[convo_id]:
            # every message follows <im_start>{role/name}\n{content}<im_end>\n
            num_tokens += 4
            for key, value in message.items():
                num_tokens += len(encoding.encode(value))
                if key == "name":  # if there's a name, the role is omitted
                    num_tokens += -1  # role is always required and always 1 token
        num_tokens += 2  # every reply is primed with <im_start>assistant
        return num_tokens

    def get_max_tokens(self, convo_id: str) -> int:
        """
        Get max tokens
        """
        return 4000 - self.get_token_count(convo_id)

    def ask_stream(
        self,
        prompt: str,
        role: str = "user",
        convo_id: str = "default",
        **kwargs,
    ) -> str:
        """
        Ask a question
        """
        # Make conversation if it doesn't exist
        if convo_id not in self.conversation:
            self.reset(convo_id=convo_id, system_prompt=self.system_prompt)
        self.add_to_conversation(prompt, "user", convo_id=convo_id)
        self.__truncate_conversation(convo_id=convo_id)
        # Get response
        response = self.session.post(
            self.api_base,
            headers={"Authorization": f"Bearer {kwargs.get('api_key', self.api_key)}"},
            json={
                "model": self.engine,
                "messages": self.conversation[convo_id],
                "stream": True,
                # kwargs
                "temperature": kwargs.get("temperature", self.temperature),
                "top_p": kwargs.get("top_p", self.top_p),
                "presence_penalty": kwargs.get("presence_penalty", self.presence_penalty),
                "frequency_penalty": kwargs.get("frequency_penalty", self.frequency_penalty),
                "n": kwargs.get("n", self.reply_count),
                "user": role,
                # "max_tokens": self.get_max_tokens(convo_id=convo_id),
            },
            stream=True,
        )
        if response.status_code != 200:
            raise Exception(
                f"Error: {response.status_code} {response.reason} {response.text}",
            )
        response_role: str = None
        full_response: str = ""
        for line in response.iter_lines():
            if not line:
                continue
            # Remove "data: "
            line = line.decode("utf-8")[6:]
            # print('line:',line)
            if line == "[DONE]":
                break
            resp: dict = json.loads(line)
            choices = resp.get("choices")
            if not choices:
                continue
            delta = choices[0].get("delta")
            if not delta:
                continue
            if "role" in delta:
                response_role = delta["role"]
            if "content" in delta:
                content = delta["content"]
                full_response += content
                yield content
        self.add_to_conversation(full_response, response_role, convo_id=convo_id)

    def ask(
        self,
        prompt: str,
        role: str = "user",
        convo_id: str = "default",
        **kwargs,
    ) -> str:
        """
        Non-streaming ask
        """
        response = self.ask_stream(
            prompt=prompt,
            role=role,
            convo_id=convo_id,
            **kwargs,
        )
        full_response: str = "".join(response)
        return full_response
    
    def rollback(self, n: int = 1, convo_id: str = "default") -> None:
        """
        Rollback the conversation
        """
        for _ in range(n):
            self.conversation[convo_id].pop()

    def reset(self, convo_id: str = "default", system_prompt: str = None) -> None:
        """
        Reset the conversation
        """
        self.conversation[convo_id] = [
            {"role": "system", "content": system_prompt or self.system_prompt},
        ]

    def save(self, file: str, *convo_ids: str) -> bool:
        """
        Save the conversation to a JSON file
        """
        try:
            with open(file, "w", encoding="utf-8") as f:
                if convo_ids:
                    json.dump({k: self.conversation[k] for k in convo_ids}, f, indent=2)
                else:
                    json.dump(self.conversation, f, indent=2)
        except (FileNotFoundError, KeyError):
            return False
        return True
        # print(f"Error: {file} could not be created")

    def load(self, file: str, *convo_ids: str) -> bool:
        """
        Load the conversation from a JSON  file
        """
        try:
            with open(file, encoding="utf-8") as f:
                if convo_ids:
                    convos = json.load(f)
                    self.conversation.update({k: convos[k] for k in convo_ids})
                else:
                    self.conversation = json.load(f)
        except (FileNotFoundError, KeyError, json.decoder.JSONDecodeError):
            return False
        return True
           

折騰半天,最後真正要用的就是下面的用戶端。

用戶端代碼: chat_client.py

import config
from chatapi import Chatbot
api_key = config.OPENAI_API_KEY
api_base = '你的雲函數通路路徑/v1/chat/completions'

msg = [{"role": "user", "content": 'Hello World'}]
bot = Chatbot(api_key=api_key,system_prompt='',api_base=api_base) 

while True:
    prompt = input('Prompt:')
    if len(prompt)<=0 or prompt == 'quit':
        break
    res = bot.ask_stream(prompt)
    print('ChatGPT:')
    print('-'*80)
    for content in res:
        print(content,end = '')
    print('\n','-'*80,end = '\n\n')           

使用說明:

  • api_key = 你自己的 openai key
  • 将 你的雲函數通路路徑,替換成雲函數部署後生成的路徑
  • 将 chatapi.py 和 chat_client 放在同一目錄
  • run chat_client