VirtualBox

Ignore:
Timestamp:
Dec 2, 2011 3:58:27 PM (13 years ago)
Author:
vboxsync
Message:

Some request thread pool musings.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/VBox/Runtime/common/misc/reqpool.cpp

    r39510 r39517  
    5959    /** The submit timestamp of the pending request. */
    6060    uint64_t                uPendingNanoTs;
     61    /** The submit timestamp of the request processing. */
     62    uint64_t                uProcessingNanoTs;
    6163    /** When this CPU went idle the last time. */
    6264    uint64_t                uIdleNanoTs;
     
    7173    RTCPUID                 idLastCpu;
    7274
    73     /** The thread handle. */
    74     RTTHREAD                hThread;
    75 
    7675    /** The submitter will put an incoming request here when scheduling an idle
    7776     * thread.  */
     
    8079    PRTREQINT volatile      pPendingReq;
    8180
     81    /** The thread handle. */
     82    RTTHREAD                hThread;
     83    /** Nano seconds timestamp representing the birth time of the thread.  */
     84    uint64_t                uBirthNanoTs;
     85    /** Pointer to the request thread pool instance the thread is associated
     86     *  with. */
     87    struct RTREQPOOLINT    *pPool;
    8288} RTREQPOOLTHREAD;
    83 
     89/** Pointer to a worker thread. */
     90typedef RTREQPOOLTHREAD *PRTREQPOOLTHREAD;
     91
     92/**
     93 * Request thread pool instance data.
     94 */
    8495typedef struct RTREQPOOLINT
    8596{
     
    8798    uint32_t                u32Magic;
    8899
     100    /** The worker thread type. */
     101    RTTHREADTYPE            enmThreadType;
    89102    /** The current number of worker threads. */
    90103    uint32_t                cCurThreads;
     
    99112     * considered for retirement. */
    100113    uint32_t                cMsMinIdle;
     114    /** The max number of milliseconds to push back a submitter before creating
     115     * a new worker thread once the threshold has been reached. */
     116    uint32_t                cMsMaxPushBack;
     117    /** The minimum number of milliseconds to push back a submitter before
     118     * creating a new worker thread once the threshold has been reached. */
     119    uint32_t                cMsMinPushBack;
     120    /** The current submitter push back in milliseconds.
     121     * This is recalculated when worker threads come and go.  */
     122    uint32_t                cMsCurPushBack;
    101123
    102124    /** Statistics: The total number of threads created. */
     
    107129    RTLISTANCHOR            WorkerThreads;
    108130
     131    /** Event semaphore that submitters block on when pushing back . */
     132    RTSEMEVENT              hPushBackEvt;
     133
    109134    /** Critical section serializing access to members of this structure.  */
    110135    RTCRITSECT              CritSect;
    111136
     137    /** Destruction indicator.  The worker threads checks in their loop. */
     138    bool volatile           fDestructing;
     139
    112140    /** Reference counter. */
    113141    uint32_t volatile       cRefs;
     142    /** Number of threads pushing back. */
     143    uint32_t volatile       cPushingBack;
     144    /** The number of idle thread or threads in the process of becoming
     145     * idle.  This is increased before the to-be-idle thread tries to enter
     146     * the critical section and add itself to the list. */
     147    uint32_t volatile       cIdleThreads;
    114148    /** Linked list of idle threads. */
    115149    RTLISTANCHOR            IdleThreads;
    116150
     151    /** Head of the request FIFO. */
     152    PRTREQINT               pPendingRequests;
     153    /** Where to insert the next request. */
     154    PRTREQINT              *ppPendingRequests;
    117155
    118156} RTREQPOOLINT;
     
    120158typedef RTREQPOOLINT *PRTREQPOOLINT;
    121159
     160
     161static void rtReqPoolRecalcPushBack(PRTREQPOOLINT pPool)
     162{
     163    uint32_t const cMsRange = pPool->cMsMaxPushBack - pPool->cMsMinPushBack;
     164    uint32_t const cSteps   = pPool->cMaxThreads - pPool->cThreadsThreshold;
     165    uint32_t const iStep    = pPool->cCurThreads - pPool->cThreadsThreshold;
     166
     167    uint32_t cMsCurPushBack;
     168    if ((cMsRange >> 2) >= cSteps)
     169        cMsCurPushBack = cMsRange / cSteps * iStep;
     170    else
     171        cMsCurPushBack = (uint32_t)( (uint64_t)cMsRange * RT_NS_1MS  / cSteps * iStep / RT_NS_1MS );
     172    cMsCurPushBack += pPool->cMsMinPushBack;
     173
     174    pPool->cMsCurPushBack = cMsCurPushBack;
     175}
     176
     177
     178
     179static void rtReqPoolThreadProcessRequest(PRTREQPOOLTHREAD pThread, PRTREQINT pReq)
     180{
     181    /*
     182     * Update thread state.
     183     */
     184    pThread->uProcessingNanoTs  = RTTimeNanoTS();
     185    pThread->uPendingNanoTs     = pReq->uSubmitNanoTs;
     186    pThread->pPendingReq        = pReq;
     187    Assert(pReq->u32Magic == RTREQ_MAGIC);
     188
     189    /*
     190     * Do the actual processing.
     191     */
     192    /** @todo  */
     193
     194    /*
     195     * Update thread statistics and state.
     196     */
     197    uint64_t const uNsTsEnd = RTTimeNanoTS();
     198    pThread->cNsTotalReqProcessing += uNsTsEnd - pThread->uProcessingNanoTs;
     199    pThread->cNsTotalReqQueued     += uNsTsEnd - pThread->uPendingNanoTs;
     200    pThread->cReqProcessed++;
     201}
     202
     203
     204
     205static DECLCALLBACK(int) rtReqPoolThreadProc(RTTHREAD hThreadSelf, void *pvArg)
     206{
     207    PRTREQPOOLTHREAD    pThread = (PRTREQPOOLTHREAD)pvArg;
     208    PRTREQPOOLINT       pPool   = pThread->pPool;
     209
     210    /*
     211     * The work loop.
     212     */
     213    uint64_t            cPrevReqProcessed = 0;
     214    while (pPool->fDestructing)
     215    {
     216        /*
     217         * Pending work?
     218         */
     219        PRTREQINT pReq = ASMAtomicXchgPtrT(&pThread->pTodoReq, NULL, PRTREQINT);
     220        if (pReq)
     221            rtReqPoolThreadProcessRequest(pThread, pReq);
     222        else
     223        {
     224            ASMAtomicIncU32(&pPool->cIdleThreads);
     225            RTCritSectEnter(&pPool->CritSect);
     226
     227            /* Recheck the todo request pointer after entering the critsect. */
     228            pReq = ASMAtomicXchgPtrT(&pThread->pTodoReq, NULL, PRTREQINT);
     229            if (!pReq)
     230            {
     231                /* Any pending requests in the queue? */
     232                pReq = pPool->pPendingRequests;
     233                if (pReq)
     234                {
     235                    pPool->pPendingRequests = pReq->pNext;
     236                    if (pReq->pNext == NULL)
     237                        pPool->ppPendingRequests = &pPool->pPendingRequests;
     238                }
     239            }
     240
     241            if (pReq)
     242            {
     243                /*
     244                 * Un-idle ourselves and process the request.
     245                 */
     246                if (!RTListIsEmpty(&pThread->IdleNode))
     247                {
     248                    RTListNodeRemove(&pThread->IdleNode);
     249                    RTListInit(&pThread->IdleNode);
     250                }
     251                ASMAtomicDecU32(&pPool->cIdleThreads);
     252                RTCritSectLeave(&pPool->CritSect);
     253
     254                rtReqPoolThreadProcessRequest(pThread, pReq);
     255            }
     256            else
     257            {
     258                /*
     259                 * Nothing to do, go idle.
     260                 */
     261                if (cPrevReqProcessed != pThread->cReqProcessed)
     262                {
     263                    pThread->cReqProcessed = cPrevReqProcessed;
     264                    pThread->uIdleNanoTs   = RTTimeNanoTS();
     265                }
     266
     267                if (RTListIsEmpty(&pThread->IdleNode))
     268                    RTListPrepend(&pPool->IdleThreads, &pThread->IdleNode);
     269                RTThreadUserReset(hThreadSelf);
     270
     271                RTCritSectLeave(&pPool->CritSect);
     272
     273                RTThreadUserWait(hThreadSelf, 0);
     274
     275
     276
     277            }
     278        }
     279    }
     280
     281    /*
     282     * Clean up on the way out.
     283     */
     284    RTCritSectEnter(&pPool->CritSect);
     285
     286    /** @todo ....  */
     287
     288    rtReqPoolRecalcPushBack(pPool);
     289
     290    RTCritSectLeave(&pPool->CritSect);
     291
     292    return VINF_SUCCESS;
     293}
     294
     295
     296DECLHIDDEN(int) rtReqPoolSubmit(PRTREQPOOLINT pPool, PRTREQINT pReq)
     297{
     298    /*
     299     * Prepare the request.
     300     */
     301    pReq->uSubmitNanoTs = RTTimeNanoTS();
     302
     303
     304    RTCritSectEnter(&pPool->CritSect);
     305
     306    /*
     307     * Try schedule the request to any currently idle thread.
     308     */
     309    PRTREQPOOLTHREAD pThread = RTListGetFirst(&pPool->IdleThreads, RTREQPOOLTHREAD, IdleNode);
     310    if (pThread)
     311    {
     312        /** @todo CPU affinity... */
     313        ASMAtomicWritePtr(&pThread->pTodoReq, pReq);
     314
     315        RTListNodeRemove(&pThread->IdleNode);
     316        RTListInit(&pThread->IdleNode);
     317        ASMAtomicDecU32(&pPool->cIdleThreads);
     318
     319        RTThreadUserSignal(pThread->hThread);
     320
     321        RTCritSectLeave(&pPool->CritSect);
     322        return VINF_SUCCESS;
     323    }
     324    Assert(RTListIsEmpty(&pPool->IdleThreads));
     325
     326    /*
     327     * Put the request in the pending queue.
     328     */
     329    pReq->pNext = NULL;
     330    *pPool->ppPendingRequests = pReq;
     331    pPool->ppPendingRequests  = (PRTREQINT*)&pReq->pNext;
     332
     333    /*
     334     * If there is an incoming worker thread already or we've reached the
     335     * maximum number of worker threads, we're done.
     336     */
     337    if (   pPool->cIdleThreads > 0
     338        || pPool->cCurThreads >= pPool->cMaxThreads)
     339    {
     340        RTCritSectLeave(&pPool->CritSect);
     341        return VINF_SUCCESS;
     342    }
     343
     344    /*
     345     * Push back before creating a new worker thread.
     346     */
     347    if (   pPool->cCurThreads > pPool->cThreadsThreshold
     348        && (RTTimeNanoTS() - pReq->uSubmitNanoTs) / RT_NS_1MS >= pPool->cMsCurPushBack )
     349    {
     350        uint32_t const cMsTimeout = pPool->cMsCurPushBack;
     351        pPool->cPushingBack++;
     352        RTCritSectLeave(&pPool->CritSect);
     353
     354        /** @todo this is everything but perfect... it makes wake up order
     355         *        assumptions. A better solution would be having a lazily
     356         *        allocated push back event on each request. */
     357        int rc = RTSemEventWait(pPool->hPushBackEvt, cMsTimeout);
     358
     359        RTCritSectEnter(&pPool->CritSect);
     360        pPool->cPushingBack--;
     361    }
     362
     363    /*
     364     * Create a new thread for processing the request, or should we wait?
     365     */
     366    pThread = (PRTREQPOOLTHREAD)RTMemAllocZ(sizeof(RTREQPOOLTHREAD));
     367    if (pThread)
     368    {
     369        pThread->uBirthNanoTs = RTTimeNanoTS();
     370        pThread->pPool        = pPool;
     371        pThread->idLastCpu    = NIL_RTCPUID;
     372        pThread->hThread      = NIL_RTTHREAD;
     373        RTListInit(&pThread->IdleNode);
     374        RTListAppend(&pPool->WorkerThreads, &pThread->ListNode);
     375        pPool->cCurThreads++;
     376        pPool->cThreadsCreated++;
     377
     378        static uint32_t s_idThread = 0;
     379        int rc = RTThreadCreateF(&pThread->hThread, rtReqPoolThreadProc, pThread, 0 /*default stack size*/,
     380                                 pPool->enmThreadType, RTTHREADFLAGS_WAITABLE, "REQPT%02u", ++s_idThread);
     381        if (RT_SUCCESS(rc))
     382            pPool->uLastThreadCreateNanoTs = pThread->uBirthNanoTs;
     383        else
     384        {
     385            pPool->cCurThreads--;
     386            RTListNodeRemove(&pThread->ListNode);
     387            RTMemFree(pThread);
     388        }
     389    }
     390    RTCritSectLeave(&pPool->CritSect);
     391
     392    return VINF_SUCCESS;
     393}
     394
Note: See TracChangeset for help on using the changeset viewer.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette