天天看点

squid 日志分析 - hadoop hive

 #!/usr/bin/env python

#-*-coding:UTF-8-*-

"""

@Item   :  Hadoop analysis squid log

@Author :  Villiam Sheng

@Group  :  Linux Group

@Date   :  2012-09-13

@Function:

            Use Hadoop (Hive) to process squid access logs.

"""

import os,re,threading,tarfile,shutil,time,sys,datetime

import sys,traceback

sys.path.append('/usr/local/hive/lib/py')

from thrift import Thrift

from thrift.transport import TSocket

from thrift.transport import TTransport

from thrift.protocol import TBinaryProtocol

from hive_service import ThriftHive

from hive_service.ttypes import HiveServerException

class split_log(threading.Thread):
    """Worker thread that turns one squid log archive into a Hive partition.

    For the entry ``files`` found under ``self.path`` the thread:

    1. extracts ``<path>/<files>`` into ``<path>/<name>`` (``name`` is
       ``files`` with the ``.tgz`` suffix stripped) unless the entry is
       already a directory;
    2. rewrites ``<path>/<name>/access.log.0`` into a tab-separated
       ``tmp.log`` in the same directory;
    3. runs ``LOAD DATA LOCAL INPATH ... OVERWRITE`` into
       ``squid.squid_tmp`` partition ``pt_ip = <name>`` through the Hive
       Thrift service on 127.0.0.1:10000, holding ``lock`` meanwhile;
    4. removes the archive and the working directory.

    Progress and failures are appended to ``<path>/split_log``.

    Parameters:
        files: name of an entry (archive or directory) inside ``self.path``.
        lock:  lock shared by all workers; serialises access to Hive.
    """

    def __init__(self, files, lock):
        threading.Thread.__init__(self)
        self.files = files
        self.path = '/data1/squid_log'
        self.lock = lock

    def log(self, info):
        """Append ``info`` to ``<path>/split_log``; logging is best-effort.

        Once the log file reaches 1024 KiB it is truncated to empty.
        I/O errors are ignored so that a logging problem never kills the
        worker thread.
        """
        log_path = '%s/split_log' % self.path
        try:
            with open(log_path, 'a') as fp:
                fp.write(info)
            # Floor division keeps the original whole-KiB comparison on
            # both Python 2 and Python 3.
            if os.path.getsize(log_path) // 1024 >= 1024:
                with open(log_path, 'w'):
                    pass
        except (IOError, OSError):
            pass

    @staticmethod
    def _format_record(line):
        """Convert one access-log line into a tab-separated record.

        The record holds, in order: the timestamp from the 4th
        whitespace-separated field (``[dd/Mon/yyyy:HH:MM:SS``) reformatted
        as ``YYYYmmddHHMMSS``, the 1st field, the 5th ``/``-separated piece
        of the raw line, the 5th field from the end and the last field.

        Raises IndexError or ValueError when the line does not have that
        shape.
        """
        fields = line.split()
        stamp = datetime.datetime.strptime(
            fields[3].split('[')[1], '%d/%b/%Y:%H:%M:%S')
        return "%s\t%s\t%s\t%s\t%s\n" % (
            stamp.strftime('%Y%m%d%H%M%S'),
            fields[0],
            line.split('/')[4],
            fields[-5],
            fields[-1],
        )

    def _extract(self, archive, work_dir):
        """Unpack every member of ``archive`` into ``work_dir``."""
        # NOTE(review): member names are trusted as-is; confirm that the
        # archives only ever come from our own squid hosts.
        tar = tarfile.open(archive)
        try:
            tar.extractall(work_dir)
        finally:
            tar.close()

    def _convert(self, work_dir):
        """Rewrite ``access.log.0`` into ``tmp.log`` inside ``work_dir``."""
        source = '%s/access.log.0' % work_dir
        target = '%s/tmp.log' % work_dir
        # Both files are opened once and always closed; the input is
        # streamed instead of being read into memory in one go.
        with open(source, 'r') as fin:
            with open(target, 'a') as fout:
                for line in fin:
                    fout.write(self._format_record(line))

    def _load_into_hive(self, sql):
        """Execute ``sql`` on the local Hive Thrift service under the lock."""
        with self.lock:
            transport = TTransport.TBufferedTransport(
                TSocket.TSocket('127.0.0.1', 10000))
            client = ThriftHive.Client(
                TBinaryProtocol.TBinaryProtocol(transport))
            # The lock and the transport are released even when opening
            # or executing fails, so one bad worker cannot block the rest.
            transport.open()
            try:
                client.execute(sql)
            except Exception as e:
                # A failed load is still non-fatal (cleanup proceeds as
                # before), but it is now recorded instead of hidden.
                self.log('%s file:%s load.info:%s \n'
                         % (time.ctime(), self.files, e))
            finally:
                transport.close()

    def run(self):
        """Extract, convert, load and clean up the entry ``self.files``."""
        name = self.files.split('.tgz')[0]
        archive = '%s/%s' % (self.path, self.files)
        work_dir = '%s/%s' % (self.path, name)

        try:
            # Test the entry under self.path, not relative to the cwd.
            if not os.path.isdir(archive):
                self._extract(archive, work_dir)
                self.log('%s file:%s run.info:tar done \n'
                         % (time.ctime(), self.files))
        except Exception as e:
            # As before, an extraction failure is logged and processing
            # continues with whatever is present in the work directory.
            self.log('%s files:%s run.info:%s \n'
                     % (time.ctime(), self.files, e))

        try:
            self._convert(work_dir)
            self.log('%s file:%s inert.info:insert done \n'
                     % (time.ctime(), name))

            asql = "load data local inpath \'%s/%s/tmp.log\' overwrite into table squid.squid_tmp partition(pt_ip = \'%s\')" % (self.path, name, name)
            self._load_into_hive(asql)

            # os.remove() cannot delete a directory; when the entry is an
            # already-extracted directory, rmtree below takes care of it.
            if os.path.isfile(archive):
                os.remove(archive)
            shutil.rmtree(work_dir)
        except Exception:
            # format_exc() returns the traceback text; print_exc() returns
            # None and would log the literal string "None".
            self.log('Exception error : %s ' % traceback.format_exc())

def work():
    """Process every IP-named entry in /data1/squid_log/ and publish it.

    One ``split_log`` thread is started per directory entry whose name
    (with any ``.tgz`` suffix removed) matches the dotted-quad pattern.
    After all of those threads have finished, the rows of ``squid_tmp``
    are copied into ``squid_log`` with dynamic partitions ``pt_ip`` and
    ``pt_dt`` (the first 8 characters of ``visittime``) through the Hive
    Thrift service on 127.0.0.1:10000.

    Exceptions raised by the Thrift client propagate to the caller; the
    transport is closed in every case.
    """
    # NOTE(review): each octet alternative starts at [1-9], so names with
    # an octet equal to 0 (e.g. 10.0.0.1) do not match - confirm that this
    # is intended.
    ip_list = re.compile(r'^(([1-9]|([1-9]\d)|(1\d\d)|(2([0-4]\d|5[0-5])))\.)(([1-9]|([1-9]\d)|(1\d\d)|(2([0-4]\d|5[0-5])))\.){2}([1-9]|([1-9]\d)|(1\d\d)|(2([0-4]\d|5[0-5])))$')

    lock = threading.RLock()
    workers = []
    for entry in os.listdir('/data1/squid_log/'):
        if not ip_list.match(entry.split('.tgz')[0]):
            continue
        st = split_log(entry, lock)
        st.start()
        workers.append(st)

    # Wait for exactly the threads started above. Polling the global
    # thread count would never finish if any unrelated thread were alive.
    for st in workers:
        st.join()

    transport = TSocket.TSocket('127.0.0.1', 10000)
    transport = TTransport.TBufferedTransport(transport)
    protocol = TBinaryProtocol.TBinaryProtocol(transport)
    client = ThriftHive.Client(protocol)

    transport.open()
    try:
        client.execute("use squid")
        client.execute("set hive.exec.dynamic.partition=true")
        client.execute("set hive.exec.dynamic.partition.mode=nonstrict")
        # NOTE(review): this runs even when no worker was started; if
        # squid_tmp still holds partitions from an earlier cycle they are
        # presumably inserted again - verify against the table lifecycle.
        sql = "insert into table squid_log partition(pt_ip, pt_dt) select visittime, clientip, visitdom, visiturl, visitstat, pt_ip, substr(visittime, 1, 8) as pt_dt from squid_tmp"
        client.execute(sql)
    finally:
        # Close the connection even if one of the statements fails.
        transport.close()

if __name__ == "__main__":
    # Daemonise: the parent exits right after the fork, the child becomes
    # a session leader, moves to '/' and detaches from the terminal.
    pid = os.fork()
    if pid > 0:
        sys.exit(0)

    os.setsid()
    os.chdir('/')

    # Point file descriptors 0, 1 and 2 at /dev/null. os.dup() only
    # returns an integer, so assigning its result to sys.stdout or
    # sys.stderr left them without a write() method; dup2() redirects the
    # descriptors that the existing sys.std* objects already wrap.
    devnull = os.open('/dev/null', os.O_RDWR)
    for fd in (0, 1, 2):
        os.dup2(devnull, fd)
    if devnull > 2:
        os.close(devnull)

    # Run one processing cycle every 300 seconds. A failing cycle must not
    # terminate the daemon; the traceback goes to stderr, which is
    # /dev/null unless the redirection above is changed.
    while True:
        try:
            work()
        except Exception:
            traceback.print_exc()
        time.sleep(300)

本文转自 swq499809608 51CTO博客,原文链接:http://blog.51cto.com/swq499809608/1142212