第三章 打造高性能的視訊彈幕系統
- 場景分析:用戶端針對某一視訊建立了彈幕,發送給後端進行處理,後端需要對所有正在觀看該視訊的使用者推送該彈幕
- 兩種實作方式:使用短連接配接進行通信或使用長連接配接進行通信
短連接配接實作方案:
- 所有觀看視訊的用戶端不斷輪詢後端,若有新的彈幕則拉取後進行顯示
- 缺點:輪詢的效率低,非常浪費資源(因為HTTP協定隻能由用戶端向服務端發起,故必須不停連接配接後端)
長連接配接實作方案:
- 采用
進行前後端通信WebSocket
- 為什麼要用 WebSocket:HTTP 協定的通信隻能由用戶端發起,做不到伺服器主動向用戶端推送資訊。
WebSocket 協定
- WebSocket簡介:WebSocket 協定是基于TCP的一種新的網絡協定。它實作了浏覽器與伺服器
(Full-Duplex)通信。全雙工
- 全雙工(Full-Duplex)通信:用戶端可以主動發送資訊給服務端,服務端也可以主動發送資訊給用戶端。
- WebSocket協定優點:封包體積小、支援長連接配接。
彈幕系統架構設計

優化方向一(後端接收前端發來的彈幕,并将彈幕推送給前端展示)
- 假設前端傳過來2萬條請求(彈幕),後端需要推送這2萬條請求到前端,那麼就相當于後端總共需要處理4萬條請求,後端将這4萬條請求分成10批次,每一批就是4000個請求,
- 但是這10批次裡面的第一批我們首先進行處理,第2~10批我們先不進行處理,把它們先放到 MQ 裡面進行排隊,這個就是削峰;
- 将第一批的4000條請求,好好利用伺服器的并行處理能力,給它進行并發處理,同一時間段内進行并發處理4000條請求的耗時可能也就幾百毫秒,
- 這樣在 2~4 秒的時間段内,伺服器就能完成這4萬條請求的處理;
- 在前端的使用者感覺來看,實際就是使用者發送了一條彈幕,2~4秒後就可以在頁面上看到自己所發送的彈幕了,體驗感較好。
優化方向二(後端接收彈幕後,将彈幕持久化到資料庫)
- 後端接收前端傳過來的彈幕後,将彈幕通過
進行異步持久化到資料庫,并且采用 MQ 的目的是為了限流削峰,減輕資料庫的壓力;MQ
- 并且由于是異步操作,主線程是另外開了一條線程在進行持久化資料庫的操作,這樣子不會影響主線程的其他操作(例如同步儲存彈幕到 Redis 裡面)
- 假設有 2 萬條彈幕同時過來資料庫,先将彈幕資料儲存到 MQ 裡面,這樣子 MQ 可以每秒處理 2000 個請求,這樣的速度儲存到資料庫中,不至于會使資料庫崩潰,能夠有效降低資料庫的壓力。
優化方向三(将彈幕資料寫到redis,再次查詢可以快速讀取)
- 在将彈幕資料儲存到資料庫中時,也要将彈幕資料同步儲存到 redis(緩存)中,
- 為了我們在下一次加載到視訊詳情頁的時候,能夠把我們目前或者當天的彈幕資料給快速查詢出來;
- 如果某個視訊在今天儲存了很大的彈幕資料量,如果每次都從資料庫中進行查詢的話,一是速度慢,二是可能會對資料庫造成讀取壓力(如果有多個視訊進行查詢,其他視訊可能需要排隊查詢);
- 如果将今天生成的彈幕資料都儲存到 redis 中,在下次進行頁面重新整理的時候,會調用一個彈幕資料查詢的操作,就可以直接從 redis 裡面進行讀取,這樣的速度是非常快的,因為它是從記憶體裡面查詢資料。
- redis 單機最大處理量可以達到 10 ~ 50 萬左右。
SpringBoot 整合 WebSocket
導入依賴
<!-- WebSocket依賴 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
工具類
@Configuration
public class WebSocketConfig {
/**
* 用來發現WebSocket服務的
*
* @return
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
業務類
@Component
@ServerEndpoint("/imserver")
public class WebSocketService {
private final Logger logger = LoggerFactory.getLogger(WebSocketService.class);
/**
* 目前長連接配接的數量(線上人數的統計)
* 也就是目前有多少用戶端通過WebSocket連接配接到服務端
*/
private static final AtomicInteger ONLINE_COUNT = new AtomicInteger(0);
/**
* 一個用戶端 關聯 一個WebSocketService
* 是一個多例模式的,這裡需要注意下
*/
private static final ConcurrentHashMap<String, WebSocketService> WEBSOCKET_MAP = new ConcurrentHashMap<>();
/**
* 服務端 和 用戶端 進行通信的一個會話
* 當我們有一個用戶端進來了,然後保持連接配接成功了,那麼我們就會儲存一個跟這個用戶端關聯的session
*/
private Session session;
/**
* 唯一辨別
*/
private String sessionId;
/**
* 打開連接配接
*
* @param session
* @OnOpen 連接配接成功後會自動調用該方法
*/
@OnOpen
public void openConnection(Session session) {
// 儲存session相關資訊到本地
this.sessionId = session.getId();
this.session = session;
// 判斷WEBSOCKET_MAP是否含有sessionId,有的話先删除再重新添加
if (WEBSOCKET_MAP.containsKey(sessionId)) {
WEBSOCKET_MAP.remove(sessionId);
WEBSOCKET_MAP.put(sessionId, this);
} else { // 沒有的話就直接新增
WEBSOCKET_MAP.put(sessionId, this);
// 線上人數加一
ONLINE_COUNT.getAndIncrement();
}
logger.info("使用者連接配接成功:" + sessionId + ",目前線上人數為:" + ONLINE_COUNT.get());
// 連接配接成功之後需要通知用戶端,友善用戶端進行後續操作
try {
this.sendMessage("0");
} catch (Exception e) {
logger.error("連接配接異常!");
}
}
/**
* 用戶端重新整理頁面,或者關閉頁面,服務端斷開連接配接等等操作,都需要關閉連接配接
*/
@OnClose
public void closeConnection() {
if (WEBSOCKET_MAP.containsKey(sessionId)) {
WEBSOCKET_MAP.remove(sessionId);
// 線上人數減一
ONLINE_COUNT.getAndDecrement();
logger.info("使用者退出:" + sessionId + ",目前線上人數為:" + ONLINE_COUNT.get());
}
}
/**
* 用戶端發送消息給後端
*
* @param message
*/
@OnMessage
public void onMessage(String message) {
}
/**
* 發生錯誤之後的處理
*
* @param error
*/
@OnError
public void onError(Throwable error) {
}
/**
* 後端發送消息給用戶端
*
* @param message
* @throws IOException
*/
private void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
}
多例模式下引發的Bean注入為null的問題
在啟動類中将ApplicationContext傳給WebSocketService中的* APPLICATION_CONTEXT
*
@SpringBootApplication
@EnableTransactionManagement
public class ImoocBilibiliApplication {
public static void main(String[] args) {
ApplicationContext app = SpringApplication.run(ImoocBilibiliApplication.class, args);
WebSocketService.setApplicationContext(app);
}
}
WebSocketService
-
在多例模式下是不會自動進行加載的,是以這裡我們不能使用@Autowired
進行注入;@Autowired
- 而我們的啟動類生成的ApplicationContext,是可以通過getBean( )方法擷取到Spring容器中所有Bean的;
/**
* 全局的上下文變量
*/
private static ApplicationContext APPLICATION_CONTEXT;
/**
* 通用的上下文環境變量的方法,每個WebSocketService都會共用同一個ApplicationContext
*
* @param applicationContext
*/
public static void setApplicationContext(ApplicationContext applicationContext) {
WebSocketService.APPLICATION_CONTEXT = applicationContext;
}
彈幕系統實作
資料庫表設計及相關實體類設計
彈幕記錄表
業務層
WebSocketService.java
@Component
@ServerEndpoint("/imserver/{token}")
public class WebSocketService {
private final Logger logger = LoggerFactory.getLogger(WebSocketService.class);
/**
* 目前長連接配接的數量(線上人數的統計)
* 也就是目前有多少用戶端通過WebSocket連接配接到服務端
*/
private static final AtomicInteger ONLINE_COUNT = new AtomicInteger(0);
/**
* 一個用戶端 關聯 一個WebSocketService
* 是一個多例模式的,這裡需要注意下
*/
private static final ConcurrentHashMap<String, WebSocketService> WEBSOCKET_MAP = new ConcurrentHashMap<>();
/**
* 服務端 和 用戶端 進行通信的一個會話
* 當我們有一個用戶端進來了,然後保持連接配接成功了,那麼我們就會儲存一個跟這個用戶端關聯的session
*/
private Session session;
/**
* 唯一辨別
*/
private String sessionId;
private Long userId;
/**
* 全局的上下文變量
*/
private static ApplicationContext APPLICATION_CONTEXT;
/**
* 打開連接配接
*
* @param session
* @param token
* @OnOpen 連接配接成功後會自動調用該方法
* @PathParam("token") 擷取 @ServerEndpoint("/imserver/{token}") 後面的參數
*/
@OnOpen
public void openConnection(Session session, @PathParam("token") String token) {
// 如果是遊客觀看視訊,雖然有彈幕,但是沒有使用者資訊,是以需要用try
try {
this.userId = TokenUtil.verifyToken(token);
} catch (Exception ignored) {
}
// 儲存session相關資訊到本地
this.sessionId = session.getId();
this.session = session;
// 判斷WEBSOCKET_MAP是否含有sessionId,有的話先删除再重新添加
if (WEBSOCKET_MAP.containsKey(sessionId)) {
WEBSOCKET_MAP.remove(sessionId);
WEBSOCKET_MAP.put(sessionId, this);
} else { // 沒有的話就直接新增
WEBSOCKET_MAP.put(sessionId, this);
// 線上人數加一
ONLINE_COUNT.getAndIncrement();
}
logger.info("使用者連接配接成功:" + sessionId + ",目前線上人數為:" + ONLINE_COUNT.get());
// 連接配接成功之後需要通知用戶端,友善用戶端進行後續操作
try {
this.sendMessage("0");
} catch (Exception e) {
logger.error("連接配接異常!");
}
}
/**
* 用戶端發送消息給服務端
*
* @param message
*/
@OnMessage
public void onMessage(String message) {
logger.info("使用者資訊:" + sessionId + ",封包:" + message);
if (!StringUtils.isNullOrEmpty(message)) {
try {
// 群發消息(服務端拿到某一個用戶端發來的消息,然後群發到所有與它連接配接的用戶端)
for (Map.Entry<String, WebSocketService> entry : WEBSOCKET_MAP.entrySet()) {
// 擷取每一個和服務端連接配接的用戶端
WebSocketService webSocketService = entry.getValue();
// 判斷會話是否還處于打開狀态
if (webSocketService.session.isOpen()) {
webSocketService.sendMessage(message);
}
}
if (this.userId != null) {
// --------- 儲存彈幕到資料庫 ----------
// 将message轉換成Danmu實體類的資料
Danmu danmu = JSONObject.parseObject(message, Danmu.class);
danmu.setUserId(userId);
danmu.setCreateTime(new Date());
DanmuService danmuService = (DanmuService) APPLICATION_CONTEXT.getBean("danmuService");
danmuService.addDanmu(danmu);
// ----------- 儲存彈幕到redis -----------
danmuService.addDanmusToRedis(danmu);
}
} catch (Exception e) {
logger.error("彈幕接收出現問題!");
e.printStackTrace();
}
}
}
}
DanmuService.java
@Service
public class DanmuService {
private static final String DANMU_KEY = "dm-video-";
@Autowired
private DanmuDao danmuDao;
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**
* 添加彈幕
*
* @param danmu
*/
public void addDanmu(Danmu danmu) {
danmuDao.addDanmu(danmu);
}
/**
* 查詢彈幕
*
* @param danmu
*/
@Async
public void asyncAddDanmu(Danmu danmu) {
danmuDao.addDanmu(danmu);
}
/**
* 添加彈幕到redis
* 下次加載頁面時,可以快速從緩存中擷取彈幕
*
* @param danmu
*/
public void addDanmusToRedis(Danmu danmu) {
String key = DANMU_KEY + danmu.getVideoId();
String value = redisTemplate.opsForValue().get(key);
List<Danmu> list = new ArrayList<>();
if (!StringUtil.isNullOrEmpty(value)) {
// 将從redis中查詢到的資料轉換成list集合
list = JSONArray.parseArray(value, Danmu.class);
}
// 将新的彈幕添加到list中
list.add(danmu);
redisTemplate.opsForValue().set(key, JSONObject.toJSONString(list));
}
}
推送彈幕性能優化
WebSocketService.java
/**
* 用戶端發送消息給服務端
*
* @param message
*/
@OnMessage
public void onMessage(String message) {
logger.info("使用者資訊:" + sessionId + ",封包:" + message);
if (!StringUtils.isNullOrEmpty(message)) {
try {
// 群發消息(服務端拿到某一個用戶端發來的消息,然後群發到所有與它連接配接的用戶端)
for (Map.Entry<String, WebSocketService> entry : WEBSOCKET_MAP.entrySet()) {
// 擷取每一個和服務端連接配接的用戶端
WebSocketService webSocketService = entry.getValue();
// 擷取到彈幕生産者
DefaultMQProducer danmusProducer = (DefaultMQProducer) APPLICATION_CONTEXT.getBean("danmusProducer");
JSONObject jsonObject = new JSONObject();
jsonObject.put("message", message);
jsonObject.put("sessionId", webSocketService.getSessionId());
Message msg = new Message(UserMomentsConstant.TOPIC_DANMUS, jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8));
// 異步發送消息
RocketMQUtil.asyncSendMsg(danmusProducer, msg);
}
RocketMQConfig.java
@Configuration
public class RocketMQConfig {
@Value("${rocketmq.name.server.address}")
private String nameServerAddr;
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Autowired
private UserFollowingService userFollowingService;
/**
* 彈幕生産者
*
* @return
* @throws Exception
*/
@Bean("danmusProducer")
public DefaultMQProducer danmusProducer() throws Exception {
// 執行個體化消息生産者Producer
DefaultMQProducer producer = new DefaultMQProducer(UserMomentsConstant.GROUP_MOMENTS);
// 設定NameServer的位址
producer.setNamesrvAddr(nameServerAddr);
// 啟動Producer執行個體
producer.start();
return producer;
}
/**
* 彈幕消費者
*
* @return
* @throws Exception
*/
@Bean("danmusConsumer")
public DefaultMQPushConsumer danmusConsumer() throws Exception {
// 執行個體化消費者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(UserMomentsConstant.GROUP_DANMUS);
// 設定NameServer的位址
consumer.setNamesrvAddr(nameServerAddr);
// 訂閱一個或者多個Topic,以及Tag來過濾需要消費的消息
consumer.subscribe(UserMomentsConstant.TOPIC_DANMUS, "*");
// 注冊回調實作類來處理從broker拉取回來的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt msg = msgs.get(0);
if (msg == null) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
String bodyStr = new String(msg.getBody());
JSONObject jsonObject = JSONObject.parseObject(bodyStr);
String sessionId = jsonObject.getString("sessionId");
String message = jsonObject.getString("message");
// 根據sessionId擷取對應的webSocketService
WebSocketService webSocketService = WebSocketService.WEBSOCKET_MAP.get(sessionId);
// 判斷會話是否還處于打開狀态
if (webSocketService.getSession().isOpen()) {
try {
// 伺服器發送消息給用戶端
webSocketService.sendMessage(message);
} catch (Exception e) {
e.printStackTrace();
}
}
// 标記該消息已經被成功消費
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 啟動消費者執行個體
consumer.start();
return consumer;
}
}
彈幕消息異步存儲優化
優化一
使用SpringBoot的 @Async
注解進行異步儲存彈幕
- DanmuService.java
/**
* 異步儲存彈幕
*
* @param danmu
* @Async 辨別該方法調用的時候是使用異步的方式
*/
@Async
public void asyncAddDanmu(Danmu danmu) {
danmuDao.addDanmu(danmu);
}
- WebSocketService.java
優化二
使用 MQ 進行削峰操作
- RocketMQConfig.java
/**
* 異步儲存彈幕生産者
*
* @return
* @throws Exception
*/
@Bean("asyncadddanmusProducer")
public DefaultMQProducer asyncAddDanmusProducer() throws Exception {
// 執行個體化消息生産者Producer
DefaultMQProducer producer = new DefaultMQProducer(UserMomentsConstant.GROUP_ASYNCADDDANMUS);
// 設定NameServer的位址
producer.setNamesrvAddr(nameServerAddr);
// 啟動Producer執行個體
producer.start();
return producer;
}
/**
* 異步儲存彈幕消費者
*
* @return
* @throws Exception
*/
@Bean("asyncadddanmusConsumer")
public DefaultMQPushConsumer asyncAddDanmusConsumer() throws Exception {
// 執行個體化消費者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(UserMomentsConstant.GROUP_ASYNCADDDANMUS);
// 設定NameServer的位址
consumer.setNamesrvAddr(nameServerAddr);
// 訂閱一個或者多個Topic,以及Tag來過濾需要消費的消息
consumer.subscribe(UserMomentsConstant.TOPIC_ASYNCADDDANMUS, "*");
// 注冊回調實作類來處理從broker拉取回來的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt msg = msgs.get(0);
if (msg == null) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
String bodyStr = new String(msg.getBody());
// 将接收到消息轉換成DanMu實體類
Danmu danmu = JSONObject.toJavaObject(JSONObject.parseObject(bodyStr), Danmu.class);
// 異步儲存彈幕
danmuService.asyncAddDanmu(danmu);
// 标記該消息已經被成功消費
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 啟動消費者執行個體
consumer.start();
return consumer;
}
- WebSocketService.java
線上人數統計
/**
* 定時任務,每5秒群發一次消息到與伺服器相連的所有用戶端
*
* @throws IOException
* @Scheduled(fixedRate = 5000) 辨別該方法是一個定時任務,并且每隔5秒執行該方法
*/
@Scheduled(fixedRate = 5000)
private void noticeOnlineCount() throws IOException {
for (Map.Entry<String, WebSocketService> entry : WebSocketService.WEBSOCKET_MAP.entrySet()) {
WebSocketService webSocketService = entry.getValue();
if (webSocketService.session.isOpen()) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("onlineCount", ONLINE_COUNT.get());
jsonObject.put("msg", "目前線上人數為" + ONLINE_COUNT.get());
// 服務端發送消息給用戶端
webSocketService.sendMessage(jsonObject.toJSONString());
}
}
}
彈幕查詢功能實作
@RestController
public class DanmuApi {
@Autowired
private DanmuService danmuService;
@Autowired
private UserSupport userSupport;
/**
* 查詢彈幕
* 在遊客模式下,是沒有辦法進行彈幕時間段篩選的
* 使用者進行登入之後,就可以指定時間段進行彈幕查詢
*
* @param videoId 視訊id
* @param startTime 開始時間
* @param endTime 結束時間
* @return
* @throws Exception
*/
@GetMapping("/danmus")
public JsonResponse<List<Danmu>> getDanmus(@RequestParam Long videoId, String startTime, String endTime) throws Exception {
List<Danmu> list;
try {
// 判斷目前是遊客模式還是使用者登入模式
userSupport.getCurrentUserId();
// 若是使用者登入模式,則允許使用者進行時間段篩選
list = danmuService.getDanmus(videoId, startTime, endTime);
} catch (Exception ignored) {
// 若為遊客模式,則不允許使用者進行時間段篩選
list = danmuService.getDanmus(videoId, null, null);
}
return new JsonResponse<>(list);
}
}