x265探索与研究(八):x265中的并行处理机制函数关系分析

x265探索与研究(八):x265中的并行处理机制函数关系分析

 

        HEVC的高计算复杂度如果仅仅依赖于单核处理器计算能力的提高,其代价是非常昂贵的,为此,HEVC的设计充分考虑到了并行计算的需求。x265不同于HM(HEVC
test Model),x265将并行计算发挥地更加淋淋尽致。在main()函数中,为了完成多线程计算,读完24帧输入帧后才开始编码的原因也基于此。

 

        为了理清x265中的并行处理机制,首先给出了如下图的并行处理函数关系图:

 

        经过前面几篇文章的分析,我们知道main()函数主要调用了encoder_open()函数、encoder_headers()函数、encoder_encode()函数与encoder_close()函数。其中:

(1)encoder_encode()函数调用了encode()函数,而encode()函数中调用了startCompressFrame()函数,在startCompressFrame()函数中,采用m_enable.trigger完成了触发线程的功能;

(2)encoder_open()函数调用了encoder_create()函数,在encoder_create()函数中等待线程的初始化并进入threadMain()函数中等待线程的触发,线程一旦触发则调用compressFrame函数进行进一步地编码工作。

        这两步的协调工作完美地成就了多线程的实现。为了进一步分析HEVC的并行处理机制,接下来首先分析视频编码的并行处理相关技术,然后依次分析该函数关系图中较为重要的startCompressFrame()函数、encoder_create()函数以及threadMain()函数。

 

1、并行处理技术

 

        并行处理一般指许多指令得以同时进行的处理模式,通常分为两种:功能并行和数据并行。

(1)功能并行是指将应用程序划分成互相独立的功能模块,每个模块间可以并行地执行,这种并行方式也称为流水线型并行,它将各个独立的模块划分给不同的运算单元,各个模块之间通过流的方式来进行数据交换和通信,最终再将各个单元串接在一起。功能并行充分利用了时间上的并行性来获得加速的效果,比较适合于硬件实现。功能并行的缺点是很明显的,a、由于分配给不同运算单元的功能模块是不同的,因此很容易产生载荷失衡问题;b、功能并行还需要在不同运算单元间进行数据通信,当数据量较大时,需要花费额外的资源来进行存储;c、另外,功能并行的扩展性较差。

(2)数据并行是将数据信息划分为相互独立的部分,每一部分交给不同的运算单元来执行,从而实现并行处理。在这种方式下,不同运算单元上执行的程序是相同的,而且处理的是相互独立的数据信息,因此不需要进行运算单元间的通信;数据并行也具有较好的扩展性,易于软件实现。

 

        HEVC/H.265提供了适用于进行数据并行处理的结构单元,如片和Tile,在不同的片和Tile中,数据信息是相互独立的,这样有利于将其分配给不同的运算单元来处理;对于小于片和Tile的划分单元,HEVC支持波前并行处理(Wavefront
Parallel Processing, WPP),这是对于相互具有依赖关系的图像单元进行数据并行处理的方法。在HEVC中,并行处理技术主要包括:基于Tile的并行和波前并行两种。在进行基于Tile的并行时,由于Tile的相互独立性,不需要考虑它们之间的相互依赖关系,而在进行波前并行处理时,数据间的相互依赖关系是必不可少的。

 

2、startCompressFrame()函数的分析

 

        startCompressFrame()函数的主要功能就是触发线程,对应的代码分析如下:

 

/*=============================================================*/
/*
 ====== Analysed by: RuiDong Fang
 ====== Csdn Blog:	 http://blog.csdn.net/frd2009041510
 ====== Date:		 2016.04.15
 ====== Funtion:	 startCompressFrame()函数,触发线程。
 */
/*=============================================================*/
bool FrameEncoder::startCompressFrame(Frame* curFrame)
{
    m_slicetypeWaitTime = x265_mdate() - m_prevOutputTime;
    m_frame = curFrame;
    m_param = curFrame->m_param;
    m_sliceType = curFrame->m_lowres.sliceType;
    curFrame->m_encData->m_frameEncoderID = m_jpId;
    curFrame->m_encData->m_jobProvider = this;
    curFrame->m_encData->m_slice->m_mref = m_mref;

    if (!m_cuGeoms)
    {
        if (!initializeGeoms())
            return false;
    }

    m_enable.trigger();	//触发线程,下一步将会进入threadMain()函数中
    return true;
}

 

3、encoder_create()函数的分析

 

        encoder_create()函数的主要功能是检测线程池、可用的线程数目等等,若线程使用的条件符合则调用threadMain()函数,对应的代码分析如下:

 

void Encoder::create()
{
    if (!primitives.pu[0].sad)
    {
        // this should be an impossible condition when using our public API, and indicates a serious bug.
        x265_log(m_param, X265_LOG_ERROR, "Primitives must be initialized before encoder is created\n");
        abort();
    }

    x265_param* p = m_param;

    int rows = (p->sourceHeight + p->maxCUSize - 1) >> g_log2Size[p->maxCUSize];
    int cols = (p->sourceWidth  + p->maxCUSize - 1) >> g_log2Size[p->maxCUSize];

    // Do not allow WPP if only one row or fewer than 3 columns, it is pointless and unstable
	//对于不符合条件的,不进行WPP
    if (rows == 1 || cols < 3)
    {
        x265_log(p, X265_LOG_WARNING, "Too few rows/columns, --wpp disabled\n");
        p->bEnableWavefront = 0;
    }

    bool allowPools = !p->numaPools || strcmp(p->numaPools, "none");

    // Trim the thread pool if --wpp, --pme, and --pmode are disabled
	//如果--wpp, --pme, and --pmode不使能,清理线程池
    if (!p->bEnableWavefront && !p->bDistributeModeAnalysis && !p->bDistributeMotionEstimation && !p->lookaheadSlices)
        allowPools = false;

	//根据核数检测线程的数目
    if (!p->frameNumThreads)
    {
        // auto-detect frame threads
        int cpuCount = ThreadPool::getCpuCount();
        if (!p->bEnableWavefront)
            p->frameNumThreads = X265_MIN3(cpuCount, (rows + 1) / 2, X265_MAX_FRAME_THREADS);
        else if (cpuCount >= 32)
            p->frameNumThreads = (p->sourceHeight > 2000) ? 8 : 6; // dual-socket 10-core IvyBridge or higher
        else if (cpuCount >= 16)
            p->frameNumThreads = 5; // 8 HT cores, or dual socket
        else if (cpuCount >= 8)
            p->frameNumThreads = 3; // 4 HT cores
        else if (cpuCount >= 4)
            p->frameNumThreads = 2; // Dual or Quad core
        else
            p->frameNumThreads = 1;
    }

    m_numPools = 0;
    if (allowPools)
        m_threadPool = ThreadPool::allocThreadPools(p, m_numPools);

    if (!m_numPools)
    {
        // issue warnings if any of these features were requested
        if (p->bEnableWavefront)
            x265_log(p, X265_LOG_WARNING, "No thread pool allocated, --wpp disabled\n");
        if (p->bDistributeMotionEstimation)
            x265_log(p, X265_LOG_WARNING, "No thread pool allocated, --pme disabled\n");
        if (p->bDistributeModeAnalysis)
            x265_log(p, X265_LOG_WARNING, "No thread pool allocated, --pmode disabled\n");
        if (p->lookaheadSlices)
            x265_log(p, X265_LOG_WARNING, "No thread pool allocated, --lookahead-slices disabled\n");

        // disable all pool features if the thread pool is disabled or unusable.
        p->bEnableWavefront = p->bDistributeModeAnalysis = p->bDistributeMotionEstimation = p->lookaheadSlices = 0;
    }

    if (!p->bEnableWavefront && p->rc.vbvBufferSize)
    {
        x265_log(p, X265_LOG_ERROR, "VBV requires wavefront parallelism\n");
        m_aborted = true;
    }

    char buf[128];
    int len = 0;
    if (p->bEnableWavefront)
        len += sprintf(buf + len, "wpp(%d rows)", rows);
    if (p->bDistributeModeAnalysis)
        len += sprintf(buf + len, "%spmode", len ? "+" : "");
    if (p->bDistributeMotionEstimation)
        len += sprintf(buf + len, "%spme ", len ? "+" : "");
    if (!len)
        strcpy(buf, "none");

    x265_log(p, X265_LOG_INFO, "frame threads / pool features       : %d / %s\n", p->frameNumThreads, buf);

    for (int i = 0; i < m_param->frameNumThreads; i++)
    {
        m_frameEncoder[i] = new FrameEncoder;
        m_frameEncoder[i]->m_nalList.m_annexB = !!m_param->bAnnexB;
    }

    if (m_numPools)
    {
        for (int i = 0; i < m_param->frameNumThreads; i++)
        {
            int pool = i % m_numPools;
            m_frameEncoder[i]->m_pool = &m_threadPool[pool];
            m_frameEncoder[i]->m_jpId = m_threadPool[pool].m_numProviders++;
            m_threadPool[pool].m_jpTable[m_frameEncoder[i]->m_jpId] = m_frameEncoder[i];
        }
        for (int i = 0; i < m_numPools; i++)
            m_threadPool[i].start();
    }
    else
    {
        /* CU stats and noise-reduction buffers are indexed by jpId, so it cannot be left as -1 */
        for (int i = 0; i < m_param->frameNumThreads; i++)
            m_frameEncoder[i]->m_jpId = 0;
    }

    if (!m_scalingList.init())
    {
        x265_log(m_param, X265_LOG_ERROR, "Unable to allocate scaling list arrays\n");
        m_aborted = true;
    }
    else if (!m_param->scalingLists || !strcmp(m_param->scalingLists, "off"))
        m_scalingList.m_bEnabled = false;
    else if (!strcmp(m_param->scalingLists, "default"))
        m_scalingList.setDefaultScalingList();
    else if (m_scalingList.parseScalingList(m_param->scalingLists))
        m_aborted = true;
    m_scalingList.setupQuantMatrices();

    m_lookahead = new Lookahead(m_param, m_threadPool);
    if (m_numPools)
    {
        m_lookahead->m_jpId = m_threadPool[0].m_numProviders++;
        m_threadPool[0].m_jpTable[m_lookahead->m_jpId] = m_lookahead;
    }

    m_dpb = new DPB(m_param);
    m_rateControl = new RateControl(*m_param);

    initVPS(&m_vps);
    initSPS(&m_sps);
    initPPS(&m_pps);

    int numRows = (m_param->sourceHeight + g_maxCUSize - 1) / g_maxCUSize;
    int numCols = (m_param->sourceWidth  + g_maxCUSize - 1) / g_maxCUSize;
    for (int i = 0; i < m_param->frameNumThreads; i++)
    {
        if (!m_frameEncoder[i]->init(this, numRows, numCols))
        {
            x265_log(m_param, X265_LOG_ERROR, "Unable to initialize frame encoder, aborting\n");
            m_aborted = true;
        }
    }

    for (int i = 0; i < m_param->frameNumThreads; i++)
    {
        m_frameEncoder[i]->start();
        m_frameEncoder[i]->m_done.wait(); /* wait for thread to initialize */	//========调用threadMain()
    }

    if (m_param->bEmitHRDSEI)
        m_rateControl->initHRD(m_sps);
    if (!m_rateControl->init(m_sps))
        m_aborted = true;
    if (!m_lookahead->create())
        m_aborted = true;

    if (m_param->analysisMode)
    {
        const char* name = m_param->analysisFileName;
        if (!name)
            name = defaultAnalysisFileName;
        const char* mode = m_param->analysisMode == X265_ANALYSIS_LOAD ? "rb" : "wb";
        m_analysisFile = fopen(name, mode);
        if (!m_analysisFile)
        {
            x265_log(NULL, X265_LOG_ERROR, "Analysis load/save: failed to open file %s\n", name);
            m_aborted = true;
        }
    }

    m_bZeroLatency = !m_param->bframes && !m_param->lookaheadDepth && m_param->frameNumThreads == 1;

    m_aborted |= parseLambdaFile(m_param);

    m_encodeStartTime = x265_mdate();

    m_nalList.m_annexB = !!m_param->bAnnexB;
}

 

4、threadMain()函数的分析

 

        threadMain()函数相当于线程函数的main()函数,其主要功能就是在完成线程触发后等待处理,在此处,调用了compressFrame()函数。

 

        对应的代码分析如下:

void FrameEncoder::threadMain()
{
    THREAD_NAME("Frame", m_jpId);

    if (m_pool)	//若线程池不为空
    {
        m_pool->setCurrentThreadAffinity();	//设置当前线程

        /* the first FE on each NUMA node is responsible for allocating thread
         * local data for all worker threads in that pool. If WPP is disabled, then
         * each FE also needs a TLD instance */
        if (!m_jpId)
        {
            int numTLD = m_pool->m_numWorkers;
            if (!m_param->bEnableWavefront)
                numTLD += m_pool->m_numProviders;

            m_tld = new ThreadLocalData[numTLD];
            for (int i = 0; i < numTLD; i++)
            {
                m_tld[i].analysis.initSearch(*m_param, m_top->m_scalingList);
                m_tld[i].analysis.create(m_tld);
            }

            for (int i = 0; i < m_pool->m_numProviders; i++)
            {
                if (m_pool->m_jpTable[i]->m_isFrameEncoder) /* ugh; over-allocation and other issues here */
                {
                    FrameEncoder *peer = dynamic_cast<FrameEncoder*>(m_pool->m_jpTable[i]);
                    peer->m_tld = m_tld;
                }
            }
        }

        if (m_param->bEnableWavefront)
            m_localTldIdx = -1; // cause exception if used
        else
            m_localTldIdx = m_pool->m_numWorkers + m_jpId;
    }
    else	//若线程池为空,则WPP
    {
        m_tld = new ThreadLocalData;
        m_tld->analysis.initSearch(*m_param, m_top->m_scalingList);
        m_tld->analysis.create(NULL);
        m_localTldIdx = 0;
    }

    m_done.trigger();     /* signal that thread is initialized */ //线程已经触发
    m_enable.wait();      /* Encoder::encode() triggers this event */	//等待处理

    while (m_threadActive)
    {
        compressFrame();	//=====================调用compressFrame()
        m_done.trigger(); /* FrameEncoder::getEncodedPicture() blocks for this event */
        m_enable.wait();
    }
}
时间: 2025-01-02 12:49:41

x265探索与研究(八):x265中的并行处理机制函数关系分析的相关文章

x265探索与研究(九):compressFrame()函数

x265探索与研究(九):compressFrame()函数           compressFrame()函数是一个功能繁杂且分析难度较大的函数,主要包括时间戳的初始化工作.access unit的设计.加权预测技术.运动参考帧的估计.当前Slice的QP值确定.熵编码相关信息配置.并行计算与否及其空间的申请.SEI相关配置.线程控制.CTU分析.Multi-pass Encoding.滤波与去噪处理等等,其中最重要的就是调用了encodeSlice()函数.           comp

x265探索与研究(七):encode()函数

x265探索与研究(七):encode()函数           在x265中,main()函数中调用了encoder_encode()函数,而encoder_encode()函数调用了encode()函数,encode()函数的主要功能是输入一帧图像,得到一帧图像的输出.           encode()函数主要包括大致三个部分: (1)分析是否由于错误造成的代码终止,如g_checkFailures.m_aborted. (2)判断是否有输入帧,若有,则判断该输入帧的像素深度.颜色空间

x265探索与研究(六):main()函数

x265探索与研究(六):main()函数           x265源码的入口函数是main(),本文分析main()的主要功能.首先给出main()函数的功能及其代码结构:其次给出main()函数源码以及分析:最后给出main()函数中的主要功能函数的具体功能.   1.main()函数的功能及其代码结构           main()函数的主要功能是解析参数并进行编码的一些准备工作,调用了如下几个重要的函数: (1)cliopt.parse()函数:解析参数 (2)api->encod

x265探索与研究(十):encodeSlice()函数、encodeCTU()函数、encodeCU()函数与finishCU()函数分析

x265探索与研究(十):encodeSlice()函数.encodeCTU()函数.encodeCU()函数与finishCU()函数分析           encodeSlice()函数.encodeCTU()函数.encodeCU()函数与finishCU()函数都是编码的重要函数,根据函数命名均可得知其各自的功能.下面首先给出这几个函数之间的调用关系图.       1.encodeSlice()函数           encodeSlice()函数位于frameencoder.cp

x265探索与研究(一):x265下载安装与配置

x265下载安装与配置         研究了这么久的HEVC Test Model(HM),相信大家对x265开源代码的实现与框架早就充满了好奇,接下来的日子,我将把自己入手学习与探索"x265开源代码的实现与框架"的过程记录下来,与大家共同进步学习. 1.x265下载地址与参考资料 x265的官网为: http://x265.org/ x265下载地址: https://bitbucket.org/multicoreware/x265/downloads 或 http://ftp.

php中的异常机制理解分析

    异常本身的语法并不值得讨论,异常的使用场景才是主要的,这里我对比php和java,来看看php里的异常到底是怎么回事,异常到底应该怎么用.       看到了PPC论坛上的这篇讨论,觉得很有价值,我重新整理了下我的观点,做个总结.       首先,需要说的是,这里的异常是指php的异常.因为php的异常和其它语言相比有着很大的不同.       php里的异常,是程序运行中的不符合预期的情况,即一种在程序执行流程里面允许发生,只是和正常流程不同的状况.它是一种不正常的情况,就是按照我们

x265探索与研究(五):如何用VS调试x265?

如何用VS调试x265? 1.设置cli为启动项         用VS打开工程项目,如下图:         右击cli,设置为启动项,如下图: 2.配置路径和命令行参数         右击cli选择Properties,如下图所示         进入如下界面:         将Command Arguments和Working Directory中的内容填写好即可.         示例如下: Command Arguments: --preset fast --input hall_

x265探索与研究(二):x265使用基本方法

x265使用基本方法         首先,完成x265的下载安装与配置. (参考网址:http://blog.csdn.net/frd2009041510/article/details/50446007) 接下来介绍x265编解码视频的基本方法. 第一步:         进入"...x265_1.8\build\vc10-x86",双击"build-all.bat",则进行编译.文件夹中的内容变化如下两图所示.      第二步:         用VS打开上

x265探索与研究(三):如何播放*.265格式的视频或解码视频流

如何播放*.265格式的视频或解码视频流               如下图,在得到.265格式的视频或视频流后应如何播放265格式的视频呢?本博文总结出5种播放265格式视频或视频流的基本方法. 方式一:Elecard HEVC Player Sample           软件下载地址:http://download.csdn.net/detail/frd2009041510/9387068         下载后直接双击安装即可,播放效果如下图所示. 方式二:GitlHEVCAnalyz