VirtualBox

Changeset 39620 in vbox for trunk/src/VBox


Ignore:
Timestamp:
Dec 15, 2011 1:10:48 AM (13 years ago)
Author:
vboxsync
svn:sync-xref-src-repo-rev:
75401
Message:

more reqpool code.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/VBox/Runtime/common/misc/reqpool.cpp

    r39617 r39620  
    4545#include "internal/req.h"
    4646#include "internal/magics.h"
     47
     48
     49/*******************************************************************************
     50*   Defined Constants And Macros                                               *
     51*******************************************************************************/
     52/** The max number of worker threads. */
     53#define RTREQPOOL_MAX_THREADS           UINT32_C(16384)
     54/** The max number of milliseconds to push back. */
     55#define RTREQPOOL_PUSH_BACK_MAX_MS      RT_MS_1MIN
     56/** The max number of free requests to keep around. */
     57#define RTREQPOOL_MAX_FREE_REQUESTS     (RTREQPOOL_MAX_THREADS * 2U)
    4758
    4859
     
    104115    /** The maximum number of worker threads. */
    105116    uint32_t                cMaxThreads;
    106     /** The number of threads which should be spawned before throttling kicks
    107      * in. */
    108     uint32_t                cThreadsThreshold;
    109117    /** The minimum number of worker threads. */
    110118    uint32_t                cMinThreads;
     
    115123    uint64_t                cNsMinIdle;
    116124    /** The idle thread sleep interval in milliseconds. */
    117     uint32_t                cMsIdleSleep;
     125    RTMSINTERVAL            cMsIdleSleep;
     126    /** The number of threads which should be spawned before throttling kicks
     127     * in. */
     128    uint32_t                cThreadsPushBackThreshold;
    118129    /** The max number of milliseconds to push back a submitter before creating
    119130     * a new worker thread once the threshold has been reached. */
     
    144155    RTLISTANCHOR            WorkerThreads;
    145156
     157    /** The number of requests processed and counted in the time totals. */
     158    uint64_t                cReqProcessed;
     159    /** Total time the requests processed by this thread took to process. */
     160    uint64_t                cNsTotalReqProcessing;
     161    /** Total time the requests processed by this thread had to wait in
     162     * the queue before being scheduled. */
     163    uint64_t                cNsTotalReqQueued;
     164
    146165    /** Reference counter. */
    147166    uint32_t volatile       cRefs;
     
    157176    /** Where to insert the next request. */
    158177    PRTREQINT              *ppPendingRequests;
     178    /** The number of requests currently pending. */
     179    uint32_t                cCurPendingRequests;
     180    /** The number of requests currently being executed. */
     181    uint32_t volatile       cCurActiveRequests;
     182    /** The number of requests submitted. */
     183    uint64_t                cReqSubmitted;
    159184
    160185    /** Head of the request recycling LIFO. */
     
    197222{
    198223    uint32_t const cMsRange = pPool->cMsMaxPushBack - pPool->cMsMinPushBack;
    199     uint32_t const cSteps   = pPool->cMaxThreads - pPool->cThreadsThreshold;
    200     uint32_t const iStep    = pPool->cCurThreads - pPool->cThreadsThreshold;
     224    uint32_t const cSteps   = pPool->cMaxThreads - pPool->cThreadsPushBackThreshold;
     225    uint32_t const iStep    = pPool->cCurThreads - pPool->cThreadsPushBackThreshold;
    201226
    202227    uint32_t cMsCurPushBack;
     
    262287
    263288
    264 static void rtReqPoolThreadProcessRequest(PRTREQPOOLTHREAD pThread, PRTREQINT pReq)
     289/**
     290 * Process one request.
     291 *
     292 * @param   pPool               The pool.
     293 * @param   pThread             The worker thread.
     294 * @param   pReq                The request to process.
     295 */
     296static void rtReqPoolThreadProcessRequest(PRTREQPOOLINT pPool, PRTREQPOOLTHREAD pThread, PRTREQINT pReq)
    265297{
    266298    /*
     
    270302    pThread->uPendingNanoTs     = pReq->uSubmitNanoTs;
    271303    pThread->pPendingReq        = pReq;
     304    ASMAtomicIncU32(&pPool->cCurActiveRequests);
    272305    Assert(pReq->u32Magic == RTREQ_MAGIC);
    273306
     
    275308     * Do the actual processing.
    276309     */
    277     /** @todo  */
     310    rtReqProcessOne(pReq);
    278311
    279312    /*
    280313     * Update thread statistics and state.
    281314     */
     315    ASMAtomicDecU32(&pPool->cCurActiveRequests);
     316    pThread->pPendingReq    = NULL;
    282317    uint64_t const uNsTsEnd = RTTimeNanoTS();
    283318    pThread->cNsTotalReqProcessing += uNsTsEnd - pThread->uProcessingNanoTs;
     
    303338     * The work loop.
    304339     */
    305     uint64_t cPrevReqProcessed = UINT64_MAX;
     340    uint64_t cReqPrevProcessedIdle     = UINT64_MAX;
     341    uint64_t cReqPrevProcessedStat     = 0;
     342    uint64_t cNsPrevTotalReqProcessing = 0;
     343    uint64_t cNsPrevTotalReqQueued     = 0;
    306344    while (!pPool->fDestructing)
    307345    {
     
    315353        {
    316354            Assert(RTListIsEmpty(&pThread->IdleNode)); /* Must not be in the idle list. */
    317             rtReqPoolThreadProcessRequest(pThread, pReq);
     355            rtReqPoolThreadProcessRequest(pPool, pThread, pReq);
    318356            continue;
    319357        }
     
    321359        ASMAtomicIncU32(&pPool->cIdleThreads);
    322360        RTCritSectEnter(&pPool->CritSect);
     361
     362        /* Update the global statistics. */
     363        if (cReqPrevProcessedStat != pThread->cReqProcessed)
     364        {
     365            pPool->cReqProcessed          = pThread->cReqProcessed         - cReqPrevProcessedStat;
     366            cReqPrevProcessedStat         = pThread->cReqProcessed;
     367            pPool->cNsTotalReqProcessing += pThread->cNsTotalReqProcessing - cNsPrevTotalReqProcessing;
     368            cNsPrevTotalReqProcessing     = pThread->cNsTotalReqProcessing;
     369            pPool->cNsTotalReqQueued     += pThread->cNsTotalReqQueued     - cNsPrevTotalReqQueued;
     370            cNsPrevTotalReqQueued         = pThread->cNsTotalReqQueued;
     371        }
    323372
    324373        /* Recheck the todo request pointer after entering the critsect. */
     
    329378            RTCritSectLeave(&pPool->CritSect);
    330379
    331             rtReqPoolThreadProcessRequest(pThread, pReq);
     380            rtReqPoolThreadProcessRequest(pPool, pThread, pReq);
    332381            continue;
    333382        }
     
    340389            if (pReq->pNext == NULL)
    341390                pPool->ppPendingRequests = &pPool->pPendingRequests;
     391            Assert(pPool->cCurPendingRequests > 0);
     392            pPool->cCurPendingRequests--;
    342393
    343394            /* Un-idle ourselves and process the request. */
     
    351402            RTCritSectLeave(&pPool->CritSect);
    352403
    353             rtReqPoolThreadProcessRequest(pThread, pReq);
     404            rtReqPoolThreadProcessRequest(pPool, pThread, pReq);
    354405            continue;
    355406        }
     
    358409         * Nothing to do, go idle.
    359410         */
    360         if (cPrevReqProcessed != pThread->cReqProcessed)
    361         {
    362             pThread->cReqProcessed = cPrevReqProcessed;
    363             pThread->uIdleNanoTs   = RTTimeNanoTS();
     411        if (cReqPrevProcessedIdle != pThread->cReqProcessed)
     412        {
     413            cReqPrevProcessedIdle = pThread->cReqProcessed;
     414            pThread->uIdleNanoTs  = RTTimeNanoTS();
    364415        }
    365416        else if (pPool->cCurThreads > pPool->cMinThreads)
     
    475526    RTCritSectEnter(&pPool->CritSect);
    476527
     528    pPool->cReqSubmitted++;
     529
    477530    /*
    478531     * Try schedule the request to a thread that's currently idle.
     
    481534    if (pThread)
    482535    {
    483         /** @todo CPU affinity... */
     536        /** @todo CPU affinity??? */
    484537        ASMAtomicWritePtr(&pThread->pTodoReq, pReq);
    485538
     
    501554    *pPool->ppPendingRequests = pReq;
    502555    pPool->ppPendingRequests  = (PRTREQINT*)&pReq->pNext;
     556    pPool->cCurPendingRequests++;
    503557
    504558    /*
     
    516570     * Push back before creating a new worker thread.
    517571     */
    518     if (   pPool->cCurThreads > pPool->cThreadsThreshold
     572    if (   pPool->cCurThreads > pPool->cThreadsPushBackThreshold
    519573        && (RTTimeNanoTS() - pReq->uSubmitNanoTs) / RT_NS_1MS >= pPool->cMsCurPushBack )
    520574    {
     
    563617}
    564618
    565 #if 0 /* later */
    566 
    567 typedef enum RTREQPOOLCFGVAR
    568 {
    569     RTREQPOOLCFGVAR_INVALID = 0,
    570     RTREQPOOLCFGVAR_END,
    571     RTREQPOOLCFGVAR_32BIT_HACK = 0x7fffffff
    572 } RTREQPOOLCFGVAR;
    573 
    574619
    575620RTDECL(int) RTReqPoolSetCfgVar(RTREQPOOL hPool, RTREQPOOLCFGVAR enmVar, uint64_t uValue)
    576621{
    577     return VERR_NOT_SUPPORTED;
    578 }
     622    PRTREQPOOLINT pPool = hPool;
     623    AssertPtrReturn(pPool, VERR_INVALID_HANDLE);
     624    AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, VERR_INVALID_HANDLE);
     625    AssertReturn(enmVar > RTREQPOOLCFGVAR_INVALID && enmVar < RTREQPOOLCFGVAR_END, VERR_INVALID_PARAMETER);
     626
     627    RTCritSectEnter(&pPool->CritSect);
     628
     629    bool fWakeUpIdleThreads = false;
     630    int  rc                 = VINF_SUCCESS;
     631    switch (enmVar)
     632    {
     633        case RTREQPOOLCFGVAR_THREAD_TYPE:
     634            AssertMsgBreakStmt(uValue > (uint64_t)RTTHREADTYPE_INVALID && uValue < (uint64_t)RTTHREADTYPE_END,
     635                               ("%llu\n",  uValue), rc = VERR_OUT_OF_RANGE);
     636
     637            pPool->enmThreadType = (RTTHREADTYPE)uValue;
     638            break;
     639
     640        case RTREQPOOLCFGVAR_MIN_THREADS:
     641            AssertMsgBreakStmt(uValue <= RTREQPOOL_MAX_THREADS,  ("%llu\n",  uValue), rc = VERR_OUT_OF_RANGE);
     642            fWakeUpIdleThreads = pPool->cMinThreads > (uint32_t)uValue;
     643            pPool->cMinThreads = (uint32_t)uValue;
     644            if (pPool->cMinThreads > pPool->cMaxThreads)
     645                pPool->cMaxThreads = pPool->cMinThreads;
     646            if (   pPool->cThreadsPushBackThreshold < pPool->cMinThreads
     647                || pPool->cThreadsPushBackThreshold > pPool->cMaxThreads)
     648                pPool->cThreadsPushBackThreshold = pPool->cMinThreads + (pPool->cMaxThreads - pPool->cMinThreads) / 2;
     649            rtReqPoolRecalcPushBack(pPool);
     650            break;
     651
     652        case RTREQPOOLCFGVAR_MAX_THREADS:
     653            AssertMsgBreakStmt(uValue <= RTREQPOOL_MAX_THREADS && uValue >= 1,  ("%llu\n",  uValue), rc = VERR_OUT_OF_RANGE);
     654            pPool->cMaxThreads = (uint32_t)uValue;
     655            if (pPool->cMaxThreads < pPool->cMinThreads)
     656            {
     657                pPool->cMinThreads = pPool->cMaxThreads;
     658                fWakeUpIdleThreads = true;
     659            }
     660            if (pPool->cMaxThreads < pPool->cThreadsPushBackThreshold)
     661                pPool->cThreadsPushBackThreshold = pPool->cMinThreads + (pPool->cMaxThreads - pPool->cMinThreads) / 2;
     662            rtReqPoolRecalcPushBack(pPool);
     663            break;
     664
     665        case RTREQPOOLCFGVAR_MS_MIN_IDLE:
     666            AssertMsgBreakStmt(uValue < UINT32_MAX || uValue == RT_INDEFINITE_WAIT,  ("%llu\n",  uValue), rc = VERR_OUT_OF_RANGE);
     667            if (uValue < UINT32_MAX && uValue != RT_INDEFINITE_WAIT)
     668            {
     669                fWakeUpIdleThreads = pPool->cMsMinIdle != (uint32_t)uValue;
     670                pPool->cMsMinIdle = (uint32_t)uValue;
     671                pPool->cNsMinIdle = pPool->cMsMinIdle * RT_NS_1MS_64;
     672                if (pPool->cMsIdleSleep > pPool->cMsMinIdle)
     673                    pPool->cMsIdleSleep = RT_MAX(RT_MS_1SEC, pPool->cMsMinIdle);
     674            }
     675            else
     676            {
     677                pPool->cMsMinIdle   = UINT32_MAX;
     678                pPool->cNsMinIdle   = UINT64_MAX;
     679                pPool->cMsIdleSleep = RT_INDEFINITE_WAIT;
     680            }
     681            break;
     682
     683        case RTREQPOOLCFGVAR_MS_IDLE_SLEEP:
     684            AssertMsgBreakStmt(uValue <= RT_INDEFINITE_WAIT,  ("%llu\n",  uValue), rc = VERR_OUT_OF_RANGE);
     685            fWakeUpIdleThreads = pPool->cMsMinIdle > (RTMSINTERVAL)uValue;
     686            pPool->cMsIdleSleep = (RTMSINTERVAL)uValue;
     687            if (pPool->cMsIdleSleep == RT_INDEFINITE_WAIT)
     688            {
     689                pPool->cMsMinIdle = UINT32_MAX;
     690                pPool->cNsMinIdle = UINT64_MAX;
     691            }
     692            break;
     693
     694        case RTREQPOOLCFGVAR_PUSH_BACK_THRESHOLD:
     695            if (uValue == UINT64_MAX)
     696                pPool->cThreadsPushBackThreshold = pPool->cMaxThreads;
     697            else if (uValue == 0)
     698                pPool->cThreadsPushBackThreshold = pPool->cMinThreads;
     699            else
     700            {
     701                AssertMsgBreakStmt(uValue <= pPool->cMaxThreads,  ("%llu\n",  uValue), rc = VERR_OUT_OF_RANGE);
     702                AssertMsgBreakStmt(uValue >= pPool->cMinThreads,  ("%llu\n",  uValue), rc = VERR_OUT_OF_RANGE);
     703                pPool->cThreadsPushBackThreshold = (uint32_t)uValue;
     704            }
     705            break;
     706
     707        case RTREQPOOLCFGVAR_PUSH_BACK_MIN_MS:
     708            AssertMsgBreakStmt(uValue <= RTREQPOOL_PUSH_BACK_MAX_MS,  ("%llu\n",  uValue), rc = VERR_OUT_OF_RANGE);
     709            pPool->cMsMinPushBack = (uint32_t)uValue;
     710            if (pPool->cMsMaxPushBack < pPool->cMsMinPushBack)
     711                pPool->cMsMaxPushBack = pPool->cMsMinPushBack;
     712            rtReqPoolRecalcPushBack(pPool);
     713            break;
     714
     715        case RTREQPOOLCFGVAR_PUSH_BACK_MAX_MS:
     716            AssertMsgBreakStmt(uValue <= RTREQPOOL_PUSH_BACK_MAX_MS,  ("%llu\n",  uValue), rc = VERR_OUT_OF_RANGE);
     717            pPool->cMsMaxPushBack = (uint32_t)uValue;
     718            if (pPool->cMsMinPushBack < pPool->cMsMaxPushBack)
     719                pPool->cMsMinPushBack = pPool->cMsMaxPushBack;
     720            rtReqPoolRecalcPushBack(pPool);
     721            break;
     722
     723        case RTREQPOOLCFGVAR_MAX_FREE_REQUESTS:
     724            if (uValue == UINT64_MAX)
     725            {
     726                pPool->cMaxFreeRequests = pPool->cMaxThreads * 2;
     727                if (pPool->cMaxFreeRequests < 16)
     728                    pPool->cMaxFreeRequests = 16;
     729            }
     730            else
     731            {
     732                AssertMsgBreakStmt(uValue <= RTREQPOOL_MAX_FREE_REQUESTS,  ("%llu\n",  uValue), rc = VERR_OUT_OF_RANGE);
     733                pPool->cMaxFreeRequests = (uint32_t)uValue;
     734            }
     735
     736            while (pPool->cCurFreeRequests > pPool->cMaxFreeRequests)
     737            {
     738                PRTREQINT pReq = pPool->pFreeRequests;
     739                pPool->pFreeRequests = pReq->pNext;
     740                ASMAtomicDecU32(&pPool->cCurFreeRequests);
     741                rtReqFreeIt(pReq);
     742            }
     743            break;
     744
     745        default:
     746            AssertFailed();
     747            rc = VERR_IPE_NOT_REACHED_DEFAULT_CASE;
     748    }
     749
     750    /* Wake up all idle threads if required. */
     751    if (fWakeUpIdleThreads)
     752    {
     753        Assert(rc == VINF_SUCCESS);
     754        PRTREQPOOLTHREAD pThread;
     755        RTListForEach(&pPool->WorkerThreads, pThread, RTREQPOOLTHREAD, ListNode)
     756        {
     757            RTThreadUserSignal(pThread->hThread);
     758        }
     759    }
     760
     761    RTCritSectLeave(&pPool->CritSect);
     762
     763    return rc;
     764}
     765RT_EXPORT_SYMBOL(RTReqPoolSetCfgVar);
    579766
    580767
    581768RTDECL(int) RTReqPoolQueryCfgVar(RTREQPOOL hPool, RTREQPOOLCFGVAR enmVar, uint64_t *puValue)
    582769{
    583     return VERR_NOT_SUPPORTED;
    584 }
    585 
    586 
    587 typedef enum RTREQPOOLSTAT
    588 {
    589     RTREQPOOLSTAT_INVALID = 0,
    590     RTREQPOOLSTAT_END,
    591     RTREQPOOLSTAT_32BIT_HACK = 0x7fffffff
    592 } RTREQPOOLSTAT;
     770    PRTREQPOOLINT pPool = hPool;
     771    AssertPtrReturn(pPool, VERR_INVALID_HANDLE);
     772    AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, VERR_INVALID_HANDLE);
     773    AssertReturn(enmVar > RTREQPOOLCFGVAR_INVALID && enmVar < RTREQPOOLCFGVAR_END, VERR_INVALID_PARAMETER);
     774
     775    RTCritSectEnter(&pPool->CritSect);
     776
     777    int rc = VINF_SUCCESS;
     778    switch (enmVar)
     779    {
     780        case RTREQPOOLCFGVAR_THREAD_TYPE:
     781            *puValue = pPool->enmThreadType;
     782            break;
     783
     784        case RTREQPOOLCFGVAR_MIN_THREADS:
     785            *puValue = pPool->cMinThreads;
     786            break;
     787
     788        case RTREQPOOLCFGVAR_MAX_THREADS:
     789            *puValue = pPool->cMaxThreads;
     790            break;
     791
     792        case RTREQPOOLCFGVAR_MS_MIN_IDLE:
     793            *puValue = pPool->cMsMinIdle;
     794            break;
     795
     796        case RTREQPOOLCFGVAR_MS_IDLE_SLEEP:
     797            *puValue = pPool->cMsIdleSleep;
     798            break;
     799
     800        case RTREQPOOLCFGVAR_PUSH_BACK_THRESHOLD:
     801            *puValue = pPool->cThreadsPushBackThreshold;
     802            break;
     803
     804        case RTREQPOOLCFGVAR_PUSH_BACK_MIN_MS:
     805            *puValue = pPool->cMsMinPushBack;
     806            break;
     807
     808        case RTREQPOOLCFGVAR_PUSH_BACK_MAX_MS:
     809            *puValue = pPool->cMsMaxPushBack;
     810            break;
     811
     812        case RTREQPOOLCFGVAR_MAX_FREE_REQUESTS:
     813            *puValue = pPool->cMaxFreeRequests;
     814            break;
     815
     816        default:
     817            AssertFailed();
     818            rc = VERR_IPE_NOT_REACHED_DEFAULT_CASE;
     819            *puValue = UINT64_MAX;
     820            break;
     821    }
     822
     823    RTCritSectLeave(&pPool->CritSect);
     824
     825    return rc;
     826}
     827RT_EXPORT_SYMBOL(RTReqPoolQueryCfgVar);
    593828
    594829
    595830RTDECL(uint64_t) RTReqPoolGetStat(RTREQPOOL hPool, RTREQPOOLSTAT enmStat)
    596831{
    597     return UINT64_MAX;
    598 }
    599 
    600 #endif /* later */
     832    PRTREQPOOLINT pPool = hPool;
     833    AssertPtrReturn(pPool, UINT64_MAX);
     834    AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT64_MAX);
     835    AssertReturn(enmStat > RTREQPOOLSTAT_INVALID && enmStat < RTREQPOOLSTAT_END, UINT64_MAX);
     836
     837    RTCritSectEnter(&pPool->CritSect);
     838
     839    uint64_t         u64;
     840    PRTREQPOOLTHREAD pThread;
     841    switch (enmStat)
     842    {
     843        case RTREQPOOLSTAT_THREADS:                     u64 = pPool->cCurThreads; break;
     844        case RTREQPOOLSTAT_THREADS_CREATED:             u64 = pPool->cThreadsCreated; break;
     845        case RTREQPOOLSTAT_REQUESTS_PROCESSED:          u64 = pPool->cReqProcessed; break;
     846        case RTREQPOOLSTAT_REQUESTS_SUBMITTED:          u64 = pPool->cReqSubmitted; break;
     847        case RTREQPOOLSTAT_REQUESTS_PENDING:            u64 = pPool->cCurPendingRequests; break;
     848        case RTREQPOOLSTAT_REQUESTS_ACTIVE:             u64 = pPool->cCurActiveRequests; break;
     849        case RTREQPOOLSTAT_REQUESTS_FREE:               u64 = pPool->cCurFreeRequests; break;
     850        case RTREQPOOLSTAT_NS_TOTAL_REQ_PROCESSING:     u64 = pPool->cNsTotalReqProcessing; break;
     851        case RTREQPOOLSTAT_NS_TOTAL_REQ_QUEUED:         u64 = pPool->cNsTotalReqQueued; break;
     852        case RTREQPOOLSTAT_NS_AVERAGE_REQ_PROCESSING:   u64 = pPool->cNsTotalReqProcessing / RT_MAX(pPool->cReqProcessed, 1); break;
     853        case RTREQPOOLSTAT_NS_AVERAGE_REQ_QUEUED:       u64 = pPool->cNsTotalReqQueued / RT_MAX(pPool->cReqProcessed, 1); break;
     854        default:
     855            AssertFailed();
     856            u64 = UINT64_MAX;
     857            break;
     858    }
     859
     860    RTCritSectLeave(&pPool->CritSect);
     861
     862    return u64;
     863}
     864RT_EXPORT_SYMBOL(RTReqPoolGetStat);
     865
    601866
    602867RTDECL(uint32_t) RTReqPoolRetain(RTREQPOOL hPool)
     
    653918        }
    654919        pPool->ppPendingRequests = NULL;
     920        pPool->cCurPendingRequests = 0;
    655921
    656922        /* Wait for the workers to shut down. */
Note: See TracChangeset for help on using the changeset viewer.

© 2025 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette