Changeset 39616 in vbox for trunk/src/VBox/Runtime/common
- Timestamp:
- Dec 14, 2011 4:35:38 PM (13 years ago)
- svn:sync-xref-src-repo-rev:
- 75397
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/src/VBox/Runtime/common/misc/reqpool.cpp
r39550 r39616 112 112 * considered for retirement. */ 113 113 uint32_t cMsMinIdle; 114 /** cMsMinIdle in nano seconds. */ 115 uint64_t cNsMinIdle; 116 /** The idle thread sleep interval in milliseconds. */ 117 uint32_t cMsIdleSleep; 114 118 /** The max number of milliseconds to push back a submitter before creating 115 119 * a new worker thread once the threshold has been reached. */ … … 123 127 124 128 /** Signaled by terminating worker threads. */ 125 RTSEMEVENT 129 RTSEMEVENTMULTI hThreadTermEvt; 126 130 127 131 /** Destruction indicator. The worker threads checks in their loop. */ … … 166 170 167 171 172 /** 173 * Used by exiting thread and the pool destruction code to cancel unexpected 174 * requests. 175 * 176 * @param pReq The request. 177 */ 178 static void rtReqPoolCancelReq(PRTREQINT pReq) 179 { 180 pReq->uOwner.hPool = NIL_RTREQPOOL; /* force free */ 181 pReq->enmState = RTREQSTATE_COMPLETED; 182 ASMAtomicWriteS32(&pReq->iStatusX, VERR_CANCELLED); 183 if (pReq->hPushBackEvt != NIL_RTSEMEVENTMULTI) 184 RTSemEventMultiSignal(pReq->hPushBackEvt); 185 RTSemEventSignal(pReq->EventSem); 186 187 RTReqRelease(pReq); 188 } 189 190 191 /** 192 * Recalculate the max pushback interval when adding or removing worker threads. 193 * 194 * @param pPool The pool. cMsCurPushBack will be changed. 195 */ 168 196 static void rtReqPoolRecalcPushBack(PRTREQPOOLINT pPool) 169 197 { … … 184 212 185 213 214 /** 215 * Performs thread exit. 216 * 217 * @returns Thread termination status code (VINF_SUCCESS). 218 * @param pPool The pool. 219 * @param pThread The thread. 220 * @param fLocked Whether we are inside the critical section 221 * already. 222 */ 223 static int rtReqPoolThreadExit(PRTREQPOOLINT pPool, PRTREQPOOLTHREAD pThread, bool fLocked) 224 { 225 if (!fLocked) 226 RTCritSectEnter(&pPool->CritSect); 227 228 /* Get out of the idle list. */ 229 if (!RTListIsEmpty(&pThread->IdleNode)) 230 { 231 RTListNodeRemove(&pThread->IdleNode); 232 Assert(pPool->cIdleThreads > 0); 233 ASMAtomicDecU32(&pPool->cIdleThreads); 234 } 235 236 /* Get out of the thread list. */ 237 RTListNodeRemove(&pThread->ListNode); 238 Assert(pPool->cCurThreads > 0); 239 pPool->cCurThreads--; 240 rtReqPoolRecalcPushBack(pPool); 241 242 /* This shouldn't happen... */ 243 PRTREQINT pReq = pThread->pTodoReq; 244 if (pReq) 245 { 246 AssertFailed(); 247 pThread->pTodoReq = NULL; 248 rtReqPoolCancelReq(pReq); 249 } 250 251 /* If we're the last thread terminating, ping the destruction thread before 252 we leave the critical section. */ 253 if ( RTListIsEmpty(&pPool->WorkerThreads) 254 && pPool->hThreadTermEvt != NIL_RTSEMEVENT) 255 RTSemEventMultiSignal(pPool->hThreadTermEvt); 256 257 RTCritSectLeave(&pPool->CritSect); 258 259 return VINF_SUCCESS; 260 } 261 262 263 186 264 static void rtReqPoolThreadProcessRequest(PRTREQPOOLTHREAD pThread, PRTREQINT pReq) 187 265 { … … 210 288 211 289 290 /** 291 * The Worker Thread Procedure. 292 * 293 * @returns VINF_SUCCESS. 294 * @param hThreadSelf The thread handle (unused). 295 * @param pvArg Pointer to the thread data. 296 */ 212 297 static DECLCALLBACK(int) rtReqPoolThreadProc(RTTHREAD hThreadSelf, void *pvArg) 213 298 { … … 215 300 PRTREQPOOLINT pPool = pThread->pPool; 216 301 217 /** @todo rework this... */218 219 302 /* 220 303 * The work loop. 221 304 */ 222 uint64_t cPrevReqProcessed = 0;223 while ( pPool->fDestructing)305 uint64_t cPrevReqProcessed = UINT64_MAX; 306 while (!pPool->fDestructing) 224 307 { 225 308 /* 226 * P ending work?309 * Process pending work. 227 310 */ 311 312 /* Check if anything is scheduled directly to us. */ 228 313 PRTREQINT pReq = ASMAtomicXchgPtrT(&pThread->pTodoReq, NULL, PRTREQINT); 229 314 if (pReq) 315 { 316 Assert(RTListIsEmpty(&pThread->IdleNode)); /* Must not be in the idle list. */ 230 317 rtReqPoolThreadProcessRequest(pThread, pReq); 318 continue; 319 } 320 321 ASMAtomicIncU32(&pPool->cIdleThreads); 322 RTCritSectEnter(&pPool->CritSect); 323 324 /* Recheck the todo request pointer after entering the critsect. */ 325 pReq = ASMAtomicXchgPtrT(&pThread->pTodoReq, NULL, PRTREQINT); 326 if (pReq) 327 { 328 Assert(RTListIsEmpty(&pThread->IdleNode)); /* Must not be in the idle list. */ 329 RTCritSectLeave(&pPool->CritSect); 330 331 rtReqPoolThreadProcessRequest(pThread, pReq); 332 continue; 333 } 334 335 /* Any pending requests in the queue? */ 336 pReq = pPool->pPendingRequests; 337 if (pReq) 338 { 339 pPool->pPendingRequests = pReq->pNext; 340 if (pReq->pNext == NULL) 341 pPool->ppPendingRequests = &pPool->pPendingRequests; 342 343 /* Un-idle ourselves and process the request. */ 344 if (!RTListIsEmpty(&pThread->IdleNode)) 345 { 346 RTListNodeRemove(&pThread->IdleNode); 347 RTListInit(&pThread->IdleNode); 348 ASMAtomicDecU32(&pPool->cIdleThreads); 349 } 350 ASMAtomicDecU32(&pPool->cIdleThreads); 351 RTCritSectLeave(&pPool->CritSect); 352 353 rtReqPoolThreadProcessRequest(pThread, pReq); 354 continue; 355 } 356 357 /* 358 * Nothing to do, go idle. 359 */ 360 if (cPrevReqProcessed != pThread->cReqProcessed) 361 { 362 pThread->cReqProcessed = cPrevReqProcessed; 363 pThread->uIdleNanoTs = RTTimeNanoTS(); 364 } 365 else if (pPool->cCurThreads > pPool->cMinThreads) 366 { 367 uint64_t cNsIdle = RTTimeNanoTS() - pThread->uIdleNanoTs; 368 if (cNsIdle >= pPool->cNsMinIdle) 369 return rtReqPoolThreadExit(pPool, pThread, true /*fLocked*/); 370 } 371 372 if (RTListIsEmpty(&pThread->IdleNode)) 373 RTListPrepend(&pPool->IdleThreads, &pThread->IdleNode); 231 374 else 232 { 233 ASMAtomicIncU32(&pPool->cIdleThreads); 234 RTCritSectEnter(&pPool->CritSect); 235 236 /* Recheck the todo request pointer after entering the critsect. */ 237 pReq = ASMAtomicXchgPtrT(&pThread->pTodoReq, NULL, PRTREQINT); 238 if (!pReq) 239 { 240 /* Any pending requests in the queue? */ 241 pReq = pPool->pPendingRequests; 242 if (pReq) 243 { 244 pPool->pPendingRequests = pReq->pNext; 245 if (pReq->pNext == NULL) 246 pPool->ppPendingRequests = &pPool->pPendingRequests; 247 } 248 } 249 250 if (pReq) 251 { 252 /* 253 * Un-idle ourselves and process the request. 254 */ 255 if (!RTListIsEmpty(&pThread->IdleNode)) 256 { 257 RTListNodeRemove(&pThread->IdleNode); 258 RTListInit(&pThread->IdleNode); 259 } 260 ASMAtomicDecU32(&pPool->cIdleThreads); 261 RTCritSectLeave(&pPool->CritSect); 262 263 rtReqPoolThreadProcessRequest(pThread, pReq); 264 } 265 else 266 { 267 /* 268 * Nothing to do, go idle. 269 */ 270 if (cPrevReqProcessed != pThread->cReqProcessed) 271 { 272 pThread->cReqProcessed = cPrevReqProcessed; 273 pThread->uIdleNanoTs = RTTimeNanoTS(); 274 } 275 276 if (RTListIsEmpty(&pThread->IdleNode)) 277 RTListPrepend(&pPool->IdleThreads, &pThread->IdleNode); 278 RTThreadUserReset(hThreadSelf); 279 280 RTCritSectLeave(&pPool->CritSect); 281 282 RTThreadUserWait(hThreadSelf, 0); 283 284 285 286 } 287 } 288 } 289 290 /* 291 * Clean up on the way out. 292 */ 293 RTCritSectEnter(&pPool->CritSect); 294 295 /** @todo .... */ 296 297 rtReqPoolRecalcPushBack(pPool); 298 299 RTCritSectLeave(&pPool->CritSect); 300 301 return VINF_SUCCESS; 375 ASMAtomicDecU32(&pPool->cIdleThreads); 376 RTThreadUserReset(hThreadSelf); 377 uint32_t const cMsSleep = pPool->cMsIdleSleep; 378 379 RTCritSectLeave(&pPool->CritSect); 380 381 RTThreadUserWait(hThreadSelf, cMsSleep); 382 } 383 384 return rtReqPoolThreadExit(pPool, pThread, false /*fLocked*/); 302 385 } 303 386 … … 480 563 } 481 564 565 typedef enum RTREQPOOLCFGVAR 566 { 567 RTREQPOOLCFGVAR_INVALID = 0, 568 RTREQPOOLCFGVAR_END, 569 RTREQPOOLCFGVAR_32BIT_HACK = 0x7fffffff 570 } RTREQPOOLCFGVAR; 571 572 573 RTDECL(int) RTReqPoolSetCfgVar(RTREQPOOL hPool, RTREQPOOLCFGVAR enmVar, uint64_t uValue) 574 { 575 return VERR_NOT_SUPPORTED; 576 } 577 578 579 RTDECL(int) RTReqPoolQueryCfgVar(RTREQPOOL hPool, RTREQPOOLCFGVAR enmVar, uint64_t *puValue) 580 { 581 return VERR_NOT_SUPPORTED; 582 } 583 584 585 typedef enum RTREQPOOLSTAT 586 { 587 RTREQPOOLSTAT_INVALID = 0, 588 RTREQPOOLSTAT_END, 589 RTREQPOOLSTAT_32BIT_HACK = 0x7fffffff 590 } RTREQPOOLSTAT; 591 592 593 RTDECL(uint64_t) RTReqPoolGetStat(RTREQPOOL hPool, RTREQPOOLSTAT enmStat) 594 { 595 return UINT64_MAX; 596 } 597 482 598 483 599 RTDECL(uint32_t) RTReqPoolRetain(RTREQPOOL hPool) … … 512 628 513 629 RTCritSectEnter(&pPool->CritSect); 630 #ifdef RT_STRICT 631 RTTHREAD const hSelf = RTThreadSelf(); 632 #endif 514 633 515 634 /* Indicate to the worker threads that we're shutting down. */ … … 518 637 RTListForEach(&pPool->WorkerThreads, pThread, RTREQPOOLTHREAD, ListNode) 519 638 { 639 Assert(pThread->hThread != hSelf); 520 640 RTThreadUserSignal(pThread->hThread); 521 641 } … … 527 647 PRTREQINT pReq = pPool->pPendingRequests; 528 648 pPool->pPendingRequests = pReq->pNext; 529 530 pReq->enmState = RTREQSTATE_COMPLETED; 531 ASMAtomicWriteS32(&pReq->iStatusX, VERR_CANCELLED); 532 if (pReq->hPushBackEvt != NIL_RTSEMEVENTMULTI) 533 RTSemEventMultiSignal(pReq->hPushBackEvt); 534 RTSemEventSignal(pReq->EventSem); 535 536 pReq->uOwner.hPool = NULL; 537 RTReqRelease(pReq); 649 rtReqPoolCancelReq(pReq); 538 650 } 539 651 pPool->ppPendingRequests = NULL; … … 543 655 { 544 656 RTCritSectLeave(&pPool->CritSect); 545 RTSemEvent Wait(pPool->hThreadTermEvt, RT_MS_1MIN);657 RTSemEventMultiWait(pPool->hThreadTermEvt, RT_MS_1MIN); 546 658 RTCritSectEnter(&pPool->CritSect); 547 659 /** @todo should we wait forever here? */
Note:
See TracChangeset
for help on using the changeset viewer.