天天看点

如何发送和接收RTP包,用FFmpeg分离、解码

 RTP是网络上进行流媒体传输的一种常用协议,现在有很多封装RTP协议的开源库,比如:ortp, jrtplib,而其中最有名的要数jrtplib,本文给大家演示怎么用jrtplib开发一个带RTP发送和接收功能的应用程序,但这篇文章不会讲述jrtplib的基本用法知识,如果你要了解更多关于这个库的用法,可以参考这篇文章:http://www.cnblogs.com/yuweifeng/p/7550737.html。

本文给大家演示怎么开发一个基于RTP协议的流媒体播放器。播放器从网络上接收RTP包,解包后把视频分离出来,然后用FFmpeg解码,把图像显示出来。这个流媒体播放器实现的功能比较简单,但是实现了一个典型的网络播放器的框架。该文章涉及的开发知识和技巧包括:

1. 怎么用jrtplib发送数据;

2. 怎么使用jrtplib接收数据;

3. 怎么让ffmpeg从内存中读取流媒体数据,然后进行分离和解码;

4. 怎么用双线程技术同时接收和解码视频,提高播放的效率;

5. 怎么实现一个先入先出的缓冲队列存储收到的视频帧;

该播放器的代码下载地址:https://download.csdn.net/download/zhoubotong2012/10918971

这个播放器的界面如下所示:

如何发送和接收RTP包,用FFmpeg分离、解码

这个播放器主要演示播放网络流的功能,但为了方便测试,也集成了发送流媒体的功能,界面上提供一个按钮选择一个视频文件,文件可以是H264、PS、TS等容器格式;然后,需要指定发送的目标IP和目标端口号,这里的IP可以选本机IP。点击发送,则程序会以RTP方式打包,通过UDP将数据发送到目标地址。在本程序的接收端,只需要配置接收端口,点击“开始接收“按钮则开始接收数据。注意:目标端口要和接收端口一致,发送和接收是一对一的。程序可以分布在两台机器上运行,一个作发送端,一个作接收端;也可以在一台机上测试发送和接收。

  • 如何实现发送

首先,讲一下怎么实现发送流媒体的功能。我们需要创建一个RTPSession的发送对象,然后初始化相关的参数:

RTPSession session;

	RTPSessionParams sessionparams;
	sessionparams.SetOwnTimestampUnit(1.0 / 90000.0);
	sessionparams.SetAcceptOwnPackets(true);

	RTPUDPv4TransmissionParams transparams;
	transparams.SetPortbase(8000); //这个端口必须未被占用

	int status = session.Create(sessionparams, &transparams);
	if (status < 0)
	{
		//std::cerr << RTPGetErrorString(status) << std::endl;
		return - 1;
	}

#if 1
	RTPIPv4Address addr(ntohl(inet_addr(m_szDestIP)), m_nDestPort);
	status = session.AddDestination(addr);
#else
	unsigned long addr = ntohl(inet_addr(m_szDestIP));
	status = session.AddDestination(addr, m_nDestPort);
#endif
	if (status < 0)
	{
		//std::cerr << RTPGetErrorString(status) << std::endl;
		return -2;
	}

	session.SetDefaultPayloadType(96);
	session.SetDefaultMark(false);
	session.SetDefaultTimestampIncrement(90000.0 / 25.0);
           

这里初始化的参数包括RTP头的Payload类型(赋值为96),时间单位(1.0/90000.0),时间戳增量(90000/25=3600),以及Rtp头的MarkerBit的默认值。

接着读取一个视频文件,每次读1K字节,然后调用jrtplib的RTPSession::SendPacket函数发送数据:

FILE *fp_open;
	uint8_t buff[1024 * 5] = { 0 };
	DWORD  bufsize = 1024; //每次读1024字节,不超过1400就行

	DWORD dwReadBytesPerSec = 2*1024*1024/8; //读取速度
	RTPTime delay(bufsize*1.0/ dwReadBytesPerSec);

	//读取文件
	fp_open = fopen(m_szFilePath, "rb");
	while (!feof(fp_open) && g_RTPSendThreadRun)
	{
		int true_size = fread(buff, 1, bufsize, fp_open);

		int status = session.SendPacket(buff, true_size);

		Sleep(1000* bufsize/dwReadBytesPerSec);
		//RTPTime::Wait(delay); //delay for a few milliseconds
	}
           

(注意:这里读文件数据只是简单地将文件数据块读出来然后直接发送,没有对视频帧做二次封装和处理,对于某些格式比如H264,一般要求要以NALU单元来传输,以FU-A分片方式打包,然后再封装到RTP包里面,而本文的方法没有采取这种方式,大家要注意区分。)

  • 如何实现接收

接收的实现较为复杂一些,用到了多线程技术和缓冲队列。本文的实现中用到两条线程,一条用于接收RTP包,从中提取出视频数据;另一条线程用于解码视频,并把视频帧转成RGB格式后显示到窗口中。用到两条线程的好处是:可以并行接收和解码,两个工作相互独立,提高视频帧的处理效率,减少播放延时。而如果用一条线程来做,它既要接收又要解码,线程中处理一个帧的时间就长一些,而这时又不能接收数据,很可能造成后面的数据包丢掉。所以,用双线程的”分工合作“方式处理效率更高。两条线程之间需要维护一个队列,其中一条线程收到数据后放到队列里,然后另外一个线程从队列里读取数据,这是一个典型的”生产者-消费者“的模型,我们需要实现一个先入先出的队列来转运”视频帧“,这个队列的定义如下:

std::list<PacketNode_t>  m_packetList; //包列表
           

其中,PacketNode_t结构体的定义为:

typedef struct
{
	unsigned length;
	uint8_t *buf;
}PacketNode_t;
           

下面对接收线程和解码线程的工作流程作详细介绍。

首先,程序在接收前需要创建两个线程:

g_RTPRecvThreadRun = true;
	g_decoding_thread_run = true;

	DWORD threadID = 0;
	m_hRecvThread   = CreateThread(NULL, 0, RTPRecvThread, this, 0, &threadID);
	m_hDecodeThread = CreateThread(NULL, 0, decoding_thread, this, 0, &threadID);
           

RTPRecvThread是RTP数据的接收线程,实现方式如下:

DWORD WINAPI RTPRecvThread(void* param)
{
	TRACE("RTPRecvThread began! \n");

	CPlayStreamDlg * pThisDlg = (CPlayStreamDlg*)param;

	RTPSession session;
	//WSADATA dat;
	//WSAStartup(MAKEWORD(2, 2), &dat);

	RTPSessionParams sessionparams;
	sessionparams.SetOwnTimestampUnit(1.0 / 90000.0);
	//sessionparams.SetAcceptOwnPackets(true);

	RTPUDPv4TransmissionParams transparams;
	transparams.SetPortbase(m_nRecvPort); //接收端口

	int oldBufSize = transparams.GetRTPReceiveBuffer();
	transparams.SetRTPReceiveBuffer(oldBufSize * 2);
	int status = session.Create(sessionparams, &transparams);

	int newBufSize = transparams.GetRTPReceiveBuffer();
	int oldBufSizec = transparams.GetRTCPReceiveBuffer();
	transparams.SetRTCPReceiveBuffer(oldBufSizec * 2);
	int newBufSizec = transparams.GetRTCPReceiveBuffer();

	while (g_RTPRecvThreadRun)
	{
#ifndef RTP_SUPPORT_THREAD
		int error_status = session.Poll();
#endif // RTP_SUPPORT_THREAD

		session.BeginDataAccess();
		if (session.GotoFirstSourceWithData())
		{
			do
			{
				RTPPacket *pack;

				while ((pack = session.GetNextPacket()) != NULL)
				{
					int nPayType = pack->GetPayloadType();
					int nLen = pack->GetPayloadLength();
					unsigned char *pPayData = pack->GetPayloadData();
					int nPackLen = pack->GetPacketLength();
					unsigned char *pPackData = pack->GetPacketData();
					int csrc_cont = pack->GetCSRCCount();
					int ssrc = pack->GetSSRC();
					int nTimestamp = pack->GetTimestamp();
					int nSeqNum = pack->GetSequenceNumber();

#if 0
					Writebuf((char*)pPayData, nLen);
#else			
					pThisDlg->m_cs.Lock();
					//if (pThisDlg->m_packetList.size() < MAX_PACKET_COUNT)
					{
						PacketNode_t  temNode;
						temNode.length = nLen;
						temNode.buf = new uint8_t[nLen];
						memcpy(temNode.buf, pPayData, nLen);

						pThisDlg->m_packetList.push_back(temNode); //存包列表
					}
					pThisDlg->m_cs.Unlock();
#endif

					session.DeletePacket(pack);
				}
			} while (session.GotoNextSourceWithData());
		}
		else
		{
			//Sleep(10);
		}
		session.EndDataAccess();

		Sleep(1);
	}
	session.Destroy();

	TRACE("RTPRecvThread end! \n");
	return 0;
}
           

接收线程里创建了一个RTPSession对象,这个对象是用于接收RTP包,前面一部分代码用于初始化一些参数,包括:接收端口,时间戳单位,接收缓冲区大小。然后,进入一个循环,在里面不停地读取RTP数据包,如果session.GetNextPacket()返回的指针不为空,则表示读取到一个数据包,返回的指针变量是一个RTPPacket*类型,其指向的成员变量包括RTP头的各个字段的值,以及Payload数据的内存地址和大小。我们关键要提取出Payload的数据和大小,然后把它作为一个元素插入到缓冲队列中(如下面代码所示:)

pThisDlg->m_cs.Lock();

PacketNode_t  temNode;
temNode.length = nLen;
temNode.buf = new uint8_t[nLen];
memcpy(temNode.buf, pPayData, nLen);

pThisDlg->m_packetList.push_back(temNode); //存包列表

pThisDlg->m_cs.Unlock();
           

上面的接收线程实现了一个“生成者”,而“消费者”是实现在另外一个线程---decoding_thread,这个线程做的工作是解码。这个线程调用了很多FFmpeg的函数,但基本的流程是:打开一个文件源或URL地址-》从源中读取各个流的信息-》初始化解码器-》解码和显示。因为我们是从网络中收数据,所以是一个网络源,从网络源中读取数据有两种方式:一种是用FFmpeg内置的协议栈的支持,比如RTSP/RTMP/RTP,还有一种方式是我们传数据给FFmpeg,FFmpeg从内存中读取我们送的数据,然后用它的Demuxer和Parser来进行分析,分离出视频和音频。这里程序使用的是第二种方式,即从网络中探测数据,然后送数据给FFmpeg去解析。探测网络数据需要调用FFmpeg的av_probe_input_buffer函数,这个函数要传入一个内存缓冲区地址和一个回调函数指针,其中回调函数是用来从网络中读数据的(即我们放到缓冲队列里的数据包)。下面的fill_iobuffer就是读数据的回调函数,而pIOBuffer指向用于存放读取数据的缓冲区地址,FFmpeg就是从这里读取数据。

pIObuffer = (uint8_t*)av_malloc(4096);
	pb = avio_alloc_context(
		pIObuffer,
		4096,
		0,
		param,
		fill_iobuffer,
		NULL,
		NULL);

	if (av_probe_input_buffer(pb, &piFmt, "", NULL, 0, 0) < 0)//探测从内存中获取到的媒体流的格式
	{
		TRACE("Error: probe format failed\n");
		return -1;
	}
	else {
		TRACE("input format:%s[%s]\n", piFmt->name, piFmt->long_name);

	}
           

回调函数fill_iobuffer调用了一个ReadBuf的函数:

int fill_iobuffer(void* opaque, uint8_t* buf, int bufSize)
{
	ASSERT(opaque != NULL);
	CPlayStreamDlg* p_CPSDecoderDlg = (CPlayStreamDlg*)opaque;

	//TRACE("ReadBuf----- \n");
	int nBytes = ReadBuf((char*)buf, bufSize, (void*)p_CPSDecoderDlg);
	return (nBytes > 0) ? bufSize : -1;
}
           
static int ReadBuf(char* data, int len, void* pContext)
{
	CPlayStreamDlg * pThisDlg = (CPlayStreamDlg*)pContext;

	int data_to_read = len;
	char * pReadPtr = data;

	while (g_RTPRecvThreadRun)
	{
		int nRead = pThisDlg->ReadNetPacket((uint8_t*)pReadPtr, data_to_read);
		if (nRead < 0)
		{
			Sleep(10);
			continue;
		}
		pReadPtr += nRead;
		data_to_read -= nRead;
		if (data_to_read > 0)
		{
			Sleep(10);
			continue;
		}
		break;
	}

	return (data_to_read > 0) ? -1 : len;
}
           

ReadBuf函数的作用就不用解释了,大家一看就明白了。它实现了一个我们前面说的“消费者”,从前面实现的缓冲队列中读取数据包,读取之后就会从队列中删除相应的元素。如果队列不为空,则直接从前面的元素读取;如果无数据,则继续等待。

读了视频帧数据之后,就到了解码,解码的代码如下:

while (g_decoding_thread_run)
	{
		av_read_frame(pFormatContext, pAVPacket);
		if(pAVPacket->stream_index == video_stream_index)
		{
			avcodec_decode_video2(pCodecCtx, pFrame, &got_picture, pAVPacket);
			if(got_picture)
			{
				p_uint8_t_temp = pFrame->data[1];
				pFrame->data[1] = pFrame->data[2];
				pFrame->data[2] = p_uint8_t_temp;
				pFrame->data[0] += pFrame->linesize[0] * (pCodecCtx->height - 1);
				pFrame->linesize[0] *= -1;
				pFrame->data[1] += pFrame->linesize[1] * (pCodecCtx->height / 2 - 1);
				pFrame->linesize[1] *= -1;
				pFrame->data[2] += pFrame->linesize[2] * (pCodecCtx->height / 2 - 1);
				pFrame->linesize[2] *= -1;
				got_picture = sws_scale(img_convert_ctx, pFrame->data, pFrame->linesize, 0, pCodecCtx->height, RGB24Data, RGB24Linesize);
				got_picture = StretchDIBits(hDC, 0, 0, PlayingWidth, PlayingHeight, 0, 0, pCodecCtx->width, pCodecCtx->height, RGB24Data[0], (BITMAPINFO*)&bmpinfo, DIB_RGB_COLORS, SRCCOPY);
			}
		}

		av_free_packet(pAVPacket);
	}
           

FFmpeg从解码器输出的格式是YUV的,我们要转成RGB图像格式显示,所以调用了sws_scale函数来转换,最后调用Windows GDI函数---StretchDiBits来把图像显示到指定的窗口区域。

如果要停止解码,则退出线程的时候记得要释放FFmpeg创建的资源:

if (pFormatContext)
	{
		avformat_close_input(&pFormatContext);
		pFormatContext = NULL;
	}

	sws_freeContext(img_convert_ctx);
	av_freep(&RGB24Data[0]);
	av_frame_free(&pFrame);
	//avcodec_close(pCodecCtx);
	//av_free(pIObuffer); //调用了avformat_close_input会自动释放pIObuffer
	ReleaseDC(hwnd, hDC);
           

到此为止,一个简单的流媒体播放器的实现过程就介绍完了。

这篇博文传输的视频是直接从文件读取到的,并且不分什么格式,而对于常见的H264,一般需要以FU-A方式打包,需要对数据进行重组后再打成RTP包,我的下一篇博文(https://blog.csdn.net/zhoubotong2012/article/details/86510032)会向大家介绍怎么用这种方法打包和发送H264,并且RTP协议实现不依赖于rtplib,自己管理Socket实现RTP包收发。

------------------------------------------------------------------------------------

后记:

2019-11-11

某些网友反映用了这个工具后遇到报错或异常,其中主要问题是对某些格式FFmpeg不能识别或探测时间很长,这个是因为使用者测试用的媒体文件不是一种可流化的格式,什么是可流化格式,就是不需要下载整个文件,可以边下边播的,比如常见的PS、TS、MKV等封装格式,而MP4大多数不属于此类,因为它的某些元数据不是写在文件头,要读到文件尾部才能拿到所有帧的索引。

2018-01-29:

测试发现用jrtplib接收数据如果数据量很大会出现丢包,弄了半天才发现原来速度瓶颈是在session.Poll()函数,这个函数会等待很久去拿数据。jrtplib库里关于这个Poll函数的说明:

 意思是用poll thread就不需要调用这个函数,那可能就没有这个问题了(我没有验证过)。但是,这样需要用到 jthread 库。关于这点,我引用网上一篇博文里的介绍:

 jrtp-3.x 中有两种数据接收方式:第一种是用 jthread 库提供的线程自动在后台执行对数据的接收。第二种是用户自己调用 RTPSession 中的 Poll 方法。如果采取第一种方法则要安装 jthread 库。安装 jthread-1.2.1.tar.gz ,而且 jthread-1.2.1 必须先与jrtp-3.7.1 的安装。因为在 jrtp-3.7.1 的 configure 中,会查找系统是否有编译了 jthread 库,如果有,那么编译的 jrtp 库会开启对 jthread 的支持。因此如果先编译jrtp 再编译 jthread ,编译出来的 jrtp 是没有开启对 jthread 的支持的。如果采用第二种方法,那么可以不用编译 jthread 库,而直接编译 jrtp 库。

但是,我没有安装 jthread库,干脆不用jrtplib接收了,直接自己写Socket接收数据、对RTP解包。

新的例子代码:https://download.csdn.net/download/zhoubotong2012/10943378