天天看點

用Linux的iptables和Python模拟廣域網

問題:有人在豆瓣Python版提出“在一台PC上模拟多個IP位址向伺服器發起socket連接配接,并在建立連接配接後發資料包“的需求。具體要求是“可以在 界面上輸入想要模拟的IP位址的範圍,比如我想模拟100個使用者,就輸入192.168.4.100——192.168.4.200,然後自動以這些IP 位址向伺服器發socket連接配接和一些資料“,“伺服器側是一個注冊系統,以IP和序列号區分不同使用者,我要想進行壓力測試必須模拟不同的IP位址“。環 境是“測試所有使用的都是私網位址,而且環境比較獨立,是以不必考慮NAT和IP沖突問題“。限制:”伺服器側要是能動就好了,就是不能改動“。以下是我 的解決方法。

搭建環境:伺服器端環境不限。用戶端必須是Linux。伺服器和客戶直接通過LAN連接配接。以上隻是邏輯要求,可以用虛拟機在一台電腦上實作。用戶端ip地 址設定為cip,掩碼nm,伺服器端ip位址設定為sip(sip&nm == cip&nm),掩碼nm,預設網關cip(注意,是把伺服器的預設網關位址設定為客戶ip位址。課後練習:為什麼?)。

軟體安裝:在用戶端裝nfqueue-bindings(http://software.inl.fr/trac/wiki/nfqueue- bindings)。有大量的依賴需要安裝,不過不需要Perl,隻要把CMakeLists.txt中“ADD_SUBDIRECTORY(perl) “這一行去掉。另外libnetfilter-queue要自己下載下傳編譯安裝,至少在我的機器上用rpm安裝後nfqueue-bindings不認。

運作ip模拟器:在用戶端以root權限運作wansim.py(代碼在後面),指令行參數為:

python wansim.py sip cip ip_range

其中sip為上面設定的伺服器ip,cip為上面設定的用戶端機器ip,ip_range為要模拟的ip範圍,例如:

1.1.1.1-254

1.1.1-254.1-254

222-235.1-8.2.3-6 

限制:模拟ip不能進入用戶端-伺服器所用LAN的子網ip範圍,即對任何模拟ip位址fip:fip&nm != sip&nm(課後作業:為什麼?)。

ip模拟器開始運作後,所有從用戶端發起到伺服器端的tcp會話将被ip模拟器從指定ip範圍中随機配置設定一個ip。伺服器程式看到的是這個模拟的ip,而不是cip。

運作:運作伺服器。在客戶機上運作用戶端測試軟體。以下是我Windows上伺服器看到的效果:

用Linux的iptables和Python模拟廣域網

很明顯伺服器看到的是模拟出來的客戶ip。

 最後,上代碼,再次聲明,本人不負任何責任。以下代碼進入公共域。

用Linux的iptables和Python模拟廣域網

wansim.py

import sys

sys.path.append(r'/usr/local/lib/python2.6/site-packages')

import nfqueue

import time

from socket import AF_INET, AF_INET6, inet_ntoa, inet_aton

from dpkt import ip, tcp, hexdump

import re

import random

svr_ip=sys.argv[1]

my_ip=sys.argv[2]

def parse():

        for r in sys.argv[3].split('.'):

                if '-' in r: #xxx-yyy

                        m=re.match(r'(\d+)-(\d+)', r)

                        yield range(int(m.group(1)), int(m.group(2))+1)

                else: #xxx

                        yield range(int(r), int(r)+1)

fake_ip_range=list(parse())

def get_fake_ip():

        ip = '.'.join(str(random.choice(r)) for r in fake_ip_range)

        print 'fake ip: ', ip

        return ip

#Track connection by src port. This is very crude. Should watch tcp more closely.

port2ip={}

time0 = None

def cb(i,payload):

        global time0

        data = payload.get_data()

        pkt = ip.IP(data)

        if not time0:

                time0=time.time()

        print time.time()-time0,\

              "source: %s" % inet_ntoa(pkt.src),\

              "dest: %s" % inet_ntoa(pkt.dst)

        if pkt.p == ip.IP_PROTO_TCP:

                #outgoing to target server

                if inet_ntoa(pkt.dst) == svr_ip:

                        if pkt.tcp.sport in port2ip:

                                pkt.src = port2ip[pkt.tcp.sport]

                        else:

                                fip = inet_aton(get_fake_ip())

                                port2ip[pkt.tcp.sport]=fip

                                pkt.src = fip

                #incoming from target server

                elif inet_ntoa(pkt.src) == svr_ip and \

                     inet_ntoa(pkt.dst) != my_ip:

                        #a stronger condition is to check if it is one of our fake ip

                        pkt.dst = inet_aton(my_ip)

                pkt.tcp.sum = 0

                pkt.sum = 0

                payload.set_verdict_modified(

                        nfqueue.NF_ACCEPT,str(pkt),len(pkt))

                return 0

        payload.set_verdict(nfqueue.NF_ACCEPT)

        sys.stdout.flush()

        return 1

def run(cmd):

        from commands import getstatusoutput

        (s, o) = getstatusoutput(cmd)

        print cmd

        print 'exit code ', s

        if len(o):

                print o

cmds = [

        #intercept outgoing packets

        "iptables -t mangle -A POSTROUTING -p tcp -d %s -j NFQUEUE" % svr_ip,

        #intercept incoming packets

        "iptables -t mangle -A PREROUTING  -p tcp -s %s -j NFQUEUE" % svr_ip,

        #"iptables -A INPUT -p tcp -i eth1 -j NFQUEUE "

]

for c in cmds:

        run(c)

q = nfqueue.queue()

print "open"

q.open()

print "bind"

q.bind(AF_INET);

print "setting callback"

q.set_callback(cb)

print "creating queue"

q.create_queue(0)

print "trying to run"

try:

        q.try_run()

except KeyboardInterrupt, e:

        print "interrupted"

print "unbind"

q.unbind(AF_INET)

print "close"

q.close()

        run(c.replace('-A', '-D'))

如果需要更改模拟ip配置設定的方式,隻需修改get_fake_ip這個函數。