zlmediakit簡介
ZLMediaKit 是一個基于C++11的高性能營運級流媒體服務架構
項目特點: 1. 基于C++11開發,避免使用裸指針,代碼穩定可靠,性能優越。 2. 支援多種協定(RTSP/RTMP/HLS/HTTP-FLV/Websocket-FLV/GB28181/MP4),支援協定互轉。 3. 使用多路複用/多線程/異步網絡IO模式開發,并發性能優越,支援海量用戶端連接。 4. 代碼經過長期大量的穩定性、性能測試,已經在線上商用驗證已久。 5. 支援linux、macos、ios、android、windows全平台。 6. 支援畫面秒開、極低延時(500毫秒内,最低可達100毫秒)。
第三方編碼器Encode
1.初始化sdk_init()
初始化記憶體池,_buffer_pool.setSize(200);
初始化監控線程encodermonitor_thd和搜尋線程
// Run encoder_moniter on its own thread; the thread is detached, so it is never joined.
std::thread encodermonitor_thd(encoder_moniter);
encodermonitor_thd.detach();
// Set up askmedia.
// TODO: enable the periodic search for encoder-board information.
// Run askDevMediaData on a second detached thread.
std::thread t_search(askDevMediaData);
t_search.detach();
初始化相關資料結構:
s_encodermap使用unordered_map儲存編碼器資訊
// Mutex taken by the add / delete / monitor / receive paths before they touch the encoder maps below.
// It is recursive, so a thread that already holds it may lock it again.
static std::recursive_mutex s_encoderMtx;
// encoder id -> media helper object created for that encoder.
static unordered_map<int, SDK_MediaHelper::Ptr> s_encodermap;
// encoder id -> number of monitor passes in which no media source was found for the encoder;
// set to 0 when the encoder is added and again when the monitor recreates it.
static unordered_map<int, int> encoder_online_map;
// NOTE(review): not referenced by any code visible here; presumably per-camera status keyed by name — confirm.
static unordered_map<string, encoder_status> ipc_online_map;
// encoder id -> encoder_status record; initialised to {0, 0} when the encoder is added.
static unordered_map<int, encoder_status> encoder_status_map;
2.編碼器MAP的操作
目前編碼器是否在錄制狀态
/**
 * Reports whether the encoder with the given id is currently treated as recording.
 *
 * An encoder counts as recording when it is registered in s_encodermap and the
 * elapsed time of its _alive_ticker is below Rtsp::kKeepAliveSecond * 1000
 * (presumably milliseconds — confirm against the ticker type).
 *
 * @param dwDevID encoder id, the key of s_encodermap.
 * @return true when the encoder is registered and its ticker is within the
 *         keep-alive window, false otherwise.
 */
bool is_devrecording(int dwDevID)
{
    GET_CONFIG(uint32_t, keep_alive_sec, Rtsp::kKeepAliveSecond);
    lock_guard<recursive_mutex> lck(s_encoderMtx);

    // Single hash lookup instead of scanning every entry and then indexing the map again.
    auto it = s_encodermap.find(dwDevID);
    if (it == s_encodermap.end())
    {
        return false;
    }
    return it->second->_alive_ticker.elapsedTime() < keep_alive_sec * 1000;
}
查找未成功錄制的編碼器
//check all encoder status is failed,we'll restart the encoder.
vectorstd::string getencoderfailed(int &encodesize)
{
std::vectorstd::string failed_list;
const string vhost = “default_vhost”;
const string app = “live”;
string customized_path = “”;
for (auto& encodeit : encoder_status_map)
{
auto tty = MediaSource::find(RTSP_SCHEMA, vhost, app, to_string(encodeit.first));
if (!tty ||(encodeit.second.elapsedTime>30000))
{
failed_list.push_back(to_string(encodeit.first));
}
}
encodesize = encoder_status_map.size();
return failed_list;
}
-
3.增加編碼器
SDK_MediaHelper作為編碼器的類,初始化相關參數。
sdk_add_encoder使用Http接口進行調用,把對應的編碼器加入到zlmediakit.
void sdk_add_encoder(int encoder_ID)
{
const string vhost = "__default_vhost__";
const string app = "live";
string stream_id = to_string(encoder_ID);
string nID = get_sdknID(stream_id);
s_encoderMtx.lock();
{
if (s_encodermap.find(encoder_ID) != s_encodermap.end()) {
//已經在拉流了
s_encoderMtx.unlock();
return;
}
/* SDK_MediaHelper::Ptr m_SDK_MediaHelper(std::make_shared<SDK_MediaHelper>(vhost, app, stream_id, 0, true, true, true, true
));*/
SDK_MediaHelper::Ptr m_SDK_MediaHelper(std::make_shared<SDK_MediaHelper>(vhost, app, nID, 0, true, true, true, true
));
VideoInfo m_sdkvideoinfo;
m_sdkvideoinfo.iWidth = SUB_VIDEO_WIDTH;
m_sdkvideoinfo.iHeight = SUB_VIDEO_HEIGHT;
m_sdkvideoinfo.iFrameRate = 15;
EventPollerPool::Instance().getPoller()->doDelayTask(30, [&]() {
encoder_askkeyframe(encoder_ID, 3);
return 0;
});
m_SDK_MediaHelper.get()->getChannel()->initVideo(m_sdkvideoinfo);
m_SDK_MediaHelper.get()->_alive_ticker.resetTime();
encoder_online_map[encoder_ID] = 0;
encoder_status m_encoder_status = { 0,0 };
s_encodermap[encoder_ID] = m_SDK_MediaHelper;
encoder_status_map[encoder_ID] = m_encoder_status;
s_encoderMtx.unlock();
};
Recorder::startRecord((Recorder::type)(1), "__default_proxy__", "live", nID, "");
}
4.删除編碼器,采用線程啟動sdk_del_encoderfunc來删除編碼器
void sdk_del_encoderfunc(int encoder_ID)
{
//make sure ID is exists
//DebugL << “sdk_del_encoderfunc******************* deletem_encoder_ID:” << encoder_ID;
s_encoderMtx.lock();
auto ttt = s_encodermap.find(encoder_ID);
if (ttt != s_encodermap.end())
{
s_encodermap.erase(ttt);
}
auto tttt = encoder_status_map.find(encoder_ID);
if (tttt != encoder_status_map.end())
{
encoder_status_map.erase(tttt);
}
DebugL << "sdk_del_encoderfunc******************* deletem_encoder_ID:end" << encoder_ID;
auto tt_t = encoder_online_map.find(encoder_ID);
if (tt_t != encoder_online_map.end())
{
encoder_online_map.erase(tt_t);
}
s_encoderMtx.unlock();
}
/**
 * Deletes an encoder asynchronously.
 *
 * Runs sdk_del_encoderfunc(encoder_ID) on a new thread and detaches it, so this
 * call returns without waiting for the deletion to finish.
 *
 * @param encoder_ID id of the encoder to delete; copied into the new thread.
 */
void sdk_del_encoder(int encoder_ID)
{
    // The id is handed to the worker by value; the temporary thread object is detached at once.
    std::thread(sdk_del_encoderfunc, encoder_ID).detach();
}
5.監控線程encoder_moniter
調用MediaSource::find(RTSP_SCHEMA, vhost, app, nID)檢視所有map中的編碼器是否成功錄制。
void encoder_moniter()
{
GET_CONFIG(uint32_t, keep_alive_sec, Rtsp::kKeepAliveSecond);
const string vhost = "__default_vhost__";
const string app = "live";
string customized_path = "";
while (true)
{
DebugL << "encoder_moniter runing________________________";
if (s_encoderMtx.try_lock())
{
for (auto& encodeit : s_encodermap)
{
//MediaSource::
string nID = get_sdknID(to_string(encodeit.first));
auto tty=MediaSource::find(RTSP_SCHEMA, vhost, app, nID);
if (tty)
{
//DebugL << "is media recording ________:" << tty.get()->isRecording(Recorder::type_mp4);
}
else
{
DebugL << "mediasource not found.encoder is:" << encodeit.first << "elapse time is:" << encodeit.second->_alive_ticker.elapsedTime() << "nID" << nID;
auto ttt = encoder_online_map.find(encodeit.first);
if(ttt!= encoder_online_map.end())
{
encoder_online_map[encodeit.first] = encoder_online_map[encodeit.first] + 1;
}
if (encoder_online_map[encodeit.first] >3)
{
encoder_online_map[encodeit.first] = 0;
string stream_id = to_string(encodeit.first);
DebugL << "encoderid recreated is_____" << stream_id;
string nID = get_sdknID(stream_id);
auto path = Recorder::getRecordPath(Recorder::type_mp4, vhost, app, nID, customized_path);
Recorder::stopRecord(Recorder::type_mp4, vhost, app, nID);
encodeit.second.reset();
SDK_MediaHelper::Ptr m_SDK_MediaHelper(std::make_shared<SDK_MediaHelper>(vhost, app, nID, 0, true, true, true, true
));
VideoInfo m_sdkvideoinfo;
m_sdkvideoinfo.iWidth = SUB_VIDEO_WIDTH;
m_sdkvideoinfo.iHeight = SUB_VIDEO_HEIGHT;
m_sdkvideoinfo.iFrameRate = 15;
EventPollerPool::Instance().getPoller()->doDelayTask(30, [&]() {
encoder_askkeyframe(encodeit.first, 3);
return 0;
});
m_SDK_MediaHelper.get()->getChannel()->initVideo(m_sdkvideoinfo);
m_SDK_MediaHelper.get()->_alive_ticker.resetTime();
s_encodermap[encodeit.first] = m_SDK_MediaHelper;
Recorder::startRecord((Recorder::type)(1), "__default_proxy__", "live", nID, "");
}
}
}
s_encoderMtx.unlock();
}
Sleep(15000);
}
}
6.編碼器資料接收處理
從資源池擷取BUFFER,把接收到的碼流資料複制到資源池。
把資料處理放入到threadpool:recv_RecorderData —> WorkThreadPool::Instance().getExecutor()->async([buffer, nLen]() { recv_RecorderData(buffer, nLen); })
接收指定節點流媒體資訊的回調函數
static void on_rcv_media_stream(char *lpData, int nLen, void* lpParamter)
{
if (nLen < 20)
{
ErrorL << "nLen is error,nLen is:" << nLen;
return;
}
BufferRaw::Ptr buffer = _buffer_pool.obtain();
buffer->setCapacity(nLen + 1);
buffer->setSize(nLen);
auto lp_Data = buffer->data();
memcpy(lp_Data, lpData, nLen);
//memcpy_fast(lp_Data, lpData, nLen);
// WorkThreadPool::setPoolSize();
WorkThreadPool::Instance().getExecutor()->async([buffer, nLen]() {
//DebugL << "async record" << nLen << endl;
recv_RecorderData(buffer, nLen);
});
//WorkThreadPool::Instance().getExecutor()->async([lpData, nLen, lpParamter]() {
// //DebugL << "async record" << nLen << endl;
// recvRecorderData(lpData, nLen, lpParamter);
//});
}
recv_RecorderData調用inputH264把資料注入到mediasource中
m_obj->getChannel()->inputH264
/**
 * Parses one SDK media packet and feeds its H.264 payload into the encoder's channel.
 *
 * Packet layout as read below (multi-byte fields in native byte order):
 *   [0..3]   device id
 *   [4]      media type
 *   [5..8]   timestamp (not used)
 *   sub-video packets continue with:
 *   [9]      codec
 *   [10..11] width  (not used)
 *   [12..13] height (not used)
 *   [14]     packet id (not used)
 *   [15..]   payload
 *
 * Only MEDIA_TYPE_SUB_VIDEO packets with codec 3 (presumably H.264 — confirm) for a
 * registered encoder are forwarded. The encoder map is locked with a non-blocking
 * try-lock; when the lock is busy the packet is dropped rather than waiting.
 *
 * @param buf  buffer holding the packet.
 * @param nLen number of valid bytes in buf.
 */
void recv_RecorderData(BufferRaw::Ptr buf, int nLen)
{
    try
    {
        const int kCommonHeaderLen = static_cast<int>(sizeof(unsigned int) + 1 + sizeof(unsigned int));
        const int kVideoHeaderLen = static_cast<int>(1 + 2 * sizeof(unsigned short) + 1);

        // Never parse past the end of the buffer.
        if (!buf || nLen < kCommonHeaderLen)
        {
            ErrorL << "nLen is error,nLen is:" << nLen;
            return;
        }

        const char *lpData = buf->data();
        int nOff = 0;

        // memcpy instead of a pointer cast: the fields are not guaranteed to be aligned.
        unsigned int rawDevID = 0;
        memcpy(&rawDevID, lpData + nOff, sizeof(rawDevID));
        nOff += static_cast<int>(sizeof(rawDevID));

        const int dwDevID = static_cast<int>(rawDevID);
        if (dwDevID < 0)
        {
            ErrorL << "dwDevID is negative" << dwDevID << "nLen is" << nLen;
            return;
        }

        const int nMediaType = lpData[nOff++];
        // Skip the timestamp field; it is not used.
        nOff += static_cast<int>(sizeof(unsigned int));

        if (nMediaType != MEDIA_TYPE_SUB_VIDEO)
        {
            return;
        }

        if (nLen <= kCommonHeaderLen + kVideoHeaderLen)
        {
            ErrorL << "video packet has no payload,nLen is:" << nLen;
            return;
        }

        const int nCodec = lpData[nOff];
        // Skip codec, width, height and packet id in one step.
        nOff += kVideoHeaderLen;
        const int nRawLen = nLen - nOff;

        if (nCodec != 3)
        {
            return;
        }

        // Non-blocking, RAII-managed lock: released on every path, including exceptions.
        std::unique_lock<std::recursive_mutex> lck(s_encoderMtx, std::try_to_lock);
        if (!lck.owns_lock())
        {
            return;
        }

        // Single hash lookup instead of scanning the whole map.
        auto it = s_encodermap.find(dwDevID);
        if (it == s_encodermap.end())
        {
            return;
        }

        SDK_MediaHelper::Ptr m_obj = it->second;
        m_obj->_alive_ticker.resetTime();
        m_obj->getChannel()->inputH264(lpData + nOff, nRawLen, 0, 0);
    }
    catch (const std::exception& ex)
    {
        std::cout << ex.what() << std::endl;
    }
}
效果:資料可通過RTSP、RTMP來進行訪問。
URL:
rtsp://IP:554/live/encoder_ID
rtmp://IP:1935/live/encoder_ID
7.停止sdk_stop。銷毀相關資源。
8.http接口
增加編碼器流
// HTTP API "/index/api/addsdkstream": adds an encoder stream.
// Requires the "encoderid" and "MediaType" arguments; echoes the encoder id back in val["data"]["encoderid"].
// The lambda head and the ASCII quotes were lost when the snippet was pasted and are restored here.
api_regist1("/index/api/addsdkstream", [](API_ARGS1) {
    CHECK_SECRET();
    CHECK_ARGS("encoderid", "MediaType");
    string m_nID = svrsdk::get_sdknID(allArgs["encoderid"]);
    add_devicemap(m_nID, "0");
    DebugL << "/index/api/addsdkstream" << "m_nID" << m_nID;
    int m_encodeid = allArgs["encoderid"];
    svrsdk::sdk_add_encoder(m_encodeid);
    val["data"]["encoderid"] = to_string(m_encodeid);
});
删除編碼器流
// HTTP API "/index/api/deletesdkstream": deletes an encoder stream.
// Requires the "encoderid" argument; echoes the encoder id back in val["data"]["deletesdkstream"].
// The lambda head and the ASCII quotes were lost when the snippet was pasted and are restored here.
api_regist1("/index/api/deletesdkstream", [](API_ARGS1) {
    CHECK_SECRET();
    CHECK_ARGS("encoderid");
    DebugL << "/index/api/deletesdkstream";
    int m_encodeid = allArgs["encoderid"];
    svrsdk::sdk_del_encoder(m_encodeid);
    val["data"]["deletesdkstream"] = to_string(m_encodeid);
});
調試:
使用Postman調用增加編碼器流的http接口。
連結:
https://github.com/zlmediakit/ZLMediaKit
postman