天天看點

鬥魚直播間彈幕爬取2020年最新python

鬥魚直播間彈幕2020年最新

    我最近在學習python的直播間彈幕爬取,但是由于鬥魚官方把第三方api的接口改變了,必須要注冊為開發者才能使用官方提供的方法進行彈幕爬取。是以我通過搜尋教程了解到可以使用浏覽器自帶的爬取功能對彈幕進行爬取。

原理如下:

利用websocket建立wss 連接配接

wss://danmuproxy.douyu.com:8506/'

8501-8507都可以使用。

發送登入資訊

發生入組資訊

發送心跳資料,(和b站不一樣更進階了,有心跳資料了)。

利用wss必須對發送包進行加密,對接收的資料進行解包,

這些操作官方api 有提供,是以我就不再進行解釋了。

就能傳回彈幕資料。完整代碼如下,将roomid = "666743" 改成你需要的id。

以下代碼并非我獨創,有借鑒别人的。

import websocket
import threading
import time
class DyDanmuMsgHandler:
    # 将字元串資料按照鬥魚協定封裝為位元組流
    def dy_encode(self,msg):
        # 頭部8位元組,尾部1位元組,與字元串長度相加即資料長度
        # 為什麼不加最開頭的那個消息長度所占4位元組呢?這得問問鬥魚^^
        data_len = len(msg) + 9
        # 字元串轉化為位元組流
        msg_byte = msg.encode('utf-8')
        # 将資料長度轉化為小端整數位元組流
        len_byte = int.to_bytes(data_len, 4, 'little')
        # 前兩個位元組按照小端順序拼接為0x02b1,轉化為十進制即689(《協定》中規定的用戶端發送消息類型)
        # 後兩個位元組即《協定》中規定的加密字段與保留字段,置0
        send_byte = bytearray([0xb1, 0x02, 0x00, 0x00])
        # 尾部以'\0'結束
        end_byte = bytearray([0x00])
        # 按順序拼接在一起
        data = len_byte + len_byte + send_byte + msg_byte + end_byte
        return data

    def __parse_msg(self,raw_msg):
        '''
        解析資料
        :param raw_msg: 原始response資料
        :return:
        '''
        res = {}
        attrs = raw_msg.split('/')[0:-1]
        for attr in attrs:
            attr = attr.replace('@s','/')
            attr = attr.replace('@A','@')
            couple = attr.split('@=')
            res[couple[0]] = couple[1]
        return res

    def dy_decode(self,msg_byte):
        '''
        解析鬥魚傳回的資料
        :param msg_byte:
        :return:
        '''
        pos = 0
        msg = []
        while pos < len(msg_byte):
            content_length = int.from_bytes(msg_byte[pos: pos + 4], byteorder='little')
            content = msg_byte[pos + 12: pos + 3 + content_length].decode(encoding='utf-8', errors='ignore')
            msg.append(content)
            pos += (4 + content_length)
        return msg
    
    def get_chat_messages(self,msg_byte):
        '''
        從資料擷取chatmsg資料
        :param msg_byte:
        :return:
        '''
        decode_msg = self.dy_decode(msg_byte)
        messages = []
        for msg in decode_msg:
            res = self.__parse_msg(msg)
            if res['type'] !='chatmsg':
                continue
            messages.append(res)
        return messages    

class DyDanmuCrawler:
    def __init__(self,roomid):
        self.__room_id = roomid
        self.__heartbeat_thread = None
        self.__client = DyDanmuWebSocketClient(on_open=self.__prepare,
                                               on_message=self.__receive_msg,
                                               on_close=self.__stop)
        self.__msg_handler =  DyDanmuMsgHandler()
        self.__keep_HeartBeat = True
    
    def start(self):
        '''
        開啟用戶端
        :return:
        '''
        self.__client.start()

    def __stop(self):
        '''
        登出
        停止用戶端
        停止心跳線程
        :return:
        '''
        self.__logout()
        self.__client.stop()
        self.__keep_HeartBeat=False


    def on_error(self, error):
        print(error)

    def on_close(self):
        print('close')

    # 發送入組消息
    def join_group(self):
        '''
        發送群組消息
        :return:
        '''
        join_group_msg = '[email protected]=joingroup/[email protected]=%s/[email protected]=1/' % (self.__room_id)
        msg_bytes = self.__msg_handler.dy_encode(join_group_msg)
        self.__client.send(msg_bytes)

    # 發送登入請求消息
    def login(self):
        '''
        登陸
        :return:
        '''
        login_msg = '[email protected]=loginreq/[email protected]=%s/[email protected][email protected][email protected]@AA=1/' \
                    '[email protected]=%s/[email protected]=%s/[email protected]=20190610/[email protected]=218101901/[email protected]=0/.'%(
            self.__room_id,'99047358','99047358'
        )
        msg_bytes = self.__msg_handler.dy_encode(login_msg)
        self.__client.send(msg_bytes)

    def __start_heartbeat(self):
        self.__heartbeat_thread = threading.Thread(target=self.__heartbeat)
        self.__heartbeat_thread.start()

    def __heartbeat(self):
        heartbeat_msg = '[email protected]=mrkl/'
        heartbeat_msg_byte = self.__msg_handler.dy_encode(heartbeat_msg)
        while True:
            self.__client.send(heartbeat_msg_byte)
            for i in range(90):
                time.sleep(0.5)
                if  not self.__keep_HeartBeat:
                    return

    def __prepare(self):
        self.login()
        # 登入後發送入組消息
        self.join_group()
        self.__start_heartbeat()


    def __receive_msg(self, msg):
        '''
        處理收到的資訊
        :param msg:
        :return:
        '''
        chat_messages =self.__msg_handler.get_chat_messages(msg)
        for message in chat_messages:
            print(f"{message['nn']}:{message['txt']}")
        # 将位元組流轉化為字元串,忽略無法解碼的錯誤(即鬥魚協定中的頭部尾部)
        #print(message.decode(encoding='utf-8', errors='ignore'))

class DyDanmuWebSocketClient:
    def __init__(self,on_open,on_message,on_close):
        self.__url ='wss://danmuproxy.douyu.com:8506/'
        self.__websocket =  websocket.WebSocketApp(self.__url,
                                                   on_open=on_open,
                                                   on_message=on_message,
                                                   on_error=self.__on_error,
                                                   on_close=on_close)

    def start(self):
        self.__websocket.run_forever()

    def stop(self):
        self.__websocket.close()

    def send(self,msg):
        self.__websocket.send(msg)

    def __on_error(self,error):
        print(error)


roomid = "666743"
dy_barrage_crawler = DyDanmuCrawler(roomid)
dy_barrage_crawler.start()