天天看点

MSE视频直播-实时处理视频流视频数据采集后端接收处理页面实时播放数据流解析

网上很多关于MSE相关文章都有讲过有些注意点讲的并不清楚,比如接收的视频数据格式、flash兼容性问题、imie格式不对、无法做到实时接收数据、刷新后后续数据无法播放等等

现在整体讲一下

该工程通过浏览器页面调用录屏功能将视频数据传到后端,后端通过websocket发送数据到第三方页面进行实时播放

之前尝试过rtmp+ffmpeg+nginx进行推流直播,但是发现ffmpeg需要读取完整视频文件才行和需求不通

环境:chrome 92.0.4515.131

jdk:1.8

视频数据采集

record.html

<html>
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <meta http-equiv="X-UA-Compatible" content="ie=edge">
    <title>Screen Record</title>
</head>
<style>
video {
    border: 1px solid #999;
    width: 98%;
    max-width: 860px;
  }
</style>
<body>
    <p>This example shows you the contents of the selected part of your display.
    Click the Start Capture button to begin.</p>
    <p><button id="start">Start Capture</button>&nbsp;<button id="stop">Stop Capture</button></p>
    <video id="video" autoplay></video>
</body>
<script>
const videoElem = document.getElementById("video");
const startElem = document.getElementById("start");
const stopElem = document.getElementById("stop");

// Options for getDisplayMedia()
const displayMediaOptions = {
    video: {
    cursor: "always"
    ,frameRate: "8"
	,height:500
  },
  audio: false
};
// Set event listeners for the start and stop buttons
startElem.addEventListener("click", function(evt) {
  startCapture();
}, false);
stopElem.addEventListener("click", function(evt) {
  stopCapture();
}, false);

let recorder;
var stream;

async function startCapture() {
  try {
    stream = await navigator.mediaDevices.getDisplayMedia(displayMediaOptions);
	videoElem.srcObject  = stream;
    var options = {
		mimeType: 'video/webm;codecs=vp8'
	}
	recorder = new MediaRecorder(stream, options);
	
	recorder.ondataavailable = e => {
		put(e.data);
	};
	recorder.start(2000);
  } catch(err) {
    console.error("Error: " + err);
  }
}

function stopCapture(evt) {
  //recorder.stop();
  let tracks = videoElem.srcObject.getTracks();
  tracks.forEach(track => track.stop());
  videoElem.srcObject = null;
}

function put(d){
	var aa = [];
	aa.push(d);
	var formData = new FormData();
	formData.append("user", "aaa");
	// JavaScript file-like 对象
	let nd = new Blob(aa);
	formData.append("file", nd);
	var request = new XMLHttpRequest();
	request.open("POST", "/jsmp4");
	request.send(formData);
}
</script>
</html>

​
           

注意:该视频格式为webm格式,编码方式为vp8

后端接收处理

package com.r.controller;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;

import com.r.common.vo.common.RespResult;
import com.r.controller.common.BaseController;
import com.r.util.FuncUtil;


import lombok.extern.slf4j.Slf4j;

@Slf4j
@RestController
public class Test {
	
	@RequestMapping("/jsmp4")
	@ResponseBody
	public RespResult uploadMp4File(MultipartFile file, String user) {
		if(FuncUtil.isNull(user)) {
			return new RespResult(RESULT_ERROR, "参数错误");
		}
		//保存临时文件
		int len = 0;
		try {
			len = file.getBytes().length;
		} catch (Exception e) {
		}
		log.info("file.name==" + user + ", length=" + len);
		File temp = new File("E:\\ftp\\tmp\\"+user +".mp4");
		FileOutputStream fos = null;
		try {
			fos = new FileOutputStream(temp,true);
			fos.write(file.getBytes());
			fos.flush();
			fos.close();
		} catch (FileNotFoundException e) {
			log.error("error:",e);
		} catch (IOException e) {
			log.error("error:",e);
		} finally {
			if(fos!=null) {
				try {
					fos.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
		try {
            //通过websocket推送数据
			RedisWebSocket.sendMassage(file.getBytes());
		} catch (Exception e) {
			e.printStackTrace();
		}
		return new RespResult(RESULT_SUCCESS, "成功");
	}
}
           

websocket推送数据

package com.r.controller;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;

import org.springframework.stereotype.Component;

import com.r.util.WebmUtils

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
@ServerEndpoint("/ws/redis")
public class RedisWebSocket {
	
	//sessionid 为Key作为用户标识
    public static ConcurrentHashMap<String,Session> sessionMap = new ConcurrentHashMap<String,Session>();
    public static ConcurrentHashMap<String,String> sessionData = new ConcurrentHashMap<String,String>();
    
    /**
     * 新的WebSocket请求开启
     */
    @OnOpen
    public void onOpen(Session session) {
    	log.info("ws redis-------open, key=" + session.getId());
		sessionMap.put(session.getId(), session);
    }
    
    public static void sendMassage(String text) {
    	try {
	    	for (String key : sessionMap.keySet()) {
	    		//log.info(text);
	    		if(key == null) {
	    			sessionMap.remove(key);
	        		continue;
	        	}
	    		Session ses = sessionMap.get(key);
	        	if(ses == null || !ses.isOpen()) {
	        		sessionMap.remove(key);
	        		continue;
	        	}
	        	try {
	    	    	// 将实时日志通过WebSocket发送给客户端,给每一行添加一个HTML换行
	        		ses.getBasicRemote().sendText(text);
	            } catch (IOException e) {
	                log.error("websocket error:", e);
	            }
			}
    	} catch (Exception e) {
    		 log.error("websocket error:", e);
		}
    }
    
    
    
    public static void sendMassage(byte[] data) {
    	try {
	    	for (String key : sessionMap.keySet()) {
	    		//log.info(text);
	    		if(key == null) {
	    			sessionMap.remove(key);
	        		continue;
	        	}
	    		Session ses = sessionMap.get(key);
	        	if(ses == null || !ses.isOpen()) {
	        		sessionMap.remove(key);
	        		continue;
	        	}
	        	try {
	        		//有数据,加头
					if(sessionData.containsKey(key)) {
						byte[] new_data = headCheck(data);
						if(new_data == null) {
							System.out.println("add head is null");
							continue;
						}
						System.out.println("add head data:" + data.length + ", new_data:" + new_data.length);
						data = new_data;
						sessionData.remove(key);
					}
					ses.getBasicRemote().sendBinary(ByteBuffer.wrap(data));
	            } catch (Exception e) {
	                log.error("websocket error:", e);
	            }
			}
	    	
    	} catch (Exception e) {
    		 log.error("websocket error:", e);
		}
    }
    
	/**
	 * WebSocket请求关闭
	 */
	@OnClose
	public void onClose(Session session) {
		sessionMap.remove(session.getId());
		log.info("ws redis-------close");
	}
	
	@OnError
	public void onError(Session session, Throwable thr) {
		log.error("websocket error:", thr);
		log.info("ws redis-------error key="+(session!=null ? session.getId() : ""));

	}
	/**
	 * 获取消息响应
	 * @param session 可选参数
	 * @param message 接收到的消息
	 */
	@OnMessage
	public void onMessage(Session session, String message) {
		log.info("session id:"+ session.getId()+", onmessage=" + message + ", session.requestPara=" + session.getRequestParameterMap());
		if(sessionMap.containsKey(session.getId())) {
			sessionData.put(session.getId(), message);
		}
	}

}
           

后端接收页面发送的数据来判断是否添加视频头,使得页面可以重新播放后续数据。

里面用到的headCheck方法将在下面介绍

页面实时播放

stream.html

<html>
<meta charset="utf-8">
<head>
   <style type="text/css">
	   #video {
	  border: 1px solid #999;
	  width: 98%;
	  max-width: 860px;
	}
   </style>
</head>
<body>
	<video id="video" controls muted autoplay>
    </video>
<script>
var mimeCodec = 'video/webm;codecs=vp8';

var video = document.querySelector('#video');
if ('MediaSource' in window && MediaSource.isTypeSupported(mimeCodec)) {
  var mediaSource = new MediaSource;
  //console.log(mediaSource.readyState); // closed
  video.src = URL.createObjectURL(mediaSource);
  mediaSource.addEventListener('sourceopen', sourceOpen);
} else {
  console.error('Unsupported MIME type or codec: ', mimeCodec);
}

function sourceOpen (e) {
	URL.revokeObjectURL(video.src)
  //console.log(this.readyState); // open
  var mediaSource = e.target;
  
  var sourceBuffer = mediaSource.addSourceBuffer(mimeCodec);
	sourceBuffer.mode = 'sequence';
	sourceBuffer.addEventListener('updateend', function (_) {
      mediaSource.endOfStream();
      video.play();
      //console.log(mediaSource.readyState); // ended
    });
 var websocket = new WebSocket('ws://127.0.0.1:36886/ws');
  websocket.binaryType = 'arraybuffer';
  websocket.onopen = function (){
	  websocket.send("add head");
  }
  websocket.onmessage = function(message) {
	  var data = message.data;
   	  //var ct = video.currentTime ;
   		sourceBuffer.appendBuffer(data);
   	  //video.currentTime = ct;
  };
  websocket.onclose = function (event) {
  	console.log("close");
  }
};
</script>
</body>
</html>
           

注意:MediaSource接收的视频格式必须和mime声明的一样,支持fmp4(Fragmented mp4),普通mp4是不行的,录屏数据本身就是他们生成的也是支持的

我的数据是webm的编码方式为vp8,没有音频所以写成 "video/webm;codecs=vp8"

如果是fmp4的需要写成'video/mp4; codecs="mp4a.40.2,avc1.64001f"',其中avc1.64001f是音频编码,根据时间情况修改mime类型。可通过MediaSource.isTypeSupported(mimeCodec)检查浏览器是否支持该类型。

fmp4需要有flash插件支持

参考:H5直播系列二 MSE(Media Source Extensions)

数据流解析

webm遵循EBML规范(Extended Binary Markup Language)

参考:

【多媒体封装格式详解】---MKV【1】

【多媒体封装格式详解】---MKV【2】

【多媒体封装格式详解】---MKV【3】完

官方网站:https://www.matroska.org/technical/elements.html

数据格式是这样的:mkvtoolnix工具可以查看

视频开头包含前122个字节

MSE视频直播-实时处理视频流视频数据采集后端接收处理页面实时播放数据流解析

后续发送的数据都没有是簇 数据,并且数据开始的帧并不完整。工具无法正常打开,因为并不是完整视频文件

MSE视频直播-实时处理视频流视频数据采集后端接收处理页面实时播放数据流解析

 开头122个字节数据基本固定,所以我就直接吧它拼接到后续传过来的数据中写成文件,浏览器可以正常打开播放,但是用MSE时间上打不开,浏览器实际上有一定的容错性。

将后续不完整数据进行检查,逐个自己检查是否包含“簇”的ID(0x1F43B675),找到后在检查后续自己是否符合格式,不符合继续检查,符合加上开头122个字节,可以正常播放。

尝试过检查简单块的ID(0xA3)相当于一个帧,0xA3的字节后续数据也符合格式将该帧之前不完整的数据去掉,然后再前面加上固定的开头122个字节和补齐簇相关代码。发现视频还是无法播放,应该是和帧数不足以及簇时间戳不对有关,只能放弃这个想法。

com.r.util.WebmUtils

package com.r.util;

import com.r.common.KeyValuePair;
import com.r.util.AESUtil;

public class WebmUtils {

    private static String webm_vp8_head = "1A45DFA39F4286810142F7810142F2810442F381084282847765626D42878104428581021853806701FFFFFFFFFFFFFF1549"
			+ "A966992AD7B1830F42404D80864368726F6D655741864368726F6D651654AE6BABAEA9D7810173C5874F0AABE521FEC58381"
			+ "0155EE81018685565F565038E08CB0820600BA82036053C08101";  
	private static byte[] webm_vp8_head_byte = AESUtil.parseHexStr2Byte(webm_vp8_head);

    public static byte[] headCheck(byte[] data) {
		KeyValuePair<String, Integer> kv = getEbmlIndex(data);
		int h_len = webm_vp8_head_byte.length;
		int d_len = data.length - kv.getValue();
		byte[] new_data = null;
		if(kv.getKey().equals("0x1F43B675")) {
			new_data = new byte[h_len + d_len];
			System.arraycopy(webm_vp8_head_byte, 0, new_data, 0, h_len);
			System.arraycopy(data, kv.getValue(), new_data, h_len, d_len);
		} else if(kv.getKey().equals("0xA3")) {
			String ebml_cluster = "1F43B67501FFFFFFFFFFFFFFE78100";//时间戳从0开始
			byte[] ebml_cluster_byte = AESUtil.parseHexStr2Byte(ebml_cluster);
			int cl_len = ebml_cluster_byte.length;
			new_data = new byte[h_len + cl_len + d_len];
			System.arraycopy(webm_vp8_head_byte, 0, new_data, 0, h_len);
			System.arraycopy(ebml_cluster_byte, 0, new_data, h_len, cl_len);
			System.arraycopy(data, kv.getValue(), new_data, h_len + cl_len, d_len);
			//new_data[h_len + cl_len + 6] = Integer.valueOf("10000000", 2).byteValue();//改成关键帧?
		}
		return new_data;
	}
	
	
	
	/**
	 * 找0xA3和0x1F43B675
	 * 0xA3帧数不足无法播放,不在找
	 * @param data
	 * @return
	 */
	private static KeyValuePair<String, Integer> getEbmlIndex(byte[] data) {
		long begin = System.currentTimeMillis();
		long totallength = data.length;
		System.out.println("数据总长度: " + data.length);
		String ebml = "";
		int offset = 0;
		int index = 0;
		for (; index < totallength; index++) {
			offset = index;
			/*if((data[offset] & 0xFF) == 0xA3) {
				System.out.println();
				System.out.println("0xA3 发现 offset: " + offset);
				
				if(offset + 1 >= totallength) {
					System.out.println("0xA3 长度读取为-1, 结束");
					break;
				}
				offset++;
				byte a3_len_1byte = data[offset];
				if((a3_len_1byte & 0xFF) == 0) {
					System.out.println("0xA3 长度是0, 重新找, offset: " + offset);
					continue;
				}
				String a3_len_1byte_bin = Integer.toBinaryString(a3_len_1byte & 0xFF);
				System.out.println("0xA3 长度首字节二进制数: " + a3_len_1byte_bin);
				int a3_len_1byte_after = 8 - a3_len_1byte_bin.length();
				System.out.println("0xA3 长度字节数: "+ (a3_len_1byte_after + 1));
				
				if(offset + a3_len_1byte_after >= totallength) {
					System.out.println("0xA3 长度后续读取为不足-1, 重新找, offset: " + offset);
					continue;
				}
				byte[] a3_len_byte = new byte[a3_len_1byte_after + 1];
				a3_len_byte[0] = a3_len_1byte;
				for (int j = 1; j < a3_len_byte.length; j++) {
					offset++;
					a3_len_byte[j] = data[offset];
				}
				System.out.println("0xA3 长度十六进制: "+ AESUtil.parseByte2HexStr(a3_len_byte));
				long a3_len_val = ebmlLen(a3_len_byte);
				System.out.println("0xA3 长度去符号后的值: " + a3_len_val);
				if(offset + a3_len_val >= totallength) {
					System.out.println("0xA3 长度 " + a3_len_val + " 大于剩余 " + (totallength - offset) + ", 重新找, offset: " + offset);
					continue;
				}
				//跳过数据
				offset += a3_len_val;
				//0xA3后续读取0x1F43B675或0xA3
				//0x1F43B675   0xA3
				byte[] a3Or1f = new byte[4];
				if(offset + 1 >= totallength) {
					System.out.println("0xA3 后续读取 0xA3 or 0x1F43B675 读取为-1, 重新找, offset: " + offset);
					continue;
				}
				offset++;
				a3Or1f[0] = data[offset];
				int a3_0 = a3Or1f[0] & 0xFF;
				if(a3_0 != 0xA3 && a3_0 != 0x1F) {
					System.out.println("0xA3 后续读 0x1F43B675 or 0xA3 没有发现: " + Integer.toHexString(a3_0) + ", 重新找, offset: " + offset);
					continue;
				}
				if(a3_0 == 0xA3) {
					System.out.println("0xA3 后续读 0xA3 找到: 位置为: " + offset);
					ebml="0xA3";
					break;
				}
				//s 368821 o 5885
				if(offset + 3 >= totallength) {
					System.out.println("0xA3 后续读取 0x1F43B675 读取为-1, 重新找, offset: " + offset);
					continue;
				}
				for (int j = 1; j < a3Or1f.length; j++) {
					offset++;
					a3Or1f[j] = data[offset];
				}
				String a3_1f43b675_hex = AESUtil.parseByte2HexStr(a3Or1f);
				if("1F43B675".equalsIgnoreCase(a3_1f43b675_hex)) {
					System.out.println("0xA3 后续读 0x1F43B675 找到, 位置为: " + (offset-3));
					ebml = "0x1F43B675";
					break;
				} else {
					System.out.println("0xA3 后续读 0x1F43B675没有发现: " + a3_1f43b675_hex + ", 重新找, offset: " + offset);
					continue;
				}
			} else */ if((data[offset] & 0xFF) == 0x1F){
				
				System.out.println();
				System.out.println("0x1F43B675 发现0x1F offset: "+offset);
				if(offset + 3 >= totallength) {
					System.out.println("0x1F43B675 后续读取 0x1F43B675 读取为-1, 结束");
					break;
				}
				byte[] x1f43b675 = new byte[4];
				x1f43b675[0] = data[offset];
				for (int j = 1; j < x1f43b675.length; j++) {
					offset++;
					x1f43b675[j] = data[offset];
				}
				String x1f43b675_hex = AESUtil.parseByte2HexStr(x1f43b675);
				if(!"1F43B675".equalsIgnoreCase(x1f43b675_hex)) {
					System.out.println("0x1F43B675 后续错误: " + x1f43b675_hex + ", 重新找, offset: " + offset);
					continue;
				}
				System.out.println("0x1F43B675 找到, 位置为: " + offset + ", 检查后续长度");
				if(offset + 1 >= totallength) {
					System.out.println("0x1F43B675 长度读取为-1, 重新找, offset: " + offset);
					continue;
				}
				offset++;
				if((data[offset] & 0xFF) == 0) {
					System.out.println("0x1F43B675 长度是0, 重新找, offset: " + offset);
					continue;
				}
				String x1f43b675_len_1byte = Integer.toBinaryString(data[offset] & 0xFF);
				System.out.println("0x1F43B675 长度首字节二进制数: " + x1f43b675_len_1byte);
				int x1f43b675_len_1byte_after = 8 - x1f43b675_len_1byte.length();
				System.out.println("0x1F43B675 长度字节数: "+ (x1f43b675_len_1byte_after + 1));
				//不检查簇长度、未知的
				if(offset + x1f43b675_len_1byte_after >= totallength) {
					System.out.println("0x1F43B675 跳过长度失败: " + x1f43b675_len_1byte_after + ", 结束");
					continue;
				}
				offset+=x1f43b675_len_1byte_after;
				//检查数据
				if(offset + 1 >= totallength) {
					System.out.println("0x1F43B675 找0xE7读取为-1, 重新找, offset: " + offset);
					continue;
				}
				offset++;
				if((data[offset] & 0xFF) != 0xE7) {
					System.out.println("0x1F43B675 0xE7 没有找到 , 重新找, offset: " + offset);
					continue;
				}
				if(offset + 1 >= totallength) {
					System.out.println("0x1F43B675 0xE7 长度读取为-1, 重新找, offset: " + offset);
					continue;
				}
				offset++;
				byte e7_len_1byte = data[offset];
				if((e7_len_1byte & 0xFF) == 0) {
					System.out.println("0x1F43B675 0xE7长度是0, 重新找, offset: " + offset);
					continue;
				}
				String e7_len_1byte_bin = Integer.toBinaryString(e7_len_1byte & 0xFF);
				System.out.println("0x1F43B675 0xE7长度首字节二进制数: " + e7_len_1byte_bin);
				int e7_len_1byte_after = 8 - e7_len_1byte_bin.length();
				System.out.println("0x1F43B675 0xE7长度字节数: "+ (e7_len_1byte_after + 1));
				if(offset + e7_len_1byte_after >= totallength) {
					System.out.println("0x1F43B675 0xE7长度读取不足, 重新找, offset: " + offset);
					continue;
				}
				byte[] e7_len_byte = new byte[e7_len_1byte_after + 1];
				e7_len_byte[0] = e7_len_1byte;
				for (int j = 1; j < e7_len_byte.length; j++) {
					offset++;
					e7_len_byte[j] = data[offset];
				}
				
				System.out.println("0x1F43B675 0xE7长度十六进制: " + AESUtil.parseByte2HexStr(e7_len_byte));
				long a7_len_val = ebmlLen(e7_len_byte);
				System.out.println("0x1F43B675 0xE7 长度去符号后的值: " + a7_len_val);
				if(offset + a7_len_val >= totallength) {
					System.out.println("0x1F43B675 0xE7  长度" + a7_len_val + " 大于剩余 " + (totallength - offset) + ", 重新找, offset: " + offset);
					continue;
				}
				//跳过数据
				offset += a7_len_val;
				//读取0x1F43B675后0xA3
				if(offset + 1 >= totallength) {
					System.out.println("0x1F43B675 后续读取0xA3 读取为-1, 重新找, offset: " + offset);
					continue;
				}
				offset++;
				byte x1F43B675_a3 = data[offset];
				if((x1F43B675_a3 & 0xFF) == 0xA3) {
					System.out.println("0x1F43B675 0xA3 找到,位置为:" + offset);
					ebml = "0x1F43B675";
					break;
				} else {
					String x1F43B675_a3_hex = Integer.toHexString(x1F43B675_a3 & 0xFF);
					System.out.println("0x1F43B675 0xA3  没有找到: " + x1F43B675_a3_hex + ", 重新找, offset: " + offset);
					continue;
				}
			}
		}
		long end = System.currentTimeMillis();
		System.out.println(ebml + " index: " + index + " hex: " + Long.toHexString(index) + ", time: " + (end-begin) +"ms");
		return new KeyValuePair<String, Integer>(ebml, index);
	}

    private static long ebmlLen(byte[] lens) {
		String len0 = Integer.toBinaryString(lens[0] & 0xFF);
		long val = 0;
		if(!len0.equals("1")) {
			int tmp = Integer.valueOf(len0.substring(1), 2);
			val = tmp;
		}
		for (int i = 1; i < lens.length; i++) {
			val = (val << 8) + (lens[i] & 0xFF);
		}
		return val;
	}

           

该方法针对chrom产生的webm视频数据有效,其他视频格式自行修改 

com.r.util.AESUtil.java

public static String parseByte2HexStr(byte buf[]) {
		StringBuffer sb = new StringBuffer();
		for (int i = 0; i < buf.length; i++){
			String hex = Integer.toHexString(buf[i] & 0xFF);
			if (hex.length() == 1){
				hex = '0' + hex;
			}
			sb.append(hex.toUpperCase());
		}
		return sb.toString();
	}

	/**
	 * Desc:将16进制转换为二进制 param:String hexStr return:byte[] Author:WangPeiqiang
	 * Date:2017/8/22
	 **/
	public static byte[] parseHexStr2Byte(String hexStr) {
		if (hexStr.length() < 1)
			return null;
		byte[] result = new byte[hexStr.length() / 2];
		for (int i = 0; i < hexStr.length() / 2; i++){
			int high = Integer.parseInt(hexStr.substring(i * 2, i * 2 + 1), 16);
			int low = Integer.parseInt(hexStr.substring(i * 2 + 1, i * 2 + 2), 16);
			result[i] = (byte) (high * 16 + low);
		}
		return result;
	}
           

初次接触视频流处理方面的相关知识,有错误的地方请指正。