天天看點

流媒體-RTMP協定-librtmp庫學習-c++多線程實作rtmp推流flv檔案(三)

  1. 流媒體-RTMP協定-rtmpdump-flv封裝解析(一)
  2. 流媒體-RTMP協定-librtmp庫學習(二)
  3. 流媒體-RTMP協定-librtmp庫學習-c++多線程實作rtmp推流flv檔案(三)

文章目錄

      • rtmppusher.h頭檔案
      • rtmppusher.cpp 檔案
      • main.cpp
      • 源代碼下載
      • 參考

rtmppusher.h頭檔案

/**
* @projectName   TestlibRtmp
* @author        wangbaojia
* @date          2020-12-02
* @brief
*
*  librtmp +nginx 實作flv推流(adobe rtmp協定)
*
*
*/


#ifndef RTMPPUSHER_H
#define RTMPPUSHER_H

#include <QObject>
#include <QThread>
#include <QDebug>

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>

#include <iostream>
#include <atomic>
#include <chrono>

#ifndef WIN32
#include <unistd.h>
#endif

#include "librtmp/rtmp.h"
#include "librtmp/log.h"
#include "librtmp/amf.h"

// Unconditional byte-swap helpers for big-endian FLV fields.
// Every use of the argument is parenthesized so that expressions such as
// HTON16(a | b) are swapped as a whole instead of being torn apart by operator
// precedence. NOTE: the argument is evaluated several times; do not pass an
// expression with side effects.
//
// HTON16   : swaps the two low bytes.
// HTON24   : swaps byte 0 and byte 2, keeps byte 1; byte 3 is dropped.
// HTON32   : reverses all four bytes.
// HTONTIME : like HTON24 but preserves byte 3 in place (FLV timestamp layout:
//            three big-endian bytes followed by one "extended" high byte).
#define HTON16(x)  ((((x) >> 8) & 0xff) | (((x) << 8) & 0xff00))
#define HTON24(x)  ((((x) >> 16) & 0xff) | (((x) << 16) & 0xff0000) | ((x) & 0xff00))
#define HTON32(x)  ((((x) >> 24) & 0xff) | (((x) >> 8) & 0xff00) | (((x) << 8) & 0xff0000) | (((x) << 24) & 0xff000000))
#define HTONTIME(x) ((((x) >> 16) & 0xff) | (((x) << 16) & 0xff0000) | ((x) & 0xff00) | ((x) & 0xff000000))


class RTMPPusher : public QThread
{
    Q_OBJECT
public:
    RTMPPusher();
    virtual ~RTMPPusher();
    /**
     * @brief InitRtmp
     * @param filename     待打開的檔案
     * @param url          推流url
     * @param packetsize   包大小
     * @return
     */
    bool InitRtmp(std::string filename,std::string url,int packetsize);

    inline void CleanupSockets()
    {
    #ifdef WIN32
        WSACleanup();
    #endif
    }

    inline int InitSockets()
    {
    #ifdef WIN32
        WORD version;
        WSADATA wsaData;

        version = MAKEWORD(1, 1);
        return (WSAStartup(version, &wsaData) == 0);
    #else
        return TRUE;
    #endif
    }

    void Close();
    void Pause();
    bool ReadHeaderAndMetadata();

protected:

    void run();

private:

    std::string m_strFileName = "";
    std::string m_strUrl= "";
    FILE* m_pFp= nullptr;
    /**
     * @brief m_nFileSize off_t在64位的Linux系統中則會被編譯為long long int
     */
    off_t m_nFileSize;
    /**
     * @brief m_nMetadataPos metadate資料位置
     */
    off_t m_nMetadataPos;

    RTMP* m_pRtmp = nullptr;
    /**
     * @brief m_pRtmpPkt
     */
    RTMPPacket* m_pRtmpPkt = nullptr;

    std::atomic<bool> m_bIsExit;
    std::atomic<bool> m_bIsPause;

};

#endif // RTMPPUSHER_H

           

rtmppusher.cpp 檔案

#include "rtmppusher.h"

/// Creates an idle pusher: neither the exit flag nor the pause flag is set.
RTMPPusher::RTMPPusher()
    : m_bIsExit(false)
    , m_bIsPause(false)
{
}


/// Stops the worker loop and waits for it to finish before the object goes away.
/// Destroying a QThread whose run() is still executing is an error, and run()
/// dereferences members of this object.
RTMPPusher::~RTMPPusher()
{
    m_bIsExit = true;   // run() tests this flag at the top of every iteration
    wait();             // returns immediately if the thread never started or already finished
}

bool RTMPPusher::InitRtmp(std::string filename,std::string url,int packetsize)
{
    m_pFp = fopen(filename.c_str(),"rb");
    if(!m_pFp)
    {
        RTMP_LogPrintf("Open File Error\n");
        CleanupSockets();
        return false;
    }

    //初始化Socket
    if(!InitSockets())
    {
        RTMP_LogPrintf("Init Socket Err\n");
        fclose(m_pFp);
        return false;
    }

    //為結構體“RTMP”配置設定記憶體
    m_pRtmp = RTMP_Alloc();
    if(!m_pRtmp)
    {
        RTMP_LogPrintf("RTMP_Alloc failed\n");
        CleanupSockets();
        fclose(m_pFp);
        return false;
    }

    //初始化結構體“RTMP”中的成員變量
    /** 源碼
    void RTMP_Init(RTMP *r)
    {
    #ifdef CRYPTO
      if (!RTMP_TLS_ctx)
        RTMP_TLS_Init();
    #endif

      memset(r, 0, sizeof(RTMP));
      r->m_sb.sb_socket = -1;
      r->m_inChunkSize = RTMP_DEFAULT_CHUNKSIZE;
      r->m_outChunkSize = RTMP_DEFAULT_CHUNKSIZE;
      r->m_nBufferMS = 30000;
      r->m_nClientBW = 2500000;
      r->m_nClientBW2 = 2;
      r->m_nServerBW = 2500000;
      r->m_fAudioCodecs = 3191.0;
      r->m_fVideoCodecs = 252.0;
      r->Link.timeout = 30;
      r->Link.swfAge = 30;
    }
    */
    RTMP_Init(m_pRtmp);
    m_pRtmp->Link.timeout = 5; //預設為30秒


    if(!RTMP_SetupURL(m_pRtmp, const_cast<char*>(url.c_str())))
    {
        RTMP_Log(RTMP_LOGERROR,"SetupURL Err\n");
        RTMP_Free(m_pRtmp);
        CleanupSockets();
        fclose(m_pFp);
        return false;
    }

    //釋出流的時候必須要使用。如果不使用則代表接收流。
    RTMP_EnableWrite(m_pRtmp);

    //建立RTMP連接配接,建立一個RTMP協定規範中的NetConnection
    if(!RTMP_Connect(m_pRtmp,nullptr))
    {
        RTMP_Log(RTMP_LOGERROR,"Connect Err\n");
        Close();
        return false;
    }

    //建立一個RTMP協定規範中的NetStream
    if (!RTMP_ConnectStream(m_pRtmp,0))
    {
        RTMP_Log(RTMP_LOGERROR,"ConnectStream Err\n");
        Close();
        return false;
    }

    m_pRtmpPkt = (RTMPPacket*)malloc(sizeof(RTMPPacket));
    if(!m_pRtmpPkt)
    {
        RTMP_Log(RTMP_LOGERROR,"malloc RTMPPacket Err\n");
        Close();
        return false;
    }

    int flag = RTMPPacket_Alloc(m_pRtmpPkt,packetsize);
    if(flag == 0)
    {
        RTMP_Log(RTMP_LOGERROR,"RTMPPacket_Alloc Err\n");
        Close();
        return false;
    }

    RTMPPacket_Reset(m_pRtmpPkt);

    m_pRtmpPkt->m_hasAbsTimestamp = 0;
    m_pRtmpPkt->m_nChannel = 0x04;//快流id,0,1,2,作為保留;/* source channel */


    return true;
}

bool RTMPPusher::ReadHeaderAndMetadata()
{

    /**
     * @brief int fseek(FILE *stream, long offset, int fromwhere);
     *        int fseeko(FILE *stream, off_t offset, int fromwhere);
     *        int fseeko64(FILE *stream, off64_t offset, int fromwhere);
     * 這幾個函數唯一的不同是offset的資料類型不同,相應的能夠處理的偏移量的範圍也就有大有小
     * stream:檔案指針
     * fromwhere:偏移起始位置
     * offset:偏移量
     * 函數設定檔案指針stream的位置。如果執行成功,stream将指向以fromwhere
     * 偏移起始位置:檔案頭0(SEEK_SET),目前位置1(SEEK_CUR),檔案尾2(SEEK_END))為基準,
     * 偏移offset(指針偏移量)個位元組的位置。如果執行失敗(比如offset超過檔案自身大小),則不改變stream指向的位置。
     */
    fseek(m_pFp, 0, SEEK_END);//指針跳轉到尾部
    /* Return the current position of STREAM.
       This function is a possible cancellation point and therefore not
       marked with __THROW.  */
    //extern __off_t ftello (FILE *__stream) __wur;
    //擷取檔案位元組大小
    m_nFileSize = ftello(m_pFp);
    //傳回檔案頭部
    fseek(m_pFp, 0, SEEK_SET);

    size_t bufferSize = 0;
    char hbuf[16], *buffer = nullptr;
    char *metaHeader;	// meta data read from the file [out]
    uint32_t nMetaHeaderSize;	// length of metaHeader [out]
    double duration;

    if (m_nFileSize > 0)
    {
        // verify FLV format and read header
        uint32_t prevTagSize = 0;

        // check we've got a valid FLV file to continue!
        // 9 位元組頭 + 4 位元組prevTagSize為0的大小 = 13 位元組
        if (fread(hbuf, 1, 13, m_pFp) != 13)
        {
            RTMP_Log(RTMP_LOGERROR, "Couldn't read FLV file header!");
            return false;
        }
        if (hbuf[0] != 'F' || hbuf[1] != 'L' || hbuf[2] != 'V'
                || hbuf[3] != 0x01)
        {
            RTMP_Log(RTMP_LOGERROR, "Invalid FLV file!");
            return false;
        }
        //第五位元組的第一位視訊,第三位音頻,兩者都不存在直接退出
        if ((hbuf[4] & 0x05) == 0)
        {
            RTMP_Log(RTMP_LOGERROR,
                     "FLV file contains neither video nor audio, aborting!");
            return false;
        }
        //頭檔案長度第6-9位元組:4位元組
        uint32_t dataOffset = AMF_DecodeInt32(hbuf + 5);
        //flv檔案頭解析結束
        fseek(m_pFp, dataOffset, SEEK_SET);

        //讀取4位元組 prevTagSize 此值應為0
        if (fread(hbuf, 1, 4, m_pFp) != 4)
        {
            RTMP_Log(RTMP_LOGERROR, "Invalid FLV file: missing first prevTagSize!");
            return false;
        }
        prevTagSize = AMF_DecodeInt32(hbuf);
        if (prevTagSize != 0)
        {
            RTMP_Log(RTMP_LOGWARNING,
                     "First prevTagSize is not zero: prevTagSize = 0x%08X",
                     prevTagSize);
        }

        // go through the file to find the meta data!
        m_nMetadataPos = dataOffset + 4;//m_nMetadataPos 指向tag body
        int bFoundMetaHeader = FALSE;
        //解析中繼資料
        while (m_nMetadataPos < m_nFileSize - 4 && !bFoundMetaHeader)
        {
            fseeko(m_pFp, m_nMetadataPos, SEEK_SET);
            //讀取tag body前四位元組
            if (fread(hbuf, 1, 4, m_pFp) != 4)
                break;
            //第一位元組tag類型,第2-4共三位元組表示tag檔案長度,表示StreamID之後的資料長度
            uint32_t dataSize = AMF_DecodeInt24(hbuf + 1);
            //該tag是中繼資料
            if (hbuf[0] == 0x12)
            {
                if (dataSize > bufferSize)
                {
                    /* round up to next page boundary */
                    bufferSize = dataSize + 4095;
                    bufferSize ^= (bufferSize & 4095);
                    free(buffer);
                    buffer = (char*)malloc(bufferSize);
                    if (!buffer)
                        return false;
                }

                //跳過body頭部
                fseeko(m_pFp, m_nMetadataPos + 11, SEEK_SET);
                //讀取資料到buffer中
                if (fread(buffer, 1, dataSize, m_pFp) != dataSize)
                    break;

                AMFObject metaObj;
                int nRes = AMF_Decode(&metaObj, buffer, dataSize, FALSE);
                if (nRes < 0)
                {
                    RTMP_Log(RTMP_LOGERROR, "%s, error decoding meta data packet",
                             __FUNCTION__);
                    break;
                }

#if 0
                AVal metastring;
                AMFProp_GetString(AMF_GetProp(&metaObj, NULL, 0), &metastring);

                if (AVMATCH(&metastring, &av_onMetaData))
                {
                    AMF_Dump(&metaObj);

                    nMetaHeaderSize = dataSize;
                    if (metaHeader)
                        free(metaHeader);
                    metaHeader = (char *) malloc(nMetaHeaderSize);
                    memcpy(metaHeader, buffer, nMetaHeaderSize);

                    // get duration
                    AMFObjectProperty prop;
                    if (RTMP_FindFirstMatchingProperty
                            (&metaObj, &av_duration, &prop))
                    {
                        duration = AMFProp_GetNumber(&prop);
                        RTMP_Log(RTMP_LOGDEBUG, "File has duration: %f", duration);
                    }

                    bFoundMetaHeader = TRUE;
                    break;
                }
#endif
                break; //讀取metadate跳出

            }
            //跳轉到下一個tag
            m_nMetadataPos += (dataSize + 11 + 4);
        }

        free(buffer);
        if (!bFoundMetaHeader)
            RTMP_Log(RTMP_LOGWARNING, "Couldn't locate meta data!");
    }
    //回到metadata位置
    fseek(m_pFp,m_nMetadataPos,SEEK_SET);
    return true;

}

void RTMPPusher::Close()
{
    m_bIsExit = true;

    if(m_pRtmp)
    {
        RTMP_Close(m_pRtmp);
        RTMP_Free(m_pRtmp);
    }

    CleanupSockets();

    if(m_pFp)
    {
        fclose(m_pFp);
        m_pFp = nullptr;
    }

}

/// Raises the pause flag; run() idles in 1 ms steps for as long as it is set.
void RTMPPusher::Pause()
{
    m_bIsPause.store(true);
}

void RTMPPusher::run()
{
    //線程讀取資料
    unsigned int pretime = 0;
    unsigned int videonum = 0;
    unsigned int audionum = 0;

    while(!m_bIsExit)
    {
        if(m_bIsPause) //暫停
        {
            msleep(1);
            continue;
        }
        if(!m_pRtmp)
        {
            msleep(1);
            continue;
        }

        //從flv檔案中讀取資料
        /**
         * RTMPPacket包資料
         * 1)0 類型的塊長度為11 位元組:時間戳3位元組,消息長度3位元組,消息類型1位元組,消息流id 4位元組。
         * 2)擴充時間戳:零或四位元組
         *
         * flv檔案的tag
         * 1)Tag 頭部:11位元組:類型1位元組,長度3位元組(StreamID之後的資料長度),時間戳3位元組,擴充時間戳1位元組,消息流id 3位元組
         *
         * */
        unsigned int timestamp;
        unsigned int length;
        unsigned char type;
        unsigned int streamid;
        unsigned int pretagsize;

        if(fread(&type,1,1,m_pFp) != 1) break;

        if(fread(&length,1,3,m_pFp) != 3) break;
        length = HTON24(length);

        if(fread(&timestamp,1,4,m_pFp) != 4) break;
        timestamp = HTONTIME(timestamp);


        if(fread(&streamid,1,3,m_pFp) != 3) break;
        streamid = HTON24(streamid);

        if(fread(m_pRtmpPkt->m_body,1,length,m_pFp) != length)
        {
            RTMP_Log(RTMP_LOGERROR,"rtmp read data failed\n");
            break;
        }

        //讀取4位元組的tagsize大小
        if(fread(&pretagsize,1,4,m_pFp) != 4) break;
        pretagsize = HTON32(pretagsize);

        if(pretagsize != length + 11) break;



        m_pRtmpPkt->m_headerType = RTMP_PACKET_SIZE_LARGE;
        m_pRtmpPkt->m_nTimeStamp = timestamp; //時間戳
        m_pRtmpPkt->m_nBodySize = length; //消息長度
        m_pRtmpPkt->m_packetType = type; //消息類型
        m_pRtmpPkt->m_nInfoField2 = m_pRtmp->m_stream_id;  //消息流id

        unsigned int delay = timestamp - pretime;


        if(type == 0x08)
        {
            qDebug()<<"type="<<type<<" audionum="<<audionum++
                   <<" length="<<length<<" tagsize="<<pretagsize<<" timestamp="<<timestamp
                  <<" delay="<<delay<<" streamid="<<streamid
                  <<" m_pRtmp->m_stream_id"<<m_pRtmp->m_stream_id;
        }
        
        if(type == 0x09)
        {
            qDebug()<<"type="<<type<<" videonum="<<videonum++
                   <<" length="<<length<<" tagsize="<<pretagsize<<" timestamp="<<timestamp
                  <<" delay="<<delay<<" streamid="<<streamid
                  <<" m_pRtmp->m_stream_id"<<m_pRtmp->m_stream_id;
        }

        std::this_thread::sleep_for(std::chrono::milliseconds(delay));

        //判斷rtmp連接配接是否正常
        if(!RTMP_IsConnected(m_pRtmp))
        {
            RTMP_Log(RTMP_LOGERROR,"rtmp is not connect\n");
            break;
        }
        int result = RTMP_SendPacket(m_pRtmp,m_pRtmpPkt,0);
        pretime = timestamp;
    }
}

           

main.cpp

#include "rtmppusher.h"

#include <string>
#include <iostream>

using namespace std;

/**
 * @brief Pushes one hard-coded FLV file to one hard-coded RTMP URL.
 * @return 0 when the file was pushed until the worker thread finished,
 *         1 when initialisation or header parsing failed.
 */
int main()
{

    RTMPPusher rtmppush;
    const std::string filepath = "/home/ubuntu/data/video/source.200kbps.768x320.flv";
    const std::string url = "rtmp://192.168.0.10:2020/live/livestream";
    const int size = 64 * 1024;

    const bool isinit = rtmppush.InitRtmp(filepath,url,size);
    std::cout<<"init statu="<<isinit<<std::endl;
    if(!isinit)
    {
        // InitRtmp releases whatever it acquired before reporting failure.
        return 1;
    }

    if(!rtmppush.ReadHeaderAndMetadata())
    {
        std::cout<<"readhead failed"<<std::endl;
        rtmppush.Close();   // the session and file are open at this point
        return 1;
    }
    std::cout<<"readhead success"<<std::endl;

    rtmppush.start();
    rtmppush.wait();
    rtmppush.Close();       // release the RTMP session and the file
    return 0;
}

           

源代碼下載

參考

  1. rtmpdump2.3 中rtmpdump.c

繼續閱讀