iptables 面向对象封装
转载请与作者联系,转载时请务必标明文章原始出处和作者信息及本声明。
<a target="_top" href="http://creativecommons.org/licenses/by/3.0/"></a>

使用案例
########################################
# test
test = firewall()
test.flush()
test.policy()
test.policy(test.input,test.drop)
test.chain('input').accept()
test.interface('-i',"eth0").accept('# error test')
test.chain('input').interface('-i',"eth0").accept('# ok test')
test.chain('output').interface('-o',"eth0").protocol('icmp').accept()
test.output().interface('-i',"eth0").protocol('tcp').accept('')
test.chain('output').inbound("eth0").protocol('tcp').source('172.16.1.0/24').accept('')
test.chain('output').outbound("eth0").protocol('tcp').destination('172.16.1.1').accept('')
test.chain('forward').inbound("eth0").outbound("eth0").protocol('tcp').source('172.16.1.0/24').destination('172.16.1.1').accept()
test.input().interface('-i',"eth0").protocol('tcp').state('new').accept()
test.chain('input').interface('-i',"eth0").protocol('tcp').state('new').dport('21').accept()
test.chain('input').inbound("eth0").protocol('tcp').state('new').dport(('3306','1152','5432')).accept('multiport test')
test.forward().source("172.16.0.1/24").protocol('tcp').string('sex').accept()
test.forward().dport("53").protocol('udp').time('8:00','18:00','mon,tue,wed,thu,fri,sat').accept()
test.forward().proto('udp').dport("53").string('movie').time('8:00','18:00','mon,tue,wed,thu,fri,sat').accept()
test.input().inbound('ppp0').connlimit(20).drop()
test.forward().reject('--reject-with icmp-host-prohibited')
#test.show()
test.save('/tmp/firewall.txt')
# demo desktop pc
single = firewall()
single.policy(single.input,single.drop)
single.policy(single.output,single.accept)
single.policy(single.forward,single.drop)
single.input().protocol('icmp').drop()
single.input().protocol('tcp').dport(('3389','5900')).accept()
single.input().protocol('tcp').dport(('137','138','139','145')).accept()
#single.show()
#single.run()
#single.list()
# demo office server
office = firewall()
office.flush()
office.policy(office.input,office.drop)
office.policy(office.output,office.accept)
office.policy(office.forward,office.drop)
office.input().state(('related','established')).accept()
office.input().protocol('icmp').accept()
office.input().inbound('eth0').protocol('udp').dport(('53','1194')).accept()
office.input().inbound('eth0').protocol('udp').dport(('68','68')).accept()
office.input().protocol('tcp').dport(('20','21','22','80')).accept()
office.input().protocol('tcp').dport(('5800','5900')).accept()
office.input().protocol('tcp').dport(('137','138','139','145')).accept()
office.show()
office.run()
office.list()
# demo idc server
server = firewall()
server.flush()
server.policy(server.input,server.drop)
server.policy(server.output,server.drop)
server.policy(server.forward,server.drop)
server.input().state(('related','established')).accept()
server.input().protocol('icmp').accept()
#server.input().destination('192.168.0.0/24').accept()
server.input().protocol('tcp').dport(('21','22','80')).state('new').accept()
server.input().protocol('udp').dport(('53','1194')).accept()
server.input().protocol('tcp').source('172.16.1.0/24').dport('3306').accept()
server.output().protocol('icmp').accept()
server.output().destination('192.168.0.0/24').accept()
server.output().destination('172.16.0.5').reject()
server.output().destination('172.16.0.0/24').accept()
server.output().protocol('udp').dport('53').accept()
server.output().protocol('tcp').dport(('80','21','20','22','8000')).accept()
server.chain('prerouting').inbound('eth0').proto('tcp').dport('80').dnat('--to-destination 192.168.0.1:3128')
server.output().destination('172.16.0.10').proto('tcp').dport('3306').accept()
#server.show()
#server.run()
#server.list()
# linux gateway via pppoe
gateway = firewall()
gateway.input().drop()
gateway.output().accept()
gateway.inside().state(('related','established')).accept('# match test')
gateway.forward().destination('127.16.0.0/24').accept()
gateway.chain('postrouting').inbound("ppp0").source('172.16.0.0/24').masquerade()
#gateway.show()
# cisco asa style
gateway.inside().accept()
gateway.outside().drop()
# juniper junos style
gateway.trust().accept()
gateway.untrust().drop()
代码
#!/usr/bin/env python
###########################################
# linux firewall management pkg
# homepage: http://netkiller.github.com
# author: neo chen <[email protected]>
# nickname: netkiller
import os, sys
import types
class service():
def __init__(self):
pass
def name(self):
def protocol(self):
def port(self, src, dst):
def www(self):
return '80'
class address():
class protocol():
imcp = 'icmp'
tcp = 'tcp'
udp = 'udp'
class firewall(service, address):
input = 'input'
output = 'output'
forward = 'forward'
prerouting = 'prerouting'
postrouting = 'postrouting'
accept = 'accept'
drop = 'drop'
reject = 'reject'
self.accesslist = []
self.match = []
self.nic = []
self.iptables = 'iptables'
self.a = ''
self.p = ''
self.src = ''
self.dst = ''
self.port = ''
self.ip = ''
self.m = ''
self.err = true
#self.clear()
def clear(self):
def flush(self):
self.accesslist.append('iptables -f')
self.accesslist.append('iptables -f -t nat')
self.accesslist.append('iptables -f -t filter')
self.accesslist.append('iptables -t nat -p prerouting accept')
self.accesslist.append('iptables -t nat -p postrouting accept')
def policy(self,chain = none, target = none):
if chain and target:
self.accesslist.append('iptables -p '+chain+' '+target)
else:
self.accesslist.append('iptables -p input accept')
self.accesslist.append('iptables -p output accept')
self.accesslist.append('iptables -p forward accept')
def chain(self,tmp):
if tmp in ('input', 'output', 'forward'):
self.a = '-a ' + tmp
self.err = false
elif tmp in ('prerouting', 'postrouting'):
self.a = '-t nat -a ' + tmp
self.a = none
self.err = true
return( self )
def input(self):
return self.chain('input')
def output(self):
return self.chain('output')
def forward(self):
return self.chain('forward')
def inside(self):
def outside(self):
def trust(self):
def untrust(self):
return self.chain('input')
def interface(self,inter, name):
if inter and name:
self.nic.append(inter + ' ' + name)
return( self )
def inbound(self,tmp):
if tmp:
self.interface('-i', tmp)
def outbound(self,tmp):
self.interface('-o', tmp)
def protocol(self,tmp):
if tmp in ('tcp', 'udp', 'icmp','gre'):
self.p = "-p " + tmp
self.p = ''
def proto(self,tmp):
return self.protocol(tmp)
def source(self, src):
if src :
self.src = "-s " + src
self.src = ''
def destination(self, dst):
if dst:
self.dst = "-d " + dst
self.dst = ''
def state(self, tmp):
if type(tmp) == types.stringtype:
self.match.append('-m state --state ' + tmp)
elif type(tmp) == types.tupletype:
self.match.append('-m state --state ' + ','.join(tmp))
pass
def string(self, tmp):
self.match.append('-m string --string "' +tmp+'"')
def time(self, start, stop, days):
if start and stop and days:
self.match.append('-m time --timestart '+start+' --timestop '+stop+' --days ' +days+' ')
def connlimit(self, tmp):
self.match.append('-m connlimit --connlimit-above ' +str(tmp)+'')
def sport(self,tmp):
self.match.append('--sport ' + tmp)
self.match.append('-m multiport --sports ' + ','.join(tmp))
def dport(self,tmp):
type(tmp)
self.match.append('--dport ' + str(tmp))
self.match.append('-m multiport --dports ' + ','.join(tmp))
# author: neo chen <[email protected]>
def target(self, targetname, desc = none):
if targetname in ('accept', 'drop', 'reject', 'return', 'queue', 'masquerade', 'dnat', 'snat'):
self.acl_line = []
self.acl_line.append(self.iptables)
if self.a: self.acl_line.append(self.a)
if self.nic: self.acl_line.append(' '.join(self.nic))
if self.p: self.acl_line.append(self.p)
if self.src: self.acl_line.append(self.src)
if self.dst: self.acl_line.append(self.dst)
if self.match: self.acl_line.append(' '.join(self.match))
self.acl_line.append('-j ' + targetname)
if desc:
self.acl_line.append(desc)
if self.err:
acsess_list = '# ' + ' '.join(self.acl_line)
acsess_list = ' '.join(self.acl_line)
self.accesslist.append(acsess_list)
self.clear()
def accept(self,desc = none):
self.target('accept', desc)
def reject(self,desc = none):
self.target('reject', desc)
def drop(self,desc = none):
self.target('drop', desc)
def masquerade(self):
self.target('masquerade')
def dnat(self,desc = none):
self.target('dnat',desc)
def snat(self,desc = none):
self.target('snat',desc)
def show(self):
print('\n'.join(self.accesslist))
def run(self):
for line in self.accesslist:
os.system(line)
def save(self,filename):
try:
ipt = open(filename,'w')
for line in self.accesslist:
ipt.write(line)
ipt.write("\n")
ipt.close()
except ioerror as e:
print(e)
def list(self):
os.system('sudo iptables -s')
#os.system('sudo iptables -l --line-numbers')
输出结果
iptables -f
iptables -f -t nat
iptables -f -t filter
iptables -t nat -p prerouting accept
iptables -t nat -p postrouting accept
iptables -p input accept
iptables -p output accept
iptables -p forward accept
iptables -p input drop
iptables -a input -j accept sss
# iptables -i eth0 -j accept # error test
iptables -a input -i eth0 -j accept # ok test
iptables -a output -o eth0 -p icmp -j accept
iptables -a output -i eth0 -p tcp -j accept
iptables -a output -i eth0 -p tcp -s 172.16.1.0/24 -j accept
iptables -a output -o eth0 -p tcp -d 172.16.1.1 -j accept
iptables -a forward -i eth0 -o eth0 -p tcp -s 172.16.1.0/24 -d 172.16.1.1 -j accept
iptables -a input -i eth0 -p tcp -m state --state new -j accept
iptables -a input -i eth0 -p tcp -m state --state new --dports 21 -j accept
iptables -a input -i eth0 -p tcp -m state --state new -m multiport --dports 3306,1152,5432 -j accept multiport test
iptables -a forward -p tcp -s 172.16.0.1/24 -m string --string "sex" -j accept
iptables -a forward -p udp --dports 53 -m time --timestart 8:00 --timestop 18:00 --days mon,tue,wed,thu,fri,sat -j accept
iptables -a forward -p udp --dports 53 -m string --string "movie" -m time --timestart 8:00 --timestop 18:00 --days mon,tue,wed,thu,fri,sat -j accept
iptables -a input -i ppp0 -m connlimit --connlimit-above 20 -j drop
iptables -a forward -j reject --reject-with icmp-host-prohibited