天天看點

第三章 打造高性能的視訊彈幕系統

第三章 打造高性能的視訊彈幕系統

  • 場景分析:用戶端針對某一視訊建立了彈幕,發送給後端進行處理,後端需要對所有正在觀看該視訊的使用者推送該彈幕
  • 兩種實作方式:使用短連接配接進行通信或使用長連接配接進行通信
短連接配接實作方案:
  • 所有觀看視訊的用戶端不斷輪詢後端,若有新的彈幕則拉取後進行顯示
  • 缺點:輪詢的效率低,非常浪費資源(因為HTTP協定隻能由用戶端向服務端發起,故必須不停連接配接後端)
長連接配接實作方案:
  • 采用​

    ​WebSocket​

    ​ 進行前後端通信
  • 為什麼要用 WebSocket:HTTP 協定的通信隻能由用戶端發起,做不到伺服器主動向用戶端推送資訊。

WebSocket 協定

  • WebSocket簡介:WebSocket 協定是基于TCP的一種新的網絡協定。它實作了浏覽器與伺服器​

    ​全雙工​

    ​(Full-Duplex)通信。
  • 全雙工(Full-Duplex)通信:用戶端可以主動發送資訊給服務端,服務端也可以主動發送資訊給用戶端。
  • WebSocket協定優點:封包體積小、支援長連接配接。

彈幕系統架構設計

第三章 打造高性能的視訊彈幕系統

優化方向一(後端接收前端發來的彈幕,并将彈幕推送給前端展示)

  • 假設前端傳過來2萬條請求(彈幕),後端需要推送這2萬條請求到前端,那麼就相當于後端總共需要處理4萬條請求,後端将這4萬條請求分成10批次,每一批就是4000個請求,
  • 但是這10批次裡面的第一批我們首先進行處理,第2~10批我們先不進行處理,把它們先放到 MQ 裡面進行排隊,這個就是削峰;
  • 将第一批的4000條請求,好好利用伺服器的并行處理能力,給它進行并發處理,同一時間段内進行并發處理4000條請求的耗時可能也就幾百毫秒,
  • 這樣在 2~4 秒的時間段内,伺服器就能完成這4萬條請求的處理;
  • 在前端的使用者感覺來看,實際就是使用者發送了一條彈幕,2~4秒後就可以在頁面上看到自己所發送的彈幕了,體驗感較好。

優化方向二(後端接收彈幕後,将彈幕持久化到資料庫)

  • 後端接收前端傳過來的彈幕後,将彈幕通過​

    ​MQ​

    ​ 進行異步持久化到資料庫,并且采用 MQ 的目的是為了限流削峰,減輕資料庫的壓力;
  • 并且由于是異步操作,主線程是另外開了一條線程在進行持久化資料庫的操作,這樣子不會影響主線程的其他操作(例如同步儲存彈幕到 Redis 裡面)
  • 假設有 2 萬條彈幕同時過來資料庫,先将彈幕資料儲存到 MQ 裡面,這樣子 MQ 可以每秒處理 2000 個請求,這樣的速度儲存到資料庫中,不至于會使資料庫崩潰,能夠有效降低資料庫的壓力。

優化方向三(将彈幕資料寫到redis,再次查詢可以快速讀取)

  • 在将彈幕資料儲存到資料庫中時,也要将彈幕資料同步儲存到 redis(緩存)中,
  • 為了我們在下一次加載到視訊詳情頁的時候,能夠把我們目前或者當天的彈幕資料給快速查詢出來;
  • 如果某個視訊在今天儲存了很大的彈幕資料量,如果每次都從資料庫中進行查詢的話,一是速度慢,二是可能會對資料庫造成讀取壓力(如果有多個視訊進行查詢,其他視訊可能需要排隊查詢);
  • 如果将今天生成的彈幕資料都儲存到 redis 中,在下次進行頁面重新整理的時候,會調用一個彈幕資料查詢的操作,就可以直接從 redis 裡面進行讀取,這樣的速度是非常快的,因為它是從記憶體裡面查詢資料。
  • redis 單機最大處理量可以達到 10 ~ 50 萬左右。

SpringBoot 整合 WebSocket

導入依賴

<!-- WebSocket依賴 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>      

工具類

@Configuration
public class WebSocketConfig {

    /**
     * 用來發現WebSocket服務的
     *
     * @return
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

}      

業務類

@Component
@ServerEndpoint("/imserver")
public class WebSocketService {

    private final Logger logger = LoggerFactory.getLogger(WebSocketService.class);

    /**
     * 目前長連接配接的數量(線上人數的統計)
     * 也就是目前有多少用戶端通過WebSocket連接配接到服務端
     */
    private static final AtomicInteger ONLINE_COUNT = new AtomicInteger(0);

    /**
     * 一個用戶端 關聯 一個WebSocketService
     * 是一個多例模式的,這裡需要注意下
     */
    private static final ConcurrentHashMap<String, WebSocketService> WEBSOCKET_MAP = new ConcurrentHashMap<>();

    /**
     * 服務端 和 用戶端 進行通信的一個會話
     * 當我們有一個用戶端進來了,然後保持連接配接成功了,那麼我們就會儲存一個跟這個用戶端關聯的session
     */
    private Session session;

    /**
     * 唯一辨別
     */
    private String sessionId;

    /**
     * 打開連接配接
     *
     * @param session
     * @OnOpen 連接配接成功後會自動調用該方法
     */
    @OnOpen
    public void openConnection(Session session) {
        // 儲存session相關資訊到本地
        this.sessionId = session.getId();
        this.session = session;

        // 判斷WEBSOCKET_MAP是否含有sessionId,有的話先删除再重新添加
        if (WEBSOCKET_MAP.containsKey(sessionId)) {
            WEBSOCKET_MAP.remove(sessionId);
            WEBSOCKET_MAP.put(sessionId, this);
        } else { // 沒有的話就直接新增
            WEBSOCKET_MAP.put(sessionId, this);
            // 線上人數加一
            ONLINE_COUNT.getAndIncrement();
        }
        logger.info("使用者連接配接成功:" + sessionId + ",目前線上人數為:" + ONLINE_COUNT.get());

        // 連接配接成功之後需要通知用戶端,友善用戶端進行後續操作
        try {
            this.sendMessage("0");
        } catch (Exception e) {
            logger.error("連接配接異常!");
        }

    }

    /**
     * 用戶端重新整理頁面,或者關閉頁面,服務端斷開連接配接等等操作,都需要關閉連接配接
     */
    @OnClose
    public void closeConnection() {
        if (WEBSOCKET_MAP.containsKey(sessionId)) {
            WEBSOCKET_MAP.remove(sessionId);
            // 線上人數減一
            ONLINE_COUNT.getAndDecrement();
            logger.info("使用者退出:" + sessionId + ",目前線上人數為:" + ONLINE_COUNT.get());
        }
    }

    /**
     * 用戶端發送消息給後端
     *
     * @param message
     */
    @OnMessage
    public void onMessage(String message) {

    }

    /**
     * 發生錯誤之後的處理
     *
     * @param error
     */
    @OnError
    public void onError(Throwable error) {

    }

    /**
     * 後端發送消息給用戶端
     *
     * @param message
     * @throws IOException
     */
    private void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }

}      

多例模式下引發的Bean注入為null的問題

在啟動類中将ApplicationContext傳給WebSocketService中的*​

​APPLICATION_CONTEXT​

​*
@SpringBootApplication
@EnableTransactionManagement
public class ImoocBilibiliApplication {

    public static void main(String[] args) {
        ApplicationContext app = SpringApplication.run(ImoocBilibiliApplication.class, args);
        WebSocketService.setApplicationContext(app);
    }

}      
WebSocketService
  • ​@Autowired​

    ​​ 在多例模式下是不會自動進行加載的,是以這裡我們不能使用​

    ​@Autowired​

    ​進行注入;
  • 而我們的啟動類生成的ApplicationContext,是可以通過getBean( )方法擷取到Spring容器中所有Bean的;
/**
     * 全局的上下文變量
     */
    private static ApplicationContext APPLICATION_CONTEXT;

    /**
     * 通用的上下文環境變量的方法,每個WebSocketService都會共用同一個ApplicationContext
     *
     * @param applicationContext
     */
    public static void setApplicationContext(ApplicationContext applicationContext) {
        WebSocketService.APPLICATION_CONTEXT = applicationContext;
    }      

彈幕系統實作

資料庫表設計及相關實體類設計

彈幕記錄表
第三章 打造高性能的視訊彈幕系統

業務層

WebSocketService.java
@Component
@ServerEndpoint("/imserver/{token}")
public class WebSocketService {

    private final Logger logger = LoggerFactory.getLogger(WebSocketService.class);

    /**
     * 目前長連接配接的數量(線上人數的統計)
     * 也就是目前有多少用戶端通過WebSocket連接配接到服務端
     */
    private static final AtomicInteger ONLINE_COUNT = new AtomicInteger(0);

    /**
     * 一個用戶端 關聯 一個WebSocketService
     * 是一個多例模式的,這裡需要注意下
     */
    private static final ConcurrentHashMap<String, WebSocketService> WEBSOCKET_MAP = new ConcurrentHashMap<>();

    /**
     * 服務端 和 用戶端 進行通信的一個會話
     * 當我們有一個用戶端進來了,然後保持連接配接成功了,那麼我們就會儲存一個跟這個用戶端關聯的session
     */
    private Session session;

    /**
     * 唯一辨別
     */
    private String sessionId;

    private Long userId;

    /**
     * 全局的上下文變量
     */
    private static ApplicationContext APPLICATION_CONTEXT;

    /**
     * 打開連接配接
     *
     * @param session
     * @param token
     * @OnOpen 連接配接成功後會自動調用該方法
     * @PathParam("token") 擷取 @ServerEndpoint("/imserver/{token}") 後面的參數
     */
    @OnOpen
    public void openConnection(Session session, @PathParam("token") String token) {
        // 如果是遊客觀看視訊,雖然有彈幕,但是沒有使用者資訊,是以需要用try
        try {
            this.userId = TokenUtil.verifyToken(token);
        } catch (Exception ignored) {
        }
        // 儲存session相關資訊到本地
        this.sessionId = session.getId();
        this.session = session;

        // 判斷WEBSOCKET_MAP是否含有sessionId,有的話先删除再重新添加
        if (WEBSOCKET_MAP.containsKey(sessionId)) {
            WEBSOCKET_MAP.remove(sessionId);
            WEBSOCKET_MAP.put(sessionId, this);
        } else { // 沒有的話就直接新增
            WEBSOCKET_MAP.put(sessionId, this);
            // 線上人數加一
            ONLINE_COUNT.getAndIncrement();
        }
        logger.info("使用者連接配接成功:" + sessionId + ",目前線上人數為:" + ONLINE_COUNT.get());

        // 連接配接成功之後需要通知用戶端,友善用戶端進行後續操作
        try {
            this.sendMessage("0");
        } catch (Exception e) {
            logger.error("連接配接異常!");
        }
    }

    /**
     * 用戶端發送消息給服務端
     *
     * @param message
     */
    @OnMessage
    public void onMessage(String message) {
        logger.info("使用者資訊:" + sessionId + ",封包:" + message);
        if (!StringUtils.isNullOrEmpty(message)) {
            try {
                // 群發消息(服務端拿到某一個用戶端發來的消息,然後群發到所有與它連接配接的用戶端)
                for (Map.Entry<String, WebSocketService> entry : WEBSOCKET_MAP.entrySet()) {
                    // 擷取每一個和服務端連接配接的用戶端
                    WebSocketService webSocketService = entry.getValue();
                    // 判斷會話是否還處于打開狀态
                    if (webSocketService.session.isOpen()) {
                        webSocketService.sendMessage(message);
                    }
                }
                if (this.userId != null) {
                    // --------- 儲存彈幕到資料庫 ----------
                    // 将message轉換成Danmu實體類的資料
                    Danmu danmu = JSONObject.parseObject(message, Danmu.class);
                    danmu.setUserId(userId);
                    danmu.setCreateTime(new Date());
                    DanmuService danmuService = (DanmuService) APPLICATION_CONTEXT.getBean("danmuService");
                    danmuService.addDanmu(danmu);

                    // ----------- 儲存彈幕到redis -----------
                    danmuService.addDanmusToRedis(danmu);
                }
            } catch (Exception e) {
                logger.error("彈幕接收出現問題!");
                e.printStackTrace();
            }
        }
    }

}      
DanmuService.java
@Service
public class DanmuService {

    private static final String DANMU_KEY = "dm-video-";

    @Autowired
    private DanmuDao danmuDao;

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    /**
     * 添加彈幕
     *
     * @param danmu
     */
    public void addDanmu(Danmu danmu) {
        danmuDao.addDanmu(danmu);
    }

    /**
     * 查詢彈幕
     *
     * @param danmu
     */
    @Async
    public void asyncAddDanmu(Danmu danmu) {
        danmuDao.addDanmu(danmu);
    }

    /**
     * 添加彈幕到redis
     * 下次加載頁面時,可以快速從緩存中擷取彈幕
     *
     * @param danmu
     */
    public void addDanmusToRedis(Danmu danmu) {
        String key = DANMU_KEY + danmu.getVideoId();
        String value = redisTemplate.opsForValue().get(key);
        List<Danmu> list = new ArrayList<>();
        if (!StringUtil.isNullOrEmpty(value)) {
            // 将從redis中查詢到的資料轉換成list集合
            list = JSONArray.parseArray(value, Danmu.class);
        }
        // 将新的彈幕添加到list中
        list.add(danmu);
        redisTemplate.opsForValue().set(key, JSONObject.toJSONString(list));
    }

}      

推送彈幕性能優化

WebSocketService.java
/**
     * 用戶端發送消息給服務端
     *
     * @param message
     */
    @OnMessage
    public void onMessage(String message) {
        logger.info("使用者資訊:" + sessionId + ",封包:" + message);
        if (!StringUtils.isNullOrEmpty(message)) {
            try {
                // 群發消息(服務端拿到某一個用戶端發來的消息,然後群發到所有與它連接配接的用戶端)
                for (Map.Entry<String, WebSocketService> entry : WEBSOCKET_MAP.entrySet()) {
                    // 擷取每一個和服務端連接配接的用戶端
                    WebSocketService webSocketService = entry.getValue();

                    // 擷取到彈幕生産者
                    DefaultMQProducer danmusProducer = (DefaultMQProducer) APPLICATION_CONTEXT.getBean("danmusProducer");
                    JSONObject jsonObject = new JSONObject();
                    jsonObject.put("message", message);
                    jsonObject.put("sessionId", webSocketService.getSessionId());
                    Message msg = new Message(UserMomentsConstant.TOPIC_DANMUS, jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8));
                    // 異步發送消息
                    RocketMQUtil.asyncSendMsg(danmusProducer, msg);
                }      
RocketMQConfig.java
@Configuration
public class RocketMQConfig {

    @Value("${rocketmq.name.server.address}")
    private String nameServerAddr;

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    private UserFollowingService userFollowingService;

    /**
     * 彈幕生産者
     *
     * @return
     * @throws Exception
     */
    @Bean("danmusProducer")
    public DefaultMQProducer danmusProducer() throws Exception {
        // 執行個體化消息生産者Producer
        DefaultMQProducer producer = new DefaultMQProducer(UserMomentsConstant.GROUP_MOMENTS);
        // 設定NameServer的位址
        producer.setNamesrvAddr(nameServerAddr);
        // 啟動Producer執行個體
        producer.start();
        return producer;
    }

    /**
     * 彈幕消費者
     *
     * @return
     * @throws Exception
     */
    @Bean("danmusConsumer")
    public DefaultMQPushConsumer danmusConsumer() throws Exception {
        // 執行個體化消費者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(UserMomentsConstant.GROUP_DANMUS);
        // 設定NameServer的位址
        consumer.setNamesrvAddr(nameServerAddr);
        // 訂閱一個或者多個Topic,以及Tag來過濾需要消費的消息
        consumer.subscribe(UserMomentsConstant.TOPIC_DANMUS, "*");
        // 注冊回調實作類來處理從broker拉取回來的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                MessageExt msg = msgs.get(0);
                if (msg == null) {
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                String bodyStr = new String(msg.getBody());
                JSONObject jsonObject = JSONObject.parseObject(bodyStr);
                String sessionId = jsonObject.getString("sessionId");
                String message = jsonObject.getString("message");
                // 根據sessionId擷取對應的webSocketService
                WebSocketService webSocketService = WebSocketService.WEBSOCKET_MAP.get(sessionId);

                // 判斷會話是否還處于打開狀态
                if (webSocketService.getSession().isOpen()) {
                    try {
                        // 伺服器發送消息給用戶端
                        webSocketService.sendMessage(message);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }

                // 标記該消息已經被成功消費
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 啟動消費者執行個體
        consumer.start();
        return consumer;
    }
}      

彈幕消息異步存儲優化

優化一

使用SpringBoot的 ​

​@Async​

​ 注解進行異步儲存彈幕
  • DanmuService.java
/**
     * 異步儲存彈幕
     *
     * @param danmu
     * @Async 辨別該方法調用的時候是使用異步的方式
     */
    @Async
    public void asyncAddDanmu(Danmu danmu) {
        danmuDao.addDanmu(danmu);
    }      
  • WebSocketService.java
第三章 打造高性能的視訊彈幕系統

優化二

使用 MQ 進行削峰操作
  • RocketMQConfig.java
/**
     * 異步儲存彈幕生産者
     *
     * @return
     * @throws Exception
     */
    @Bean("asyncadddanmusProducer")
    public DefaultMQProducer asyncAddDanmusProducer() throws Exception {
        // 執行個體化消息生産者Producer
        DefaultMQProducer producer = new DefaultMQProducer(UserMomentsConstant.GROUP_ASYNCADDDANMUS);
        // 設定NameServer的位址
        producer.setNamesrvAddr(nameServerAddr);
        // 啟動Producer執行個體
        producer.start();
        return producer;
    }

    /**
     * 異步儲存彈幕消費者
     *
     * @return
     * @throws Exception
     */
    @Bean("asyncadddanmusConsumer")
    public DefaultMQPushConsumer asyncAddDanmusConsumer() throws Exception {
        // 執行個體化消費者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(UserMomentsConstant.GROUP_ASYNCADDDANMUS);
        // 設定NameServer的位址
        consumer.setNamesrvAddr(nameServerAddr);
        // 訂閱一個或者多個Topic,以及Tag來過濾需要消費的消息
        consumer.subscribe(UserMomentsConstant.TOPIC_ASYNCADDDANMUS, "*");
        // 注冊回調實作類來處理從broker拉取回來的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                MessageExt msg = msgs.get(0);
                if (msg == null) {
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                String bodyStr = new String(msg.getBody());
                // 将接收到消息轉換成DanMu實體類
                Danmu danmu = JSONObject.toJavaObject(JSONObject.parseObject(bodyStr), Danmu.class);
                // 異步儲存彈幕
                danmuService.asyncAddDanmu(danmu);

                // 标記該消息已經被成功消費
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 啟動消費者執行個體
        consumer.start();
        return consumer;
    }      
  • WebSocketService.java
第三章 打造高性能的視訊彈幕系統

線上人數統計

/**
     * 定時任務,每5秒群發一次消息到與伺服器相連的所有用戶端
     *
     * @throws IOException
     * @Scheduled(fixedRate = 5000) 辨別該方法是一個定時任務,并且每隔5秒執行該方法
     */
    @Scheduled(fixedRate = 5000)
    private void noticeOnlineCount() throws IOException {
        for (Map.Entry<String, WebSocketService> entry : WebSocketService.WEBSOCKET_MAP.entrySet()) {
            WebSocketService webSocketService = entry.getValue();
            if (webSocketService.session.isOpen()) {
                JSONObject jsonObject = new JSONObject();
                jsonObject.put("onlineCount", ONLINE_COUNT.get());
                jsonObject.put("msg", "目前線上人數為" + ONLINE_COUNT.get());
                // 服務端發送消息給用戶端
                webSocketService.sendMessage(jsonObject.toJSONString());
            }
        }
    }      

彈幕查詢功能實作

@RestController
public class DanmuApi {

    @Autowired
    private DanmuService danmuService;

    @Autowired
    private UserSupport userSupport;

    /**
     * 查詢彈幕
     * 在遊客模式下,是沒有辦法進行彈幕時間段篩選的
     * 使用者進行登入之後,就可以指定時間段進行彈幕查詢
     *
     * @param videoId   視訊id
     * @param startTime 開始時間
     * @param endTime   結束時間
     * @return
     * @throws Exception
     */
    @GetMapping("/danmus")
    public JsonResponse<List<Danmu>> getDanmus(@RequestParam Long videoId, String startTime, String endTime) throws Exception {
        List<Danmu> list;
        try {
            // 判斷目前是遊客模式還是使用者登入模式
            userSupport.getCurrentUserId();
            // 若是使用者登入模式,則允許使用者進行時間段篩選
            list = danmuService.getDanmus(videoId, startTime, endTime);
        } catch (Exception ignored) {
            // 若為遊客模式,則不允許使用者進行時間段篩選
            list = danmuService.getDanmus(videoId, null, null);
        }
        return new JsonResponse<>(list);
    }

}      

繼續閱讀