傳輸H.264的多點傳播RTSP服務
參考:從零開始寫一個RTSP伺服器(八)一個多點傳播的RTSP伺服器
h264檔案:test.h264檔案位址
和java實作傳輸H.264的RTSP服務差別
- 服務端往多點傳播ip+port發送H.264Rtp資料,循環發送
- rtsp的響應:DESCRIBE和SETUP修改
代碼
-
RtspTcpServer.java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Arrays;
import java.util.Date;
// Linux核心對TCP連接配接的識别是通過四元組來區分:源ip,源port,目标ip,目标port
public class RtspTcpServer {
public static void main(String[] args) throws IOException, InterruptedException {
int rtcpPort = MulticastServer.start();//開啟往多點傳播組發送資料
ServerSocket serverSocket = new ServerSocket(8888);//1.建立服務端對象
System.out.println("TCP服務端端啟動===>"+serverSocket.getLocalSocketAddress());
while (true){
Socket socket = serverSocket.accept(); //阻塞式,2.擷取連接配接過來的用戶端對象
//擷取到連接配接,則開啟一個線程處理目前連接配接
new Thread(new Runnable() {
@Override
public void run() {
InputStream inputStream = null;
OutputStream outputStream = null;
try {
System.out.println("TCP已連接配接===>"+socket.getRemoteSocketAddress());
inputStream = socket.getInputStream();//3.通過socket對象擷取輸入流,要讀取用戶端發來的資料
outputStream = socket.getOutputStream();//3.通過socket對象擷取輸入流,要讀取用戶端發來的資料
byte[] buffer = new byte[1024*1024];
int readNum = 0;
while((readNum=inputStream.read(buffer))!=-1){
if(readNum>0){
byte[] receive = Arrays.copyOfRange(buffer,0,readNum);
System.out.println("讀取的位元組數:"+readNum);
System.out.println("讀取的位元組數:"+receive.length);
System.out.println("緩沖區大小:"+buffer.length);
handlerReceiveData(outputStream,receive,rtcpPort);
}
}
} catch (IOException e) {
e.printStackTrace();
}finally {
System.out.println("斷開連接配接");
if(inputStream!=null){
try {
inputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(outputStream!=null){
try {
outputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(socket!=null){
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}).start();
}
}
public static void handlerReceiveData(OutputStream outputStream, byte[] buffer,int rtcpPort){
String receiveStr=new String(buffer);
System.out.println("TCP-----------------接收receiveStr----------------------");
System.out.println(receiveStr);
System.out.println("TCP-----------------接收receiveStr----------------------");
String lines[] = receiveStr.split("\\r?\\n");//按行分割
int cseq=0;
int clientRtpPort=0;
int clientRtcpPort=0;
String url=null;
String localIp=null;
{
for(String line:lines){
if(line.indexOf("rtsp://")>-1){
url = line.split("\\s+")[1];
String[] split = line.split(":");
localIp = split[1].substring(2);
}
if(line.startsWith("CSeq:")){
String[] split = line.split(": ");
cseq = Integer.parseInt(split[1].trim());
}
if(line.startsWith("Transport:")){
String[] split = line.split(";");
for(String i : split){
if(i.startsWith("client_port=")){
String substring = i.substring(12);
String[] split1 = substring.split("-");
clientRtpPort = Integer.parseInt(split1[0].trim());
clientRtcpPort = Integer.parseInt(split1[1].trim());
}
}
}
}
}//擷取cseq
String responseStr=null;
if (receiveStr.startsWith("OPTIONS")){
//OPTIONS 請求服務端支援的RTSP方法清單;也可以定時發送這個請求來保活RTSP會話。
responseStr=String.format("RTSP/1.0 200 OK\r\n"+
"CSeq: %d\r\n"+
"Public: OPTIONS, DESCRIBE, SETUP, PLAY\r\n"+
"\r\n",cseq);
}else if(receiveStr.startsWith("SETUP")){
//SETUP:用于配置資料互動的方法(比如制定音視訊的傳輸方式TCP或UDP)。
responseStr=String.format("RTSP/1.0 200 OK\r\n"+
"CSeq: %d\r\n"+
"Transport: RTP/AVP;multicast;destination=%s;source=%s;port=%d-%d;ttl=255\r\n"+
"Session: 66334873\r\n"+
"\r\n",
cseq,
MulticastServer.MULTICAST_ADDRESS,
url,
MulticastServer.PORT,
rtcpPort);
}else if(receiveStr.startsWith("DESCRIBE")){
//DESCRIBE:請求指定的媒體流的SDP描述資訊(詳細包括音視訊流的幀率、編碼類型等媒體資訊)。
String sdp=String.format("v=0\r\n"+
"o=- 9%d 1 IN IP4 %s\r\n"+
"t=0 0\r\n"+
"a=control:*\r\n"+
"a=type:broadcast\r\n"+
"a=rtcp-unicast: reflection\r\n"+
"m=video %d RTP/AVP 96\r\n"+
"c=IN IP4 %s/255\r\n"+
"a=rtpmap:96 H264/90000\r\n"+
"a=framerate:25\r\n"+
"a=control:track0\r\n",
new Date().getTime(),
localIp,
MulticastServer.PORT,
MulticastServer.MULTICAST_ADDRESS);
//sdp指定多點傳播位址和端口
responseStr=String.format("RTSP/1.0 200 OK\r\nCSeq: %d\r\n"+
"Content-Base: %s\r\n"+
"Content-type: application/sdp\r\n"+
"Content-length: %d\r\n\r\n"+
"%s",
cseq,
url,
sdp.length(),
sdp);
}else if(receiveStr.startsWith("PLAY")){
//PLAY:用于啟動(當暫停時重新開機)傳遞資料給用戶端。
responseStr=String.format("RTSP/1.0 200 OK\r\n"+
"CSeq: %d\r\n"+
"Range: npt=0.000-\r\n"+
"Session: 66334873; timeout=60\r\n" +
"\r\n",
cseq);
}else if(receiveStr.startsWith("PAUSE")){
//PAUSE:用于臨時停止服務端的資料的互動(使用PLAY來重新啟動資料互動)。
responseStr=String.format("RTSP/1.0 200 OK\r\n" +
"CSeq: %d\r\n" +
"\r\n",cseq);
}else if(receiveStr.startsWith("TEARDOWN")){
//TEARDOWN:請求終止來自服務端的資料的傳輸。
responseStr=String.format("RTSP/1.0 200 OK\r\n" +
"CSeq: %d\r\n" +
"\r\n",cseq);
}
try {
outputStream.write(responseStr.getBytes());
outputStream.flush();
System.out.println("TCP-----------------響應responseStr----------------------");
System.out.println(responseStr);
System.out.println("TCP-----------------響應responseStr----------------------");
} catch (IOException e) {
e.printStackTrace();
}
}
}
-
MulticastServer.java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.MulticastSocket;
/*
* IP多點傳播:
* 多點傳播IP位址:用于辨別一個IP多點傳播組,範圍是從224.0.0.0到239.255.255.255。
* 多點傳播組可以是永久的也可以是臨時的:
* - 多點傳播組位址中,有一部分由官方配置設定的,稱為永久多點傳播組。
* - 永久多點傳播組保持不變的是它的ip位址,組中的成員構成可以發生變化。
* - 永久多點傳播組中成員的數量都可以是任意的,甚至可以為零。
* - 那些沒有保留下來供永久多點傳播組使用的ip多點傳播位址,可以被臨時多點傳播組利用。
* - 224.0.0.0~224.0.0.255為預留的多點傳播位址(永久組位址)。
* - 224.0.1.0~238.255.255.255為使用者可用的多點傳播位址(臨時組位址),全網範圍内有效。
* - 239.0.0.0~239.255.255.255為本地管理多點傳播位址,僅在特定的本地範圍内有效。
* 多點傳播源:資訊的發送者,多點傳播源不一定屬于多點傳播組,它向多點傳播組發送資料,自己不一定是接收者。多個多點傳播源可以同時向一個多點傳播組發送封包。
* 多點傳播組:資訊接收者,加入同一多點傳播組的接收者成員可以廣泛分布在網絡中的任何地方,沒有地域限制。
* 多點傳播路由器:支援多點傳播資訊傳輸的所有路由器
*
*
* 1.IP協定規定多點傳播位址的範圍是224.0.0.0 ~ 239.255.255.255,協定軟體底層設定好的
* 2.java提供了
* - socket=new MulticastSocket(多點傳播port);建立多點傳播成員(port端口)
* - socket.joinGroup(InetAddress.getByName(多點傳播ip));加入多點傳播位址
* - 該socket可以接收發往多點傳播位址的包
* 3.往多點傳播位址發送udp包,所有加入該多點傳播ip+port的成員都能收到
* - byte[] data="哈哈哈".getBytes();
* - DatagramPacket packet=new DatagramPacket(data,data.length,InetAddress.getByName(多點傳播ip),多點傳播port);
* - DatagramSocket datagramSocket=new DatagramSocket();//建立DatagramSocket對象
* - datagramSocket.send(packet);//向目标ip+port端發送資料報
* - datagramSocket.close();
* */
public class MulticastServer {
public static String MULTICAST_ADDRESS = "225.0.0.1";//多點傳播ip
public static int PORT = 6666;//多點傳播端口
private static MulticastSocket multicastSocket;
public static int start() throws IOException {
/*往多點傳播組添加一個成員MulticastSocket,用于發送資料(當然也可以是普通的DatagramSocket)*/
multicastSocket = new MulticastSocket(PORT);
multicastSocket.joinGroup(InetAddress.getByName(MULTICAST_ADDRESS));
/*開啟一個專門接收資料的線程(有可能收到誰(-_-)往多點傳播組發的消息)*/
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
byte[] buf = new byte[1024*1024];//建立資料包
DatagramPacket datagramPacket = new DatagramPacket(buf,buf.length);
multicastSocket.receive(datagramPacket);
handlerReceiveData(0,datagramPacket);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}).start();
/*開啟一個線程專門往多點傳播組發送h264資料rtp包*/
RTPH264Server rtph264Server = new RTPH264Server(multicastSocket, InetAddress.getByName(MULTICAST_ADDRESS), PORT);
new Thread(new Runnable() {
@Override
public void run() {
try {
rtph264Server.startSendRtpPackage();
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
/*建立一個rtcp單點傳播Socket*/
DatagramSocket rtcpUdpSocket = new DatagramSocket();//建立socket服務
new Thread(new Runnable() {
@Override
public void run() {
byte[] buf = new byte[1024*1024];//建立資料包
DatagramPacket datagramPacket = new DatagramPacket(buf,buf.length);
while (true){
try {
rtcpUdpSocket.receive(datagramPacket); //阻塞式,3.使用接收方法将資料存儲到資料包中
System.out.println("rtp UDP接收包===>"+datagramPacket.getSocketAddress());
handlerReceiveData(1,datagramPacket);
} catch (IOException e) {
e.printStackTrace();
break;
}
}
}
});
return rtcpUdpSocket.getLocalPort();
}
public static void handlerReceiveData(int type,DatagramPacket datagramPacket){
if(type==0){
// System.out.println("多點傳播");
// System.out.println("接收到參數:"+datagramPacket.getSocketAddress());
// System.out.println(datagramPacket.getData());
// System.out.println(datagramPacket.getLength());
}else{
System.out.println("rtcp");
// System.out.println("接收到參數:"+datagramPacket.getSocketAddress());
// System.out.println(datagramPacket.getData());
// System.out.println(datagramPacket.getLength());
}
}
}
-
RTPH264Server.java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.List;
public class RTPH264Server {
private RandomAccessFile in;
private List<Long> NALUIndexs = new ArrayList<>() ;//用來記錄每個NALU的起始位置
DatagramSocket rtpUdpSocket;//發送資料的socket
InetAddress clientAddress;//目标位址
int clientRtpPort;//目标端口
public RTPH264Server(DatagramSocket socket, InetAddress address, int port) throws SocketException {
rtpUdpSocket = socket;
clientAddress = address;
clientRtpPort = port;
}
public void startSendRtpPackage() throws Exception {
String fileName = RTPH264Server.class.getResource("test.h264").getPath();
in = new RandomAccessFile(fileName, "r");
parseIndexs();//擷取所有起始下标
sendNALURtpPackage();
in.close();
}
/*
* 擷取所有NAUL的起始位置
*/
public void parseIndexs() throws IOException {
while(true) {
if(in.length()>0&&parseNALU()>0) {
//parseNALU尋找NALU的起始位置(001或0001後面的位置)
NALUIndexs.add(in.getFilePointer());//getFilePointer()傳回此檔案中的目前偏移量。
}
if(in.length()-in.getFilePointer()<4) {
//讀到檔案尾部,跳出
break;
}
// System.out.println(in.getFilePointer());
// in.seek(in.getFilePointer()-4);//getFilePointer()傳回此檔案中的目前偏移量。
// System.out.println(in.getFilePointer());
// in.readByte();//從此檔案中讀取一個帶符号的八位值。
// System.out.println(in.getFilePointer());
}
}
/*
* H.264原始碼流:由多個NALU組成
* 每個NALU之間用起始碼(0x000001(3Byte)或0x00000001(4Byte))分割
* H.264編碼時,在每個NAL前添加起始碼0x000001,解碼器在碼流中檢測到起始碼,目前NAL結束;
* 為了防止NAL内部出現0x000001的資料,h.264又提出'防止競争 emulation prevention"機制,
* 在編碼完一個NAL時,如果檢測出有連續兩個0x00位元組,就在後面插入一個0x03;
* 當解碼器在NAL内部檢測到 0x000003的資料,就把0x03抛棄,恢複原始資料
* */
public int parseNALU() throws IOException {
int head = in.readInt();//從該檔案讀取一個帶符号的32位整數,一次讀32位=4byte;0x00 00 00 01
if(head==1) {//0x00000001?
return 4;
}else if(head>>8 == 1) {//0x000001?
in.seek(in.getFilePointer()-1);//getFilePointer()傳回此檔案中的目前偏移量;seek()設定檔案指針偏移,從該檔案的開頭測量,發生下一次讀取或寫入
return 3;
}
return -1;
}
/*
* 擷取每一幀NALU 并存入集合
*/
public void sendNALURtpPackage() throws IOException, InterruptedException {
int framerate = 25;//framerate是幀率,每秒多少幀,每秒多少張圖檔。
int timestamp_increse = (int) (90000.0 / framerate);//碼率=90000bits/s,一幀則用90000/10=9000bits表示
int PT = 96;//負載類型号96:h264
int packageSize = 1400;//最大負載長度
int seqNum = 1;//序列号
int ts_current = 0;//目前時間戳
boolean isEnd=false;
for(int i=0;i<NALUIndexs.size();) {
in.seek(NALUIndexs.get(i));//設定檔案指針偏移
int len = 0;
if(i!=NALUIndexs.size()-1) {
len = (int) (NALUIndexs.get(i+1)-NALUIndexs.get(i));
}else {
//最後一個NALU
len = (int) (in.length() - NALUIndexs.get(i));
}
byte[] h264NALUArr=new byte[len];
in.read(h264NALUArr);
List<byte[]> bytes = h264DataToRtp(h264NALUArr, packageSize, PT, seqNum, ts_current, 0x88923423);
// System.out.println(bytes.size());
for(byte[] arr:bytes){
//2.建立資料報,包含響應的資料資訊
DatagramPacket packet2=new DatagramPacket(arr, arr.length,clientAddress,clientRtpPort);
try {
rtpUdpSocket.send(packet2);//3.響應用戶端
} catch (IOException e) {
e.printStackTrace();
isEnd=true;
break;
}
}
ts_current+=timestamp_increse;
seqNum += bytes.size();
Thread.sleep(30);
/*實作循環播放*/
if(i==NALUIndexs.size()-1){
i=0;
}else{
i++;
}
if(isEnd){
break;
}
}
}
/*
* H.264的RTP打包方式:
* 1.單NALU打包:一個RTP包中包含一個完整的NALU
* 2.聚合打包:對于較小的NALU,一個RTP包可包含多個完整的NALU
* 3.分片打包:對于較大的NALU,一個NALU可以分為多個RTP包發送
* - 在RTP載荷開始有兩個位元組的資訊,然後再是NALU的内容
* - 第一個位元組位(F1|R2|Type5)Type=28
* - 第二個位元組位(S1|E1|Type5)S是否第一包;E是否最後一包;
* */
public List<byte[]> h264DataToRtp(byte[] h264NALUArr,int packageSize,int PT,int seq,int timestamp,int ssrc){
List<byte[]> res=new ArrayList();//需要發送的rtp包資料
int seqNum = seq;//包序列号
if(h264NALUArr.length<=packageSize){
byte[] rtpHeader = initRTPHeader(PT, seqNum, timestamp, ssrc);//12個位元組的rtpHeader
//1.單NALU打包:一個RTP包中包含一個完整的NALU
byte[] rtpPackage=new byte[12+h264NALUArr.length];
System.arraycopy(rtpHeader,0,rtpPackage,0,12);//從源數組的第幾位,複制到目标數組開始下标,n位
System.arraycopy(h264NALUArr,0,rtpPackage,12,h264NALUArr.length);//從源數組的第幾位,複制到目标數組開始下标,n位
res.add(rtpPackage);
}else{
//3.分片打包:對于較大的NALU,一個NALU可以分為多個RTP包發送
/*
* 0 1 2
* 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* | FU indicator | FU header | FU payload ... |
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* |F|NRI| Type |S|E|R| Type |
* +---------------+--------------+
*/
byte head=h264NALUArr[0];
int pktNum = (h264NALUArr.length-1)/packageSize; // 有幾個完整的包
int endPktSize = (h264NALUArr.length-1)%packageSize; // 剩餘不完整包的大小
int currentNum = 0;
while (currentNum <= pktNum){
if(currentNum<pktNum){
byte[] rtpHeader = initRTPHeader(PT, seqNum, timestamp, ssrc);//12個位元組的rtpHeader
byte[] rtpPackage=new byte[12+2+packageSize];//(currentNum*packageSize+1,packageSize)
System.arraycopy(rtpHeader,0,rtpPackage,0,12);//從源數組的第幾位,複制到目标數組開始下标,n位
rtpPackage[12]= (byte) (head & 0x60 |(byte) (28));
if(currentNum==0){
//第一包
rtpPackage[13]= (byte) (0x80 | ((byte) (head & 0x1f)));//|S=1|E=0|R=0|
}else if(currentNum==pktNum-1&&endPktSize==0){
//最後一包
rtpPackage[13]= (byte) (0x40 | ((byte) (head & 0x1f)));//|S=0|E=1|R=0|
}else{
//中間包
rtpPackage[13]= (byte) (head & 0x1f);//|S=0|E=0|R=0|
}
System.arraycopy(h264NALUArr,currentNum*packageSize+1,rtpPackage,14,packageSize);//從源數組的第幾位,複制到目标數組開始下标,n位
res.add(rtpPackage);
seqNum+=1;
}else if(currentNum==pktNum&&endPktSize>0){
//最後一包
byte[] rtpHeader = initRTPHeader(PT, seqNum, timestamp, ssrc);//12個位元組的rtpHeader
byte[] rtpPackage=new byte[12+2+endPktSize];//(currentNum*packageSize+1,endPktSize)
System.arraycopy(rtpHeader,0,rtpPackage,0,12);//從源數組的第幾位,複制到目标數組開始下标,n位
rtpPackage[12]= (byte) (head & 0x60 |(byte) (28));
rtpPackage[13]= (byte) (0x40 | ((byte) (head & 0x1f)));//|S=0|E=1|R=0|
System.arraycopy(h264NALUArr,currentNum*packageSize+1,rtpPackage,14,endPktSize);//從源數組的第幾位,複制到目标數組開始下标,n位
res.add(rtpPackage);
seqNum+=1;
}
currentNum++;
}
}
return res;
}
/*
* RTP封包格式:
* |===============================================================|
* | 0 | 1 | 2 | 3 |
* |===============|===============|===============|===============|
* |7 6 5 4 3 2 1 0|7 6 5 4 3 2 1 0|7 6 5 4 3 2 1 0|7 6 5 4 3 2 1 0|
* |===============|===============|===============================|
* |V2|P1|X1|CC4 |M1| PT7 | sequence number16 |
* |===============================================================|
* | timestamp時間戳 |
* |===============================================================|
* |同步信源(SSRC)辨別符synchronization source (SSRC) identifier |
* |===============================================================|
* |特約信源(CSRC)辨別符contributing source (CSRC) identifiers |
* | .... |
* |===============================================================|
*/
public byte[] initRTPHeader(int PT,int seq,int timestamp,int ssrc){
byte[] headerArr = new byte[12];//rtp固定頭部有12個位元組
//1.清空headerArr
for (int i = 0; i < headerArr.length; i++) { headerArr[i] = (byte) 0; }
//2.填充資料
//位元組1:|V2|P1|X1|CC4 |
headerArr[0] = (byte) 0x80;//10000000==>V=1.0,P=0,X=0,CC=0000
//位元組2:|M1| PT7 |
headerArr[1] = (byte)(PT & 0x7f);//01100000==>M=0,PT=1100000
//位元組3-4:|sequence number16|:headerArr[2],headerArr[3]
System.arraycopy(intToBytes(seq,2),0,headerArr,2,2);//從源數組的第幾位,複制到目标數組開始下标,n位
//位元組5-8|timestamp時間戳|:headerArr[4]~headerArr[7]
System.arraycopy(intToBytes(timestamp,4),0,headerArr,4,4);//從源數組的第幾位,複制到目标數組開始下标,n位
//位元組9-12|同步信源(SSRC)辨別符|:headerArr[8]~headerArr[11]
System.arraycopy(intToBytes(ssrc,4),0,headerArr,8,4);//從源數組的第幾位,複制到目标數組開始下标,n位
return headerArr;
}
/**
* 将32位長度轉換為n位元組。(大端位元組序:高位在前,低位在後)
* @param ldata 将從中構造n位元組數組的int。
* @param n 要将長檔案轉換為的所需位元組數。
* @return 用長值填充的所需位元組數組。
*/
public byte[] intToBytes(int ldata, int n) {
byte[] buff = new byte[n];
for (int i=n-1;i>=0;i--) {
// 保持将最右邊的8位配置設定給位元組數組,同時在每次疊代中移位8位
buff[i] = (byte)ldata;
ldata = ldata>>8;
}
return buff;
}
}
啟動
- 運作
的RtspTcpServer.java
就跑起來了mian()
- 使用vlc播放器播放網絡流(檔案=>打開網絡=>輸入URL)
rtsp://127.0.0.1:8888
VLC播放器 | 打開URl播放 | 結果 |