微服務網關(四)tcp代理子產品
tcp代理伺服器的代理實作:
請求流程:
代理的啟停方法
//并發執行
go func() {
tcp_proxy_router.TcpServerRun()
}()
tcp_proxy_router.TcpServerStop()
tcp_server
一次完整流程
tcp_server.go
首先是定義TcpServer結構體
type TcpServer struct {
Addr string //監聽位址
Handler TCPHandler //實際邏輯回調設定handler
err error
BaseCtx context.Context //上下文
//讀寫逾時設定
WriteTimeout time.Duration
ReadTimeout time.Duration
KeepAliveTimeout time.Duration //連接配接一直保持 發資料包的時間
mu sync.Mutex //鎖
inShutdown int32 //是否關閉
doneChan chan struct{}
l *onceCloseListener //單次啟動時,listen需要執行的一次性操作的設定
}
然後定義TCP開啟和關閉方法
ListenAndServe
func (srv *TcpServer) ListenAndServe() error {
//驗證服務是否關閉
//原子方法驗證是否為0
//atomic.LoadInt32(&srv.inShutdown) != 0
if srv.shuttingDown() {
return ErrServerClosed
}
if srv.doneChan == nil {
srv.doneChan = make(chan struct{})
}
addr := srv.Addr
if addr == "" {
return errors.New("need addr")
}
//調用核心的官方方法,并傳入到Serve中
ln, err := net.Listen("tcp", addr)
if err != nil {
return err
}
return srv.Serve(tcpKeepAliveListener{ln.(*net.TCPListener)})
}
Serve
func (srv *TcpServer) Serve(l net.Listener) error {
srv.l = &onceCloseListener{Listener: l}
//退出時執行listener關閉
defer srv.l.Close()
if srv.BaseCtx == nil {
srv.BaseCtx = context.Background()
}
BaseCtx := srv.BaseCtx
ctx := context.WithValue(BaseCtx, ServiceContextKey, srv)
//輪詢Listener的Accept方法,擷取用戶端發來的conn
for {
//輪詢Listener的Accept方法,擷取用戶端發來的conn
rw, err := l.Accept()
if err != nil {
select {
case <-srv.getDoneChan():
return ErrServerClosed
default:
}
fmt.Printf("accept fail, err: %v\n", err)
continue
}
//拿到便馬上建立連接配接,見下文
c := srv.newConn(rw)
//這裡便跳轉到了tcp_conn.go中的serve方法進行協程處理,見下文
go c.serve(ctx)
}
return nil
}
tcp_conn.go
跳轉到tcp_conn.go中的newConn、serve方法
func (srv *TcpServer) newConn(rwc net.Conn) *conn {
c := &conn{
server: srv,
rwc: rwc,
}
// 設定逾時時間參數傳回
if d := c.server.ReadTimeout; d != 0 {
c.rwc.SetReadDeadline(time.Now().Add(d))
}
if d := c.server.WriteTimeout; d != 0 {
c.rwc.SetWriteDeadline(time.Now().Add(d))
}
if d := c.server.KeepAliveTimeout; d != 0 {
if tcpConn, ok := c.rwc.(*net.TCPConn); ok {
tcpConn.SetKeepAlive(true)
tcpConn.SetKeepAlivePeriod(d)
}
}
return c
}
server中,使用recover攔截錯誤資訊的原因:因為是用go開啟的協程,是以裡面的error是用的panic,是以使用了recover進行捕獲
func (c conn) serve(ctx context.Context) {
defer func() {
//recover攔截錯誤資訊并列印
if err := recover(); err != nil && err != ErrAbortHandler {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
fmt.Printf("tcp: panic serving %v: %v\n%s", c.remoteAddr, err, buf)
}
c.close()
}()
//擷取連接配接位址、上下文、handler
c.remoteAddr = c.rwc.RemoteAddr().String()
ctx = context.WithValue(ctx, LocalAddrContextKey, c.rwc.LocalAddr())
if c.server.Handler == nil {
panic("handler empty")
}
//接着實作Handler中的ServeTCP即可
c.server.Handler.ServeTCP(ctx, c.rwc)
}
tcp_proxy_router
tcpserver.go
TcpServerRun
- 擷取TCP服務清單,将tcp的端口全部打開
func TcpServerRun() {
// 擷取TCP服務清單,将tcp的端口全部打開
serviceList := dao.ServiceManagerHandler.GetTcpServiceList()
for _, serviceItem := range serviceList {
tempItem := serviceItem
//通過将tempItem傳入協程 開啟所有的tcp服務
//具體協程内容見下一個代碼塊
go func(serviceDetail *dao.ServiceDetail) {
//配置回調handler
//......
//配置上下文
//......
//配置tcp伺服器TCPServer
tcpServer := &tcp_server.TcpServer{
Addr: addr,
Handler: routerHandler,
BaseCtx: baseCtx,
}
//放入切片中
tcpServerList = append(tcpServerList, tcpServer)
log.Printf(" [INFO] tcp_proxy_run %v\n", addr)
//開啟監聽
if err := tcpServer.ListenAndServe(); err != nil && err != tcp_server.ErrServerClosed {
log.Fatalf(" [INFO] tcp_proxy_run %v err:%v\n", addr, err)
}
}(tempItem)
}
}
協程内部
- 擷取端口
- 擷取負載均衡
- 建構路由及設定中間件
- 建構回調handler
- 傳入負載均衡政策,路由
- 配置TCPServer
- 調用tcpServer.ListenAndServe()方法開啟服務監聽
//通過将tempItem傳入協程 開啟所有的tcp服務
go func(serviceDetail *dao.ServiceDetail) {
//設定tcp伺服器
//擷取端口
addr := fmt.Sprintf(":%d", serviceDetail.TCPRule.Port)
//擷取負載均衡
rb, err := dao.LoadBalancerHandler.GetLoadBalancer(serviceDetail)
if err != nil {
log.Fatalf(" [INFO] GetTcpLoadBalancer %v err:%v\n", addr, err)
return
}
//建構路由及設定中間件
router := tcp_proxy_middleware.NewTcpSliceRouter()
router.Group("/").Use(
tcp_proxy_middleware.TCPFlowCountMiddleware(),
tcp_proxy_middleware.TCPFlowLimitMiddleware(),
tcp_proxy_middleware.TCPWhiteListMiddleware(),
tcp_proxy_middleware.TCPBlackListMiddleware(),
)
//建構回調handler
routerHandler := tcp_proxy_middleware.NewTcpSliceRouterHandler(
//傳入負載均衡政策,路由
func(c *tcp_proxy_middleware.TcpSliceRouterContext) tcp_server.TCPHandler {
return reverse_proxy.NewTcpLoadBalanceReverseProxy(c, rb)
}, router)
baseCtx := context.WithValue(context.Background(), "service", serviceDetail)
// 配置TCPServer
tcpServer := &tcp_server.TcpServer{
Addr: addr,
Handler: routerHandler,
BaseCtx: baseCtx,
}
//放入切片中
tcpServerList = append(tcpServerList, tcpServer)
log.Printf(" [INFO] tcp_proxy_run %v\n", addr)
//開啟監聽
if err := tcpServer.ListenAndServe(); err != nil && err != tcp_server.ErrServerClosed {
log.Fatalf(" [INFO] tcp_proxy_run %v err:%v\n", addr, err)
}
}(tempItem)
注意這裡還要定義切片,然後将tcpServer放入切片中(36行)因為之後我們還需要利用這個切片數組來關閉tcp服務
ServerStop
周遊tcp切片清單關閉即可
func TcpServerStop() {
for _, tcpServer := range tcpServerList {
tcpServer.Close()
log.Printf(" [INFO] tcp_proxy_stop %v stopped\n", tcpServer.Addr)
}
}
反向代理
TCP反向代理的源碼實作
reverse_proxy
tcp_reverse_proxy.go
首先是TCP反向代理結構體
// TcpReverseProxy TCP反向代理
type TcpReverseProxy struct {
ctx context.Context //單次請求單獨設定
Addr string
KeepAlivePeriod time.Duration //設定
DialTimeout time.Duration //設定逾時時間
DialContext func(ctx context.Context, netWork, address string) (net.Conn, error)
OnDialError func(src net.Conn, dstDialErr error)
ProxyProtocolVersion int
}
接着設定New方法
func NewTcpLoadBalanceReverseProxy(c *tcp_proxy_middleware.TcpSliceRouterContext, lb load_balance.LoadBalance) *TcpReverseProxy {
return func() *TcpReverseProxy {
nextAddr, err := lb.Get("")
if err != nil {
log.Fatal("get next addr fail")
}
//設定上TCP反向代理結構體
return &TcpReverseProxy{
ctx: c.Ctx,
Addr: nextAddr,
KeepAlivePeriod: time.Second,
DialTimeout: time.Second,
}
}()
}
然後就是編寫核心方法ServeTCP
// ServeTCP 傳入上遊 conn,在這裡完成下遊連接配接與資料交換
func (dp *TcpReverseProxy) ServeTCP(ctx context.Context, src net.Conn) {
//設定連接配接逾時
var cancel context.CancelFunc
if dp.DialTimeout >= 0 {
ctx, cancel = context.WithTimeout(ctx, dp.dialTimeout())
}
//開啟與下遊的連接配接,見下文
dst, err := dp.dialContext()(ctx, "tcp", dp.Addr)
if cancel != nil {
cancel()
}
if err != nil {
dp.onDialError()(src, err)
return
}
defer func() { go dst.Close() }() //記得退出下遊連接配接
//設定dst的 keepAlive 參數,在資料請求之前
if ka := dp.keepAlivePeriod(); ka > 0 {
if c, ok := dst.(*net.TCPConn); ok {
c.SetKeepAlive(true)
c.SetKeepAlivePeriod(ka)
}
}
errc := make(chan error, 1)
//開啟協程進行上遊下遊資料交換,見下文
go dp.proxyCopy(errc, src, dst)
go dp.proxyCopy(errc, dst, src)
<-errc
}
//開啟與下遊的連接配接
func (dp *TcpReverseProxy) dialContext() func(ctx context.Context, netWork, address string) (net.Conn, error) {
if dp.DialContext != nil {
return dp.DialContext
}
return (&net.Dialer{
Timeout: dp.DialTimeout, //連接配接逾時
KeepAlive: dp.KeepAlivePeriod, //設定連接配接的檢測時長
}).DialContext
}
//上遊下遊資料交換
func (dp *TcpReverseProxy) proxyCopy(errc chan<- error, dst, src net.Conn) {
_, err := io.Copy(dst, src)
errc <- err
}
結束!
補充
TCP代理特點:
- 流式資料及無狀态資料推薦使用
- 對服務管控比較少,隻能做流量控制及請求來源限制
- 如果有對應協定代理,推薦使用對應協定代理
thrift:
隻使用過一次,記得不太清了,等着以後想起來再回過頭填坑