天天看點

Melang之協程并發代理實戰

版權聲明:本文為部落客原創文章,遵循<a href="http://creativecommons.org/licenses/by-sa/4.0/"target="_blank" rel="noopener"> CC 4.0 BY-SA 版權協定,轉載請附上原文出處連結和本聲明。本文連結

之前的文章中,給大家介紹了一種新的協程語言——Melang。 今天,給大家帶來的是這款語言的企業首戰,雖然是個較小的項目,但對于一款新語言的意義無疑是巨大的。并且,利用這款語言,讓整個程式結構極為清晰與子產品化。 由于筆者公司想搭建一個代理服務供其他網段機器上網之用,是以有了本文的項目。注意:本文隻是用于介紹語言特性和使用,并不鼓勵讀者違背國家政策法規,請勿将此文内容用于技術讨論外的一切其他用途。

程式結構

在之前的文章中我們介紹過,Melang的每一個腳本任務都是一個協程,協程之間的排程是搶占式的,協程之間的運作環境是隔離的,且一個協程還可以拉起其他協程。 是以,本文的socks5代理将采用協程并發的模式,協程結構如下:

Melang之協程并發代理實戰

每一個工作協程獨立處理一個TCP連接配接。 故此,在通路web站點時,會由浏覽器發起多個TCP到本代理,由主協程完成TCP的建立,然後拉起一個獨立的工作協程處理該TCP上的協定和資料收發。當TCP連接配接斷開收尾工作結束後,工作協程退出釋放。 注:本文給出的代理目前僅支援TCP代理。

Socks5

這裡捎帶提及一下socks5協定。 這個協定是比較簡單的,大緻流程如下:

  1. 建立TCP後,代理伺服器會收到用戶端的握手封包
  2. 伺服器端驗證握手封包中的驗證方式,并回複響應封包
  3. 用戶端收到後驗證完會發送本次代理資料的目的位址(可能是域名或IP)和端口
  4. 代理伺服器嘗試向該位址建立TCP,然後給用戶端傳回響應封包
  5. 目前4步完成後,就進入了資料透傳的階段

實作

廢話再多不如代碼上桌。 代碼分為兩個檔案,一個是主協程腳本proxy.mln,另一個是工作協程腳本worker.mln,我們分别給出:

proxy.mln
recvTimeout = 50;
fd = @mln_tcpListen('0.0.0.0', '1080');
@mln_print('Ready');
while (1) {
  sockfd = @mln_tcpAccept(fd);
  if (sockfd) {
    conf = [
      'fd': sockfd,
      'recvTimeout': recvTimeout,
    ];
    @mln_eval('worker.mln', @mln_json_encode(conf));
  } fi
}複制代碼      

可以看到,主協程的任務非常簡單:

  1. 建立監聽套接字
  2. 死循環建立TCP,并為每個TCP拉起一個worker.mln任務進行處理。
worker.mln
conf = @mln_json_decode(EVAL_DATA);
localFd = @mln_int(conf['fd']);
recvTimeout = @mln_int(conf['recvTimeout']);
state = 1;
localSend = '';
remoteSend = '';
remoteFd = nil;

@closeLocalSock()
{
  @mln_tcpClose(_localFd);
  _localFd = nil;
  _localSend = '';
}

@closeRemoteSock()
{
  @mln_tcpClose(_remoteFd);
  _remoteFd = nil;
  _remoteSend = '';
}

@localRecvHandler()
{
  if (_state == 1) {
    if (@mln_strlen(_remoteSend) < 3 || @mln_bin2int(_remoteSend[0]) != 5) {
      @closeLocalSock();
      return;
    } fi
    n = @mln_bin2int(_remoteSend[1]);
    if (@mln_strlen(_remoteSend) < n+2) {
      @closeLocalSock();
      return;
    } fi
    for (i = 0; i < n; ++i) {
      if (@mln_bin2int(_remoteSend[2+i]) == 0) {
        break;
      } fi
    }
    if (i >= n) {
      @closeLocalSock();
      return;
    } fi
    ret = @mln_tcpSend(_localFd, @mln_int2bin(5)[-1]+@mln_int2bin(0)[-1]);
    if (!ret) {
      @closeLocalSock();
      return;
    } fi
    _remoteSend = @mln_split(_remoteSend, n+2);
    _state = 2;
  } else if (_state == 2) {
    arr = [5, 7, 0, 1, 0, 0, 0, 0, 0, 0];
    err = '';
    for (i = 0; i < @mln_size(arr); ++i) {
      err += @mln_int2bin(arr[i])[-1];
    }
    len = @mln_strlen(_remoteSend);
    if (len < 8 || @mln_bin2int(_remoteSend[0]) != 5 || @mln_bin2int(_remoteSend[1]) != 1 || @mln_bin2int(_remoteSend[2]) != 0) {
      goto fail;
    } fi
    type = @mln_bin2int(_remoteSend[3]);
    addr = '';
    if (type == 1) {
      if (len < 10) {
        goto fail;
      } fi
      for (i = 0; i < 4; ++i) {
        addr += @mln_str(@mln_bin2int(_remoteSend[4+i]));
        if (i < 3) {
          addr += '.';
        } fi
      }
      n = 8;
    } else if (type == 3) {
      n = 5+@mln_bin2int(_remoteSend[4]);
      if (len < n+2) {
        goto fail;
      } fi
      addr = @mln_split(_remoteSend, 5, @mln_bin2int(_remoteSend[4]));
    } else if (type == 4) {
      if (len < 22) {
        goto fail;
      } fi
      for (i = 0; i < 8; ++i) {
        addr += @mln_bin2hex(_remoteSend[4+i*2]);
        addr += @mln_bin2hex(_remoteSend[4+i*2+1]);
        if (i < 7) {
          addr += ':';
        } fi
      }
      n = 20;
    } else {
      goto fail;
    }
    if (len < n + 2) {
      goto fail;
    } fi
    port = (@mln_bin2int(_remoteSend[n])<<8)|@mln_bin2int(_remoteSend[n+1]);

    @mln_print('connect['+addr+']');
    ret = @mln_tcpConnect(addr, @mln_str(port), 30000);
    if (!ret) {
      goto fail;
    } fi
    _remoteFd = ret;

    ret = ''+@mln_int2bin(5)[-1]+@mln_int2bin(0)[-1]+@mln_int2bin(0)[-1]+_remoteSend[3];
    ret += @mln_split(_remoteSend, 4, n - 2);
    ret = @mln_tcpSend(_localFd, ret);
    if (!ret) {
      @closeRemoteSock();
      @closeLocalSock();
      return;
    } fi
    _remoteSend = @mln_split(_remoteSend, n+2);
    _state = 3;
  } else {
    ret = @mln_tcpSend(_remoteFd, _remoteSend);
    if (!ret) {
      @closeRemoteSock();
    } else {
      _remoteSend = '';
    }
  }
  return;

fail:
  @mln_tcpSend(_localFd, err);
  @closeLocalSock();
  return;
}

//@mln_print(''+localFd);
while (1) {
  if (localFd) {
      if (state == 3 && !remoteFd && !localSend) {
        @closeLocalSock();
      } else {
        res = @mln_tcpRecv(localFd, recvTimeout);
        if (res) {
          if (@mln_isBool(res)) {
            @closeLocalSock();
          } else {
            remoteSend += res;
          }
        } else if (@mln_isBool(res)) {
          @closeLocalSock();
        } fi
      }
  } fi
  if (remoteFd) {
      if (state == 3 && !localFd && !remoteSend) {
        @closeRemoteSock();
      } else {
        res = @mln_tcpRecv(remoteFd, recvTimeout);
        if (res) {
          if (@mln_isBool(res)) {
            @closeRemoteSock();
          } else {
            localSend += res;
          }
        } else if (@mln_isBool(res)) {
            @closeRemoteSock();
        } fi
      }
  } fi
  if (@mln_isNil(localFd) && @mln_isNil(remoteFd)) {
    break;
  } fi
  if (remoteSend) {
    @localRecvHandler();
  } fi
  if (localSend) {
    ret = @mln_tcpSend(_localFd, _localSend);
    _localSend = '';
    if (!ret) {
      @closeLocalSock();
    } fi
  } fi
}
@mln_print('quit');複制代碼      
  • conf是從主協程傳過來的配置;
  • locaFd就是剛剛建立的套接字;
  • recvTimeout是tcp接收資料的逾時時間(毫秒),由于Melang的代碼是同步代碼(底層異步執行),是以需要有防止長時間阻塞在一個函數上的機制。但由于是同步模式代碼,是以執行流程是非常清晰的;
  • state是用來标記socks5的狀态:1-處理握手封包階段,2-處理目的位址封包并建立連接配接階段,3-資料透傳階段;
  • remoteFd是state為2時向目的位址建立的TCP套接字;
  • localSend和remoteSend是兩個發送緩沖區,localSend是發送給localFd的資料,而remoteSend是發送給remoteFd的資料(state不為3時不會發給remoteFd);
  • closeLocalSock函數就是用來關閉與用戶端通信的TCP連接配接并清空發送緩沖區;
  • closeRemoteSock函數就是用來關閉與目的位址通信的TCP連接配接并清空發送緩沖區;
  • localRecvHandler函數用來處理與用戶端通信的套接字資料的,由于有兩個階段的socks5相關處理,是以相對冗長一些;
  • while死循環,這個循環就是用來接收兩個TCP上資料,然後處理,然後再由指定的TCP發送出去;

結尾