天天看點

微服務網關(四)tcp代理子產品微服務網關(四)tcp代理子產品補充

微服務網關(四)tcp代理子產品

tcp代理伺服器的代理實作:

微服務網關(四)tcp代理子產品微服務網關(四)tcp代理子產品補充

請求流程:

微服務網關(四)tcp代理子產品微服務網關(四)tcp代理子產品補充

代理的啟停方法

//并發執行
go func() {
	tcp_proxy_router.TcpServerRun()
}()

tcp_proxy_router.TcpServerStop()
           

tcp_server

一次完整流程

tcp_server.go

首先是定義TcpServer結構體

type TcpServer struct {
	Addr    string     //監聽位址
	Handler TCPHandler //實際邏輯回調設定handler
	err     error
	BaseCtx context.Context //上下文
	//讀寫逾時設定
	WriteTimeout     time.Duration
	ReadTimeout      time.Duration
	KeepAliveTimeout time.Duration //連接配接一直保持 發資料包的時間
	mu               sync.Mutex //鎖
	inShutdown       int32      //是否關閉
	doneChan         chan struct{}
	l                *onceCloseListener //單次啟動時,listen需要執行的一次性操作的設定
}
           

然後定義TCP開啟和關閉方法

ListenAndServe

func (srv *TcpServer) ListenAndServe() error {
	//驗證服務是否關閉
		//原子方法驗證是否為0
		//atomic.LoadInt32(&srv.inShutdown) != 0
    
	if srv.shuttingDown() {
		return ErrServerClosed
	}
	if srv.doneChan == nil {
		srv.doneChan = make(chan struct{})
	}
	addr := srv.Addr
	if addr == "" {
		return errors.New("need addr")
	}
	//調用核心的官方方法,并傳入到Serve中
	ln, err := net.Listen("tcp", addr)
	if err != nil {
		return err
	}
	return srv.Serve(tcpKeepAliveListener{ln.(*net.TCPListener)})
}
           

Serve

func (srv *TcpServer) Serve(l net.Listener) error {
	srv.l = &onceCloseListener{Listener: l}
	//退出時執行listener關閉
	defer srv.l.Close()
	if srv.BaseCtx == nil {
		srv.BaseCtx = context.Background()
	}
	BaseCtx := srv.BaseCtx
	ctx := context.WithValue(BaseCtx, ServiceContextKey, srv)
	//輪詢Listener的Accept方法,擷取用戶端發來的conn
	for {
		//輪詢Listener的Accept方法,擷取用戶端發來的conn
		rw, err := l.Accept()
		if err != nil {
			select {
			case <-srv.getDoneChan():
				return ErrServerClosed
			default:
			}
			fmt.Printf("accept fail, err: %v\n", err)
			continue
		}
		//拿到便馬上建立連接配接,見下文
		c := srv.newConn(rw)
        //這裡便跳轉到了tcp_conn.go中的serve方法進行協程處理,見下文
		go c.serve(ctx)
	}
	return nil
}
           

tcp_conn.go

跳轉到tcp_conn.go中的newConn、serve方法

func (srv *TcpServer) newConn(rwc net.Conn) *conn {
   c := &conn{
      server: srv,
      rwc:    rwc,
   }
   // 設定逾時時間參數傳回
   if d := c.server.ReadTimeout; d != 0 {
      c.rwc.SetReadDeadline(time.Now().Add(d))
   }
   if d := c.server.WriteTimeout; d != 0 {
      c.rwc.SetWriteDeadline(time.Now().Add(d))
   }
   if d := c.server.KeepAliveTimeout; d != 0 {
      if tcpConn, ok := c.rwc.(*net.TCPConn); ok {
         tcpConn.SetKeepAlive(true)
         tcpConn.SetKeepAlivePeriod(d)
      }
   }
   return c
}
           

server中,使用recover攔截錯誤資訊的原因:因為是用go開啟的協程,是以裡面的error是用的panic,是以使用了recover進行捕獲

func (c conn) serve(ctx context.Context) {
   defer func() {
      //recover攔截錯誤資訊并列印
      if err := recover(); err != nil && err != ErrAbortHandler {
         const size = 64 << 10
         buf := make([]byte, size)
         buf = buf[:runtime.Stack(buf, false)]
         fmt.Printf("tcp: panic serving %v: %v\n%s", c.remoteAddr, err, buf)
      }
      c.close()
   }()
   //擷取連接配接位址、上下文、handler
   c.remoteAddr = c.rwc.RemoteAddr().String()
   ctx = context.WithValue(ctx, LocalAddrContextKey, c.rwc.LocalAddr())
   if c.server.Handler == nil {
      panic("handler empty")
   }
   //接着實作Handler中的ServeTCP即可
   c.server.Handler.ServeTCP(ctx, c.rwc)
}
           

tcp_proxy_router

tcpserver.go

TcpServerRun
  1. 擷取TCP服務清單,将tcp的端口全部打開
func TcpServerRun() {
	// 擷取TCP服務清單,将tcp的端口全部打開
	serviceList := dao.ServiceManagerHandler.GetTcpServiceList()
	for _, serviceItem := range serviceList {
        tempItem := serviceItem
		//通過将tempItem傳入協程 開啟所有的tcp服務
        //具體協程内容見下一個代碼塊
		go func(serviceDetail *dao.ServiceDetail) {
            //配置回調handler
            //......
            //配置上下文
            //......
            //配置tcp伺服器TCPServer
			tcpServer := &tcp_server.TcpServer{
				Addr:    addr,
				Handler: routerHandler,
				BaseCtx: baseCtx,
			}
			//放入切片中
			tcpServerList = append(tcpServerList, tcpServer)
			log.Printf(" [INFO] tcp_proxy_run %v\n", addr)
			//開啟監聽
			if err := tcpServer.ListenAndServe(); err != nil && err != tcp_server.ErrServerClosed {
				log.Fatalf(" [INFO] tcp_proxy_run %v err:%v\n", addr, err)
			}
        }(tempItem)
	}
}
           

協程内部

  1. 擷取端口
  2. 擷取負載均衡
  3. 建構路由及設定中間件
  4. 建構回調handler
    1. 傳入負載均衡政策,路由
  5. 配置TCPServer
  6. 調用tcpServer.ListenAndServe()方法開啟服務監聽
//通過将tempItem傳入協程 開啟所有的tcp服務
go func(serviceDetail *dao.ServiceDetail) {
   //設定tcp伺服器
   //擷取端口
   addr := fmt.Sprintf(":%d", serviceDetail.TCPRule.Port)
   //擷取負載均衡
   rb, err := dao.LoadBalancerHandler.GetLoadBalancer(serviceDetail)
   if err != nil {
      log.Fatalf(" [INFO] GetTcpLoadBalancer %v err:%v\n", addr, err)
      return
   }
   //建構路由及設定中間件
   router := tcp_proxy_middleware.NewTcpSliceRouter()
   router.Group("/").Use(
      tcp_proxy_middleware.TCPFlowCountMiddleware(),
      tcp_proxy_middleware.TCPFlowLimitMiddleware(),
      tcp_proxy_middleware.TCPWhiteListMiddleware(),
      tcp_proxy_middleware.TCPBlackListMiddleware(),
   )

   //建構回調handler
   routerHandler := tcp_proxy_middleware.NewTcpSliceRouterHandler(
      //傳入負載均衡政策,路由
      func(c *tcp_proxy_middleware.TcpSliceRouterContext) tcp_server.TCPHandler {
         return reverse_proxy.NewTcpLoadBalanceReverseProxy(c, rb)
      }, router)
   baseCtx := context.WithValue(context.Background(), "service", serviceDetail)

   // 配置TCPServer
   tcpServer := &tcp_server.TcpServer{
      Addr:    addr,
      Handler: routerHandler,
      BaseCtx: baseCtx,
   }
    //放入切片中
   tcpServerList = append(tcpServerList, tcpServer)
   log.Printf(" [INFO] tcp_proxy_run %v\n", addr)

   //開啟監聽
   if err := tcpServer.ListenAndServe(); err != nil && err != tcp_server.ErrServerClosed {
      log.Fatalf(" [INFO] tcp_proxy_run %v err:%v\n", addr, err)
   }
}(tempItem)
           

注意這裡還要定義切片,然後将tcpServer放入切片中(36行)因為之後我們還需要利用這個切片數組來關閉tcp服務

微服務網關(四)tcp代理子產品微服務網關(四)tcp代理子產品補充
ServerStop

周遊tcp切片清單關閉即可

func TcpServerStop() {
   for _, tcpServer := range tcpServerList {
      tcpServer.Close()
      log.Printf(" [INFO] tcp_proxy_stop %v stopped\n", tcpServer.Addr)
   }
}
           

反向代理

TCP反向代理的源碼實作

reverse_proxy

tcp_reverse_proxy.go

首先是TCP反向代理結構體

// TcpReverseProxy TCP反向代理
type TcpReverseProxy struct {
   ctx                  context.Context //單次請求單獨設定
   Addr                 string
   KeepAlivePeriod      time.Duration //設定
   DialTimeout          time.Duration //設定逾時時間
   DialContext          func(ctx context.Context, netWork, address string) (net.Conn, error)
   OnDialError          func(src net.Conn, dstDialErr error)
   ProxyProtocolVersion int
}
           

接着設定New方法

func NewTcpLoadBalanceReverseProxy(c *tcp_proxy_middleware.TcpSliceRouterContext, lb load_balance.LoadBalance) *TcpReverseProxy {
	return func() *TcpReverseProxy {
		nextAddr, err := lb.Get("")
		if err != nil {
			log.Fatal("get next addr fail")
		}
		//設定上TCP反向代理結構體
		return &TcpReverseProxy{
			ctx:             c.Ctx,
			Addr:            nextAddr,
			KeepAlivePeriod: time.Second,
			DialTimeout:     time.Second,
		}
	}()
}
           

然後就是編寫核心方法ServeTCP

// ServeTCP 傳入上遊 conn,在這裡完成下遊連接配接與資料交換
func (dp *TcpReverseProxy) ServeTCP(ctx context.Context, src net.Conn) {
   //設定連接配接逾時
   var cancel context.CancelFunc
   if dp.DialTimeout >= 0 {
      ctx, cancel = context.WithTimeout(ctx, dp.dialTimeout())
   }
   //開啟與下遊的連接配接,見下文
   dst, err := dp.dialContext()(ctx, "tcp", dp.Addr)
   if cancel != nil {
      cancel()
   }
   if err != nil {
      dp.onDialError()(src, err)
      return
   }
   defer func() { go dst.Close() }() //記得退出下遊連接配接

   //設定dst的 keepAlive 參數,在資料請求之前
   if ka := dp.keepAlivePeriod(); ka > 0 {
      if c, ok := dst.(*net.TCPConn); ok {
         c.SetKeepAlive(true)
         c.SetKeepAlivePeriod(ka)
      }
   }
   errc := make(chan error, 1)
   //開啟協程進行上遊下遊資料交換,見下文
   go dp.proxyCopy(errc, src, dst)
   go dp.proxyCopy(errc, dst, src)
   <-errc
}
           
//開啟與下遊的連接配接
func (dp *TcpReverseProxy) dialContext() func(ctx context.Context, netWork, address string) (net.Conn, error) {
   if dp.DialContext != nil {
      return dp.DialContext
   }
   return (&net.Dialer{
      Timeout:   dp.DialTimeout,     //連接配接逾時
      KeepAlive: dp.KeepAlivePeriod, //設定連接配接的檢測時長
   }).DialContext
}
//上遊下遊資料交換
func (dp *TcpReverseProxy) proxyCopy(errc chan<- error, dst, src net.Conn) {
	_, err := io.Copy(dst, src)
	errc <- err
}

           

結束!

補充

TCP代理特點:

  • 流式資料及無狀态資料推薦使用
  • 對服務管控比較少,隻能做流量控制及請求來源限制
  • 如果有對應協定代理,推薦使用對應協定代理

thrift:

​ 隻使用過一次,記得不太清了,等着以後想起來再回過頭填坑