對於一個程式猿來說,如果對八苦排序,求不得苦應該排第一
雲函數配置
用雲函數不是好方案,但絕對是最簡單的方案。
這是騰訊雲函數的通路位址:
https://console.cloud.tencent.com/scf/list?rid=18&ns=default
配置步驟:
- 選模版建立
- Flask 架構模版
- 按預設走,一直到部署完成
- 在函數配置裡,將逾時設定為 60 秒 [蠟燭]
- 在函數代碼裡,修改 app.py (將下面貼出來的代碼直接覆寫即可)
- 再部署,搞定
- 部署完成後,下面有生成的通路路徑,那個就是用戶端要通路的位址
網上已經有不少講其他語言如何做的了,我這兒貼一個 python 的簡單實作,支援 SSE。
app.py
python 代理程式
from flask import Flask, request, abort, Response
import requests
import os
import json

# Set by the serverless platform (e.g. Tencent SCF); used to disable debug mode there.
IS_SERVERLESS = bool(os.environ.get('SERVERLESS'))

app = Flask(__name__)

timeout = 60  # upstream request timeout in seconds


@app.route('/', defaults={'path': ''})
@app.route('/<path:path>', methods=['GET', 'POST', 'PUT', 'DELETE'])
def proxy(path):
    """Forward the incoming request to api.openai.com.

    When the JSON body contains "stream": true (chat/text completions),
    the upstream response is relayed as server-sent events; otherwise the
    full response body is relayed in one piece.
    """
    url = f"https://api.openai.com/{path}"
    token = request.headers.get('Authorization', None)
    if token is None:
        abort(403, 'No Token')
    # Expect "Bearer <key>". BUG FIX: the original indexed split(' ')[1]
    # unconditionally, which raised IndexError on a malformed header, and
    # its follow-up "is None" check could never trigger (split never
    # yields None).
    parts = token.split(' ')
    if len(parts) < 2 or not parts[1]:
        abort(403, 'No API Key')
    openai_key = parts[1]
    options = {
        'headers': {
            'Content-Type': 'application/json; charset=utf-8',
            'Authorization': f"Bearer {openai_key}"
        }
    }
    # Forward a JSON body as JSON, anything else as raw bytes.
    if request.is_json:
        options['json'] = request.json
    else:
        options['data'] = request.get_data()
    try:
        # BUG FIX: the original read options['json'] unconditionally, which
        # raised KeyError whenever the request body was not JSON (only
        # options['data'] is set on that path).
        body = options.get('json')
        if isinstance(body, dict) and body.get('stream', False):
            response = requests.request(request.method, url, **options,
                                        timeout=timeout, stream=True)
            if response.ok:
                def generate():
                    # Relay upstream SSE lines, restoring the blank-line
                    # event separator that iter_lines strips.
                    for line in response.iter_lines():
                        if line:
                            yield f"{line.decode('utf-8')}\n\n"
                    yield "data: [DONE]\n\n"
                return Response(generate(), mimetype='text/event-stream')
            # Streaming request failed (non-2xx): fall through and relay
            # the error body below.
        else:
            response = requests.request(request.method, url, **options,
                                        timeout=timeout)
        # BUG FIX: headers['Content-Type'] could KeyError; default to JSON.
        return Response(response.content,
                        content_type=response.headers.get('Content-Type',
                                                          'application/json'))
    except requests.exceptions.RequestException as e:
        print(e)
        return Response(json.dumps({"error": str(e)}), status=500,
                        content_type='application/json')


# Listen on 0.0.0.0:9000; debug mode only when running locally.
app.run(debug=not IS_SERVERLESS, port=9000, host='0.0.0.0', threaded=True)
幾個注意事項:
- 據傳,有用香港伺服器後 Key 被拒,所以小心點,選個東京或者首爾
- 兒童節之後就沒免費午餐了
- 為了支援 SSE,用戶端就别用 openai 的官方庫通路了
- 不是長久之計,用來做測試還是可以的
python 用戶端
api 調用參考源碼:
https://github.com/acheong08/ChatGPT/blob/main/src/revChatGPT/V3.py
我為了方便調試,將代碼拷貝過來改了一些內容,不記得改了什麼,這裡貼出我改過的,用這個代碼就不用裝官方庫(openai)了,直接是通過 requests 通路的,主要是支援 SSE 模式。
官方api 替代(支援SSE):chatapi.py
import argparse
import json
import os
import sys
from typing import NoReturn
import requests
import tiktoken
from utils import create_completer
from utils import create_session
from utils import get_input
ENGINE = os.environ.get("GPT_ENGINE") or "gpt-3.5-turbo"
class Chatbot:
    """
    Official ChatGPT API client.

    Talks to the chat-completions endpoint directly via ``requests`` (no
    ``openai`` package) so that server-sent-event streaming can be consumed.
    """

    def __init__(
        self,
        api_key: str,
        engine: str = None,
        proxy: str = None,
        max_tokens: int = 3000,
        temperature: float = 0.5,
        top_p: float = 1.0,
        presence_penalty: float = 0.0,
        frequency_penalty: float = 0.0,
        reply_count: int = 1,
        system_prompt: str = "You are ChatGPT, a large language model trained by OpenAI. Respond conversationally",
        api_base = 'https://api.openai.com/v1/chat/completions'
    ) -> None:
        """
        Initialize Chatbot with API key (from https://platform.openai.com/account/api-keys)

        Raises:
            Exception: if the system prompt alone already exceeds ``max_tokens``.
        """
        self.engine = engine or ENGINE
        self.session = requests.Session()
        self.api_key = api_key
        self.proxy = proxy
        if self.proxy:
            # Route both plain and TLS traffic through the same proxy.
            proxies = {
                "http": self.proxy,
                "https": self.proxy,
            }
            self.session.proxies = proxies
        # Message history per conversation id; index 0 is the system prompt.
        self.conversation: dict = {
            "default": [
                {
                    "role": "system",
                    "content": system_prompt,
                },
            ],
        }
        self.system_prompt = system_prompt
        self.max_tokens = max_tokens
        self.temperature = temperature
        self.top_p = top_p
        self.presence_penalty = presence_penalty
        self.frequency_penalty = frequency_penalty
        self.reply_count = reply_count
        self.api_base = api_base
        if self.get_token_count("default") > self.max_tokens:
            raise Exception("System prompt is too long")

    def add_to_conversation(
        self,
        message: str,
        role: str,
        convo_id: str = "default",
    ) -> None:
        """
        Append a message with the given role to the conversation history.
        """
        self.conversation[convo_id].append({"role": role, "content": message})

    def __truncate_conversation(self, convo_id: str = "default") -> None:
        """
        Evict oldest non-system messages until the history fits max_tokens.
        """
        while (
            self.get_token_count(convo_id) > self.max_tokens
            and len(self.conversation[convo_id]) > 1
        ):
            # Never remove index 0 (the system prompt); drop the oldest reply.
            self.conversation[convo_id].pop(1)

    # https://github.com/openai/openai-cookbook/blob/main/examples/How_to_count_tokens_with_tiktoken.ipynb
    def get_token_count(self, convo_id: str = "default") -> int:
        """
        Count the tokens used by the conversation with tiktoken.

        Raises:
            NotImplementedError: for engines other than gpt-3.5-turbo(-0301).
        """
        if self.engine not in ["gpt-3.5-turbo", "gpt-3.5-turbo-0301"]:
            # BUG FIX: the original string literal was missing the f-prefix,
            # so "{self.engine}" appeared verbatim in the error message.
            raise NotImplementedError(f"Unsupported engine {self.engine}")
        encoding = tiktoken.encoding_for_model(self.engine)
        num_tokens = 0
        for message in self.conversation[convo_id]:
            # every message follows <im_start>{role/name}\n{content}<im_end>\n
            num_tokens += 4
            for key, value in message.items():
                num_tokens += len(encoding.encode(value))
                if key == "name":  # if there's a name, the role is omitted
                    num_tokens += -1  # role is always required and always 1 token
        num_tokens += 2  # every reply is primed with <im_start>assistant
        return num_tokens

    def get_max_tokens(self, convo_id: str) -> int:
        """
        Remaining token budget: ~4000-token context minus the history.
        """
        return 4000 - self.get_token_count(convo_id)

    def ask_stream(
        self,
        prompt: str,
        role: str = "user",
        convo_id: str = "default",
        **kwargs,
    ) -> str:
        """
        Send the prompt and yield response text fragments as they stream in.

        Generator: yields each "content" delta from the SSE stream and,
        once the stream ends, records the full reply in the history.
        """
        # Make conversation if it doesn't exist
        if convo_id not in self.conversation:
            self.reset(convo_id=convo_id, system_prompt=self.system_prompt)
        self.add_to_conversation(prompt, "user", convo_id=convo_id)
        self.__truncate_conversation(convo_id=convo_id)
        # Get response
        response = self.session.post(
            self.api_base,
            headers={"Authorization": f"Bearer {kwargs.get('api_key', self.api_key)}"},
            json={
                "model": self.engine,
                "messages": self.conversation[convo_id],
                "stream": True,
                # Per-call kwargs override the instance defaults.
                "temperature": kwargs.get("temperature", self.temperature),
                "top_p": kwargs.get("top_p", self.top_p),
                "presence_penalty": kwargs.get("presence_penalty", self.presence_penalty),
                "frequency_penalty": kwargs.get("frequency_penalty", self.frequency_penalty),
                "n": kwargs.get("n", self.reply_count),
                # NOTE(review): ``role`` is forwarded as the API "user" field
                # (an end-user identifier), not as the message role.
                "user": role,
                # "max_tokens": self.get_max_tokens(convo_id=convo_id),
            },
            stream=True,
        )
        if response.status_code != 200:
            raise Exception(
                f"Error: {response.status_code} {response.reason} {response.text}",
            )
        response_role = None
        full_response: str = ""
        for line in response.iter_lines():
            if not line:
                continue
            # Strip the SSE "data: " prefix.
            line = line.decode("utf-8")[6:]
            if line == "[DONE]":
                break
            resp: dict = json.loads(line)
            choices = resp.get("choices")
            if not choices:
                continue
            delta = choices[0].get("delta")
            if not delta:
                continue
            if "role" in delta:
                response_role = delta["role"]
            if "content" in delta:
                content = delta["content"]
                full_response += content
                yield content
        self.add_to_conversation(full_response, response_role, convo_id=convo_id)

    def ask(
        self,
        prompt: str,
        role: str = "user",
        convo_id: str = "default",
        **kwargs,
    ) -> str:
        """
        Non-streaming ask: consume the stream and return the full reply.
        """
        response = self.ask_stream(
            prompt=prompt,
            role=role,
            convo_id=convo_id,
            **kwargs,
        )
        full_response: str = "".join(response)
        return full_response

    def rollback(self, n: int = 1, convo_id: str = "default") -> None:
        """
        Remove the last ``n`` messages from the conversation.
        """
        for _ in range(n):
            self.conversation[convo_id].pop()

    def reset(self, convo_id: str = "default", system_prompt: str = None) -> None:
        """
        Reset the conversation to just a system prompt.
        """
        self.conversation[convo_id] = [
            {"role": "system", "content": system_prompt or self.system_prompt},
        ]

    def save(self, file: str, *convo_ids: str) -> bool:
        """
        Save the conversation(s) to a JSON file; True on success.
        """
        try:
            with open(file, "w", encoding="utf-8") as f:
                if convo_ids:
                    json.dump({k: self.conversation[k] for k in convo_ids}, f, indent=2)
                else:
                    json.dump(self.conversation, f, indent=2)
        except (FileNotFoundError, KeyError):
            return False
        return True

    def load(self, file: str, *convo_ids: str) -> bool:
        """
        Load conversation(s) from a JSON file; True on success.
        """
        try:
            with open(file, encoding="utf-8") as f:
                if convo_ids:
                    convos = json.load(f)
                    self.conversation.update({k: convos[k] for k in convo_ids})
                else:
                    self.conversation = json.load(f)
        except (FileNotFoundError, KeyError, json.decoder.JSONDecodeError):
            return False
        return True
折騰半天,最後真正要用的就是下面的用戶端。
用戶端代碼: chat_client.py
import config
from chatapi import Chatbot

api_key = config.OPENAI_API_KEY
# Replace the placeholder below with the URL generated after deploying
# the cloud function.
api_base = '你的雲函數通路路徑/v1/chat/completions'

bot = Chatbot(api_key=api_key, system_prompt='', api_base=api_base)

# Simple REPL: empty input or 'quit' exits.
while True:
    prompt = input('Prompt:')
    if not prompt or prompt == 'quit':
        break
    # ask_stream is a generator; print fragments as they arrive (SSE).
    res = bot.ask_stream(prompt)
    print('ChatGPT:')
    print('-' * 80)
    for content in res:
        print(content, end='')
    print('\n', '-' * 80, end='\n\n')
使用說明:
- api_key = 你自己的 openai key
- 将 你的雲函數通路路徑,替換成雲函數部署後生成的路徑
- 将 chatapi.py 和 chat_client 放在同一目錄
- run chat_client