test Model),x265将并行计算发挥得更加淋漓尽致。在main()函数中,为了完成多线程计算,读完24帧输入帧后才开始编码的原因也基于此。
Parallel Processing, WPP),这是对于相互具有依赖关系的图像单元进行数据并行处理的方法。在HEVC中,并行处理技术主要包括:基于Tile的并行和波前并行两种。在进行基于Tile的并行时,由于Tile的相互独立性,不需要考虑它们之间的相互依赖关系;而在进行波前并行处理时,则必须考虑数据间的相互依赖关系。
/*=============================================================*/
/*
 ====== Analysed by: RuiDong Fang
 ====== Csdn Blog: http://blog.csdn.net/frd2009041510
 ====== Date: 2016.04.15
 ====== Function: startCompressFrame() - binds a frame to this frame encoder
 ======           and triggers the encoder's thread.
 */
/*=============================================================*/

/* Attach curFrame to this frame encoder and signal m_enable so the encoder
 * thread can start working on it.
 *
 * Returns false (without signalling m_enable) when m_cuGeoms is not yet set
 * and initializeGeoms() fails; returns true otherwise. */
bool FrameEncoder::startCompressFrame(Frame* curFrame)
{
    /* time elapsed since m_prevOutputTime, recorded as the slice-type wait time */
    m_slicetypeWaitTime = x265_mdate() - m_prevOutputTime;

    /* take over the frame and copy its parameters / slice type */
    m_frame     = curFrame;
    m_param     = curFrame->m_param;
    m_sliceType = curFrame->m_lowres.sliceType;

    /* record in the frame's encode data which frame encoder owns it */
    curFrame->m_encData->m_frameEncoderID = m_jpId;
    curFrame->m_encData->m_jobProvider    = this;
    curFrame->m_encData->m_slice->m_mref  = m_mref;

    /* CU geometries are built on first use only */
    if (!m_cuGeoms && !initializeGeoms())
        return false;

    /* wake the encoder thread; threadMain() is blocked on m_enable */
    m_enable.trigger();
    return true;
}
void Encoder::create() { if (!primitives.pu[0].sad) { // this should be an impossible condition when using our public API, and indicates a serious bug. x265_log(m_param, X265_LOG_ERROR, "Primitives must be initialized before encoder is created\n"); abort(); } x265_param* p = m_param; int rows = (p->sourceHeight + p->maxCUSize - 1) >> g_log2Size[p->maxCUSize]; int cols = (p->sourceWidth + p->maxCUSize - 1) >> g_log2Size[p->maxCUSize]; // Do not allow WPP if only one row or fewer than 3 columns, it is pointless and unstable //对于不符合条件的,不进行WPP if (rows == 1 || cols < 3) { x265_log(p, X265_LOG_WARNING, "Too few rows/columns, --wpp disabled\n"); p->bEnableWavefront = 0; } bool allowPools = !p->numaPools || strcmp(p->numaPools, "none"); // Trim the thread pool if --wpp, --pme, and --pmode are disabled //如果--wpp, --pme, and --pmode不使能,清理线程池 if (!p->bEnableWavefront && !p->bDistributeModeAnalysis && !p->bDistributeMotionEstimation && !p->lookaheadSlices) allowPools = false; //根据核数检测线程的数目 if (!p->frameNumThreads) { // auto-detect frame threads int cpuCount = ThreadPool::getCpuCount(); if (!p->bEnableWavefront) p->frameNumThreads = X265_MIN3(cpuCount, (rows + 1) / 2, X265_MAX_FRAME_THREADS); else if (cpuCount >= 32) p->frameNumThreads = (p->sourceHeight > 2000) ? 
8 : 6; // dual-socket 10-core IvyBridge or higher else if (cpuCount >= 16) p->frameNumThreads = 5; // 8 HT cores, or dual socket else if (cpuCount >= 8) p->frameNumThreads = 3; // 4 HT cores else if (cpuCount >= 4) p->frameNumThreads = 2; // Dual or Quad core else p->frameNumThreads = 1; } m_numPools = 0; if (allowPools) m_threadPool = ThreadPool::allocThreadPools(p, m_numPools); if (!m_numPools) { // issue warnings if any of these features were requested if (p->bEnableWavefront) x265_log(p, X265_LOG_WARNING, "No thread pool allocated, --wpp disabled\n"); if (p->bDistributeMotionEstimation) x265_log(p, X265_LOG_WARNING, "No thread pool allocated, --pme disabled\n"); if (p->bDistributeModeAnalysis) x265_log(p, X265_LOG_WARNING, "No thread pool allocated, --pmode disabled\n"); if (p->lookaheadSlices) x265_log(p, X265_LOG_WARNING, "No thread pool allocated, --lookahead-slices disabled\n"); // disable all pool features if the thread pool is disabled or unusable. p->bEnableWavefront = p->bDistributeModeAnalysis = p->bDistributeMotionEstimation = p->lookaheadSlices = 0; } if (!p->bEnableWavefront && p->rc.vbvBufferSize) { x265_log(p, X265_LOG_ERROR, "VBV requires wavefront parallelism\n"); m_aborted = true; } char buf[128]; int len = 0; if (p->bEnableWavefront) len += sprintf(buf + len, "wpp(%d rows)", rows); if (p->bDistributeModeAnalysis) len += sprintf(buf + len, "%spmode", len ? "+" : ""); if (p->bDistributeMotionEstimation) len += sprintf(buf + len, "%spme ", len ? 
"+" : ""); if (!len) strcpy(buf, "none"); x265_log(p, X265_LOG_INFO, "frame threads / pool features : %d / %s\n", p->frameNumThreads, buf); for (int i = 0; i < m_param->frameNumThreads; i++) { m_frameEncoder[i] = new FrameEncoder; m_frameEncoder[i]->m_nalList.m_annexB = !!m_param->bAnnexB; } if (m_numPools) { for (int i = 0; i < m_param->frameNumThreads; i++) { int pool = i % m_numPools; m_frameEncoder[i]->m_pool = &m_threadPool[pool]; m_frameEncoder[i]->m_jpId = m_threadPool[pool].m_numProviders++; m_threadPool[pool].m_jpTable[m_frameEncoder[i]->m_jpId] = m_frameEncoder[i]; } for (int i = 0; i < m_numPools; i++) m_threadPool[i].start(); } else { /* CU stats and noise-reduction buffers are indexed by jpId, so it cannot be left as -1 */ for (int i = 0; i < m_param->frameNumThreads; i++) m_frameEncoder[i]->m_jpId = 0; } if (!m_scalingList.init()) { x265_log(m_param, X265_LOG_ERROR, "Unable to allocate scaling list arrays\n"); m_aborted = true; } else if (!m_param->scalingLists || !strcmp(m_param->scalingLists, "off")) m_scalingList.m_bEnabled = false; else if (!strcmp(m_param->scalingLists, "default")) m_scalingList.setDefaultScalingList(); else if (m_scalingList.parseScalingList(m_param->scalingLists)) m_aborted = true; m_scalingList.setupQuantMatrices(); m_lookahead = new Lookahead(m_param, m_threadPool); if (m_numPools) { m_lookahead->m_jpId = m_threadPool[0].m_numProviders++; m_threadPool[0].m_jpTable[m_lookahead->m_jpId] = m_lookahead; } m_dpb = new DPB(m_param); m_rateControl = new RateControl(*m_param); initVPS(&m_vps); initSPS(&m_sps); initPPS(&m_pps); int numRows = (m_param->sourceHeight + g_maxCUSize - 1) / g_maxCUSize; int numCols = (m_param->sourceWidth + g_maxCUSize - 1) / g_maxCUSize; for (int i = 0; i < m_param->frameNumThreads; i++) { if (!m_frameEncoder[i]->init(this, numRows, numCols)) { x265_log(m_param, X265_LOG_ERROR, "Unable to initialize frame encoder, aborting\n"); m_aborted = true; } } for (int i = 0; i < m_param->frameNumThreads; i++) { 
m_frameEncoder[i]->start(); m_frameEncoder[i]->m_done.wait(); /* wait for thread to initialize */ //========调用threadMain() } if (m_param->bEmitHRDSEI) m_rateControl->initHRD(m_sps); if (!m_rateControl->init(m_sps)) m_aborted = true; if (!m_lookahead->create()) m_aborted = true; if (m_param->analysisMode) { const char* name = m_param->analysisFileName; if (!name) name = defaultAnalysisFileName; const char* mode = m_param->analysisMode == X265_ANALYSIS_LOAD ? "rb" : "wb"; m_analysisFile = fopen(name, mode); if (!m_analysisFile) { x265_log(NULL, X265_LOG_ERROR, "Analysis load/save: failed to open file %s\n", name); m_aborted = true; } } m_bZeroLatency = !m_param->bframes && !m_param->lookaheadDepth && m_param->frameNumThreads == 1; m_aborted |= parseLambdaFile(m_param); m_encodeStartTime = x265_mdate(); m_nalList.m_annexB = !!m_param->bAnnexB; }
void FrameEncoder::threadMain() { THREAD_NAME("Frame", m_jpId); if (m_pool) //若线程池不为空 { m_pool->setCurrentThreadAffinity(); //设置当前线程 /* the first FE on each NUMA node is responsible for allocating thread * local data for all worker threads in that pool. If WPP is disabled, then * each FE also needs a TLD instance */ if (!m_jpId) { int numTLD = m_pool->m_numWorkers; if (!m_param->bEnableWavefront) numTLD += m_pool->m_numProviders; m_tld = new ThreadLocalData[numTLD]; for (int i = 0; i < numTLD; i++) { m_tld[i].analysis.initSearch(*m_param, m_top->m_scalingList); m_tld[i].analysis.create(m_tld); } for (int i = 0; i < m_pool->m_numProviders; i++) { if (m_pool->m_jpTable[i]->m_isFrameEncoder) /* ugh; over-allocation and other issues here */ { FrameEncoder *peer = dynamic_cast<FrameEncoder*>(m_pool->m_jpTable[i]); peer->m_tld = m_tld; } } } if (m_param->bEnableWavefront) m_localTldIdx = -1; // cause exception if used else m_localTldIdx = m_pool->m_numWorkers + m_jpId; } else //若线程池为空,则WPP { m_tld = new ThreadLocalData; m_tld->analysis.initSearch(*m_param, m_top->m_scalingList); m_tld->analysis.create(NULL); m_localTldIdx = 0; } m_done.trigger(); /* signal that thread is initialized */ //线程已经触发 m_enable.wait(); /* Encoder::encode() triggers this event */ //等待处理 while (m_threadActive) { compressFrame(); //=====================调用compressFrame() m_done.trigger(); /* FrameEncoder::getEncodedPicture() blocks for this event */ m_enable.wait(); } }