文章目錄
- MediaSoup
-
- RTP處理
MediaSoup
2019年3月寫的文章了,後續也沒有再跟進這個項目有沒有新變化。
MediaSoup是一個開源的SFU庫,分為用戶端和服務端。服務端分為JS層和C++層,C++層用于處理媒體和SDP等資料。我個人主要關注媒體相關的處理,也就是RTP和RTCP相關的處理。我們的項目不會用到這個項目,看它的代碼主要是解決我的兩個疑問:
- 多人會議,它是如何能保證每一個接收端都能流暢?
- 它是如何處理各個發送端以及接收端的RTCP包?
第一個問題,它不做任何處理,接收端僅僅是做了接收端擁塞控制該做的那部分接收端帶寬估計的工作而已。它不會根據其他端的接收情況(下行帶寬)調節發送端的帶寬。
第二個問題,服務端類似一個用戶端接收端和一個發送端,接收端需要做好的工作就是保證自己收到完整的包(NACK/FIR等)、給到發送端接收端的統計資訊(RR)。發送端就是接收每一個接收端的RTCP請求并做相應的處理,但是它僅僅支援丢包處理工作,不會處理碼率相關的工作,理論上來說它應該要根據每一個接收端的接收能力去得出一個合理的碼率,并把這個碼率告訴真實的發送端。
RTP處理
WebRtcTransport::OnRtpDataRecv
- 資料從JS層傳到C++層的OnRtpDataRecv函數
- 先對srtp進行解密,得到rtp包
- 如果存在一個鏡像(mirror)直接把解壓後的rtp資料轉發一份給它
- 解析rtp包,得到RtpPacket(rtp的頭部資訊包括擴充部分)
- 判斷是否存在絕對發送時間的rtp擴充,如果存在則把這份資料直接傳遞給接收端帶寬估計。MediaSoup僅僅支援一種遠端帶寬估計(RemoteBitrateEstimatorAbsSendTime),也就是依賴rtp擴充的絕對發送時間這種。
- 找到關聯的生産者(Producer),并把資料傳遞給它
- 删除資料,結束一次處理
// 隻保留媒體相關的處理邏輯,友善閱讀
inline void WebRtcTransport::OnRtpDataRecv(RTC::TransportTuple* tuple, const uint8_t* data, size_t len)
{
// Decrypt the SRTP packet.
if (!this->srtpRecvSession->DecryptSrtp(data, &len)) {
return;
}
// Mirror RTP if needed.
if (this->mirrorTuple != nullptr && this->mirroringOptions.recvRtp)
this->mirrorTuple->Send(data, len);
RTC::RtpPacket* packet = RTC::RtpPacket::Parse(data, len);
if (packet == nullptr) {
return;
}
// Apply the Transport RTP header extension ids so the RTP listener can use them.
if (this->headerExtensionIds.absSendTime != 0u) {
packet->AddExtensionMapping(RtpHeaderExtensionUri::Type::ABS_SEND_TIME, this->headerExtensionIds.absSendTime);
}
if (this->headerExtensionIds.mid != 0u) {
packet->AddExtensionMapping(RtpHeaderExtensionUri::Type::MID, this->headerExtensionIds.mid);
}
if (this->headerExtensionIds.rid != 0u) {
packet->AddExtensionMapping(RtpHeaderExtensionUri::Type::RTP_STREAM_ID, this->headerExtensionIds.rid);
}
// Feed the remote bitrate estimator (REMB).
uint32_t absSendTime;
if (packet->ReadAbsSendTime(&absSendTime)) {
this->remoteBitrateEstimator->IncomingPacket(DepLibUV::GetTime(), packet->GetPayloadLength(), *packet, absSendTime);
}
// Get the associated Producer.
RTC::Producer* producer = this->rtpListener.GetProducer(packet);
if (producer == nullptr) {
delete packet;
return;
}
// Pass the RTP packet to the corresponding Producer.
producer->ReceiveRtpPacket(packet);
delete packet;
}
Producer::ReceiveRtpPacket
- 判斷是否需要建立一個新的RtpStreamRecv,如果不存在它會根據RtpParameters資訊建立一個,RtpParameters是發送端在發送媒體資料之前向MediaSoup發送的能力集合資訊,包括了ssrc,編解碼器名字,編碼器負載類型,是否支援nack,是否支援fec以及fec的ssrc,是否支援rtx以及rtx的ssrc,最大幀率,分辨率縮放比例,rtp擴充等。
- 判斷收到的是rtp包還是rtx包并傳遞給RtpStreamRecv對應的處理函數
- RtpStreamRecv接收到RTX包,會先解出RTP包,計算jitter和判斷Nack
- RtpStreamRecv接收到RTP包,計算jitter和判斷Nack
- RtpStreamRecv還會對包進行nal解析,判斷是否是關鍵幀
- RtpStreamRecv還會計算碼率,判斷目前的方式端的狀态
- 根據發送端協商的rtp擴充,重新填充擴充
- 傳遞給OnProducerRtpPacket,從代碼上看之後Router注冊了,是以隻有實際上僅僅是調用了Router::OnProducerRtpPacket
- OnProducerRtpPacket調用注冊到此生産者的所有消費者(SendRtpPacket)
void Producer::ReceiveRtpPacket(RTC::RtpPacket* packet)
{
// May need to create a new RtpStreamRecv.
MayNeedNewStream(packet);
// Find the corresponding RtpStreamRecv.
uint32_t ssrc = packet->GetSsrc();
RTC::RtpStreamRecv* rtpStream{ nullptr };
RTC::RtpEncodingParameters::Profile profile;
std::unique_ptr<RTC::RtpPacket> clonedPacket;
// Media RTP stream found.
if (this->mapSsrcRtpStreamInfo.find(ssrc) != this->mapSsrcRtpStreamInfo.end()) {
rtpStream = this->mapSsrcRtpStreamInfo[ssrc].rtpStream;
auto& info = this->mapSsrcRtpStreamInfo[ssrc];
rtpStream = info.rtpStream;
profile = info.profile;
clonedPacket.reset(packet->Clone(ClonedPacketBuffer));
packet = clonedPacket.get();
// Process the packet.
if (!rtpStream->ReceivePacket(packet))
return;
} else {
for (auto& kv : this->mapSsrcRtpStreamInfo) {
auto& info = kv.second;
if (info.rtxSsrc != 0u && info.rtxSsrc == ssrc) {
rtpStream = info.rtpStream;
profile = info.profile;
clonedPacket.reset(packet->Clone(ClonedPacketBuffer));
packet = clonedPacket.get();
// Process the packet.
if (!rtpStream->ReceiveRtxPacket(packet))
return;
// Packet repaired after applying RTX.
rtpStream->packetsRepaired++;
break;
}
}
}
ApplyRtpMapping(packet);
for (auto& listener : this->listeners) {
listener->OnProducerRtpPacket(this, packet, profile);
}
}
Consumer::SendRtpPacket
- 重寫rtp的ssrc,序号和時間戳,并存儲此包(看了代碼還是不明白為什麼要這樣做)
- 發送此包(最後通過Transport子產品發送,實際上就是通過WebRTCTransport子產品發送的,一個生産者有一個WebRTCTransport執行個體)
- 恢複原始的ssrc,序号和時間戳
void Consumer::SendRtpPacket(RTC::RtpPacket* packet, RTC::RtpEncodingParameters::Profile profile)
{
// Map the payload type.
auto payloadType = packet->GetPayloadType();
// NOTE: This may happen if this Consumer supports just some codecs of those in the corresponding Producer.
if (this->supportedCodecPayloadTypes.find(payloadType) == this->supportedCodecPayloadTypes.end())
return;
// Check whether this is the key frame we are waiting for in order to update the effective profile.
if (this->effectiveProfile != this->targetProfile && profile == this->targetProfile) {
bool isKeyFrame = false;
bool canBeKeyFrame = Codecs::CanBeKeyFrame(this->rtpStream->GetMimeType());
if (canBeKeyFrame && packet->IsKeyFrame()) {
isKeyFrame = true;
if (isKeyFrame || !canBeKeyFrame) {
SetEffectiveProfile(this->targetProfile);
// Resynchronize the stream.
this->syncRequired = true;
// Clear RTP retransmission buffer to avoid congesting the receiver by
// sending useless retransmissions (now that we are sending a newer key frame).
this->rtpStream->ClearRetransmissionBuffer();
// Stop probation if probing profile is the new effective profile.
if (IsProbing() && this->probingProfile == this->effectiveProfile)
StopProbation();
}
}
bool isSyncPacket = false;
if (this->syncRequired) {
isSyncPacket = true;
this->rtpSeqManager.Sync(packet->GetSequenceNumber());
this->rtpTimestampManager.Sync(packet->GetTimestamp());
// Calculate RTP timestamp diff between now and last sent RTP packet.
if (this->rtpStream->GetMaxPacketMs() != 0u) {
auto now = DepLibUV::GetTime();
auto diffMs = now - this->rtpStream->GetMaxPacketMs();
auto diffTs = diffMs * this->rtpStream->GetClockRate() / 1000;
this->rtpTimestampManager.Offset(diffTs);
}
this->syncRequired = false;
if (this->encodingContext)
this->encodingContext->SyncRequired();
}
// Rewrite payload if needed. Drop packet if necessary.
if (this->encodingContext && !packet->EncodePayload(this->encodingContext.get())) {
this->rtpSeqManager.Drop(packet->GetSequenceNumber());
this->rtpTimestampManager.Drop(packet->GetTimestamp());
return;
}
// Update RTP seq number and timestamp.
uint16_t rtpSeq;
uint32_t rtpTimestamp;
this->rtpSeqManager.Input(packet->GetSequenceNumber(), rtpSeq);
this->rtpTimestampManager.Input(packet->GetTimestamp(), rtpTimestamp);
auto origSsrc = packet->GetSsrc();
auto origSeq = packet->GetSequenceNumber();
auto origTimestamp = packet->GetTimestamp();
// Rewrite packet
packet->SetSsrc(this->rtpParameters.encodings[0].ssrc);
packet->SetSequenceNumber(rtpSeq);
packet->SetTimestamp(rtpTimestamp);
// Process the packet.
if (this->rtpStream->ReceivePacket(packet)) {
// Send the packet.
this->transport->SendRtpPacket(packet);
// Retransmit the RTP packet if probing.
if (IsProbing())
SendProbation(packet);
}
// Restore
packet->SetSsrc(origSsrc);
packet->SetSequenceNumber(origSeq);
packet->SetTimestamp(origTimestamp);
// Restore the original payload if needed.
if (this->encodingContext)
packet->RestorePayload();
// Run probation if needed.
if (this->kind == RTC::Media::Kind::VIDEO && --this->rtpPacketsBeforeProbation == 0) {
this->rtpPacketsBeforeProbation = RtpPacketsBeforeProbation;
MayRunProbation();
}
}