Changeset 39620 in vbox for trunk/src/VBox
Timestamp: Dec 15, 2011 1:10:48 AM
svn:sync-xref-src-repo-rev: 75401
File: 1 edited
trunk/src/VBox/Runtime/common/misc/reqpool.cpp
--- trunk/src/VBox/Runtime/common/misc/reqpool.cpp (r39617)
+++ trunk/src/VBox/Runtime/common/misc/reqpool.cpp (r39620)
@@ -45 +45 @@
 #include "internal/req.h"
 #include "internal/magics.h"
+
+
+/*******************************************************************************
+*   Defined Constants And Macros                                               *
+*******************************************************************************/
+/** The max number of worker threads. */
+#define RTREQPOOL_MAX_THREADS           UINT32_C(16384)
+/** The max number of milliseconds to push back. */
+#define RTREQPOOL_PUSH_BACK_MAX_MS      RT_MS_1MIN
+/** The max number of free requests to keep around. */
+#define RTREQPOOL_MAX_FREE_REQUESTS     (RTREQPOOL_MAX_THREADS * 2U)
 
 
@@ -104 +115 @@
     /** The maximum number of worker threads. */
     uint32_t            cMaxThreads;
-    /** The number of threads which should be spawned before throttling kicks
-     * in. */
-    uint32_t            cThreadsThreshold;
     /** The minimum number of worker threads. */
     uint32_t            cMinThreads;
@@ -115 +123 @@
     uint64_t            cNsMinIdle;
     /** The idle thread sleep interval in milliseconds. */
-    uint32_t            cMsIdleSleep;
+    RTMSINTERVAL        cMsIdleSleep;
+    /** The number of threads which should be spawned before throttling kicks
+     * in. */
+    uint32_t            cThreadsPushBackThreshold;
     /** The max number of milliseconds to push back a submitter before creating
      * a new worker thread once the threshold has been reached. */
@@ -144 +155 @@
     RTLISTANCHOR        WorkerThreads;
 
+    /** The number of requests processed and counted in the time totals. */
+    uint64_t            cReqProcessed;
+    /** Total time the requests processed by this thread took to process. */
+    uint64_t            cNsTotalReqProcessing;
+    /** Total time the requests processed by this thread had to wait in
+     * the queue before being scheduled. */
+    uint64_t            cNsTotalReqQueued;
+
     /** Reference counter. */
     uint32_t volatile   cRefs;
@@ -157 +176 @@
     /** Where to insert the next request. */
     PRTREQINT          *ppPendingRequests;
+    /** The number of requests currently pending. */
+    uint32_t            cCurPendingRequests;
+    /** The number of requests currently being executed. */
+    uint32_t volatile   cCurActiveRequests;
+    /** The number of requests submitted. */
+    uint64_t            cReqSubmitted;
 
     /** Head of the request recycling LIFO. */
@@ -197 +222 @@
 {
     uint32_t const cMsRange = pPool->cMsMaxPushBack - pPool->cMsMinPushBack;
-    uint32_t const cSteps   = pPool->cMaxThreads - pPool->cThreadsThreshold;
-    uint32_t const iStep    = pPool->cCurThreads - pPool->cThreadsThreshold;
+    uint32_t const cSteps   = pPool->cMaxThreads - pPool->cThreadsPushBackThreshold;
+    uint32_t const iStep    = pPool->cCurThreads - pPool->cThreadsPushBackThreshold;
 
     uint32_t cMsCurPushBack;
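The body of rtReqPoolRecalcPushBack() is elided by the hunk above; only its inputs are visible. As a reading aid, here is a minimal sketch of the linear interpolation those variables suggest: the push-back delay grows from cMsMinPushBack at the threshold to cMsMaxPushBack at the thread limit. This models the idea only and is not the IPRT implementation.

#include <stdint.h>

/* Simplified model (NOT the IPRT source): interpolate the push-back delay
 * linearly between cMsMinPushBack and cMsMaxPushBack as the worker count
 * climbs from the push-back threshold towards the thread limit. */
static uint32_t modelPushBackMs(uint32_t cMsMinPushBack, uint32_t cMsMaxPushBack,
                                uint32_t cThreadsPushBackThreshold, uint32_t cMaxThreads,
                                uint32_t cCurThreads)
{
    uint32_t const cMsRange = cMsMaxPushBack - cMsMinPushBack;
    uint32_t const cSteps   = cMaxThreads - cThreadsPushBackThreshold;
    uint32_t const iStep    = cCurThreads - cThreadsPushBackThreshold;

    if (cSteps == 0 || iStep >= cSteps) /* degenerate config or at the thread limit */
        return cMsMaxPushBack;
    return cMsMinPushBack + (uint32_t)((uint64_t)iStep * cMsRange / cSteps); /* 64-bit intermediate avoids overflow */
}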
@@ -262 +287 @@
 
 
-static void rtReqPoolThreadProcessRequest(PRTREQPOOLTHREAD pThread, PRTREQINT pReq)
+/**
+ * Process one request.
+ *
+ * @param   pPool           The pool.
+ * @param   pThread         The worker thread.
+ * @param   pReq            The request to process.
+ */
+static void rtReqPoolThreadProcessRequest(PRTREQPOOLINT pPool, PRTREQPOOLTHREAD pThread, PRTREQINT pReq)
 {
     /*
@@ -270 +302 @@
     pThread->uPendingNanoTs = pReq->uSubmitNanoTs;
     pThread->pPendingReq    = pReq;
+    ASMAtomicIncU32(&pPool->cCurActiveRequests);
     Assert(pReq->u32Magic == RTREQ_MAGIC);
 
@@ -275 +308 @@
      * Do the actual processing.
      */
-    /** @todo */
+    rtReqProcessOne(pReq);
 
     /*
      * Update thread statistics and state.
      */
+    ASMAtomicDecU32(&pPool->cCurActiveRequests);
+    pThread->pPendingReq    = NULL;
     uint64_t const uNsTsEnd = RTTimeNanoTS();
     pThread->cNsTotalReqProcessing += uNsTsEnd - pThread->uProcessingNanoTs;
@@ -303 +338 @@
      * The work loop.
      */
-    uint64_t cPrevReqProcessed = UINT64_MAX;
+    uint64_t cReqPrevProcessedIdle     = UINT64_MAX;
+    uint64_t cReqPrevProcessedStat     = 0;
+    uint64_t cNsPrevTotalReqProcessing = 0;
+    uint64_t cNsPrevTotalReqQueued     = 0;
     while (!pPool->fDestructing)
     {
@@ -315 +353 @@
         {
             Assert(RTListIsEmpty(&pThread->IdleNode)); /* Must not be in the idle list. */
-            rtReqPoolThreadProcessRequest(pThread, pReq);
+            rtReqPoolThreadProcessRequest(pPool, pThread, pReq);
             continue;
         }
@@ -321 +359 @@
         ASMAtomicIncU32(&pPool->cIdleThreads);
         RTCritSectEnter(&pPool->CritSect);
+
+        /* Update the global statistics. */
+        if (cReqPrevProcessedStat != pThread->cReqProcessed)
+        {
+            pPool->cReqProcessed          = pThread->cReqProcessed - cReqPrevProcessedStat;
+            cReqPrevProcessedStat         = pThread->cReqProcessed;
+            pPool->cNsTotalReqProcessing += pThread->cNsTotalReqProcessing - cNsPrevTotalReqProcessing;
+            cNsPrevTotalReqProcessing     = pThread->cNsTotalReqProcessing;
+            pPool->cNsTotalReqQueued     += pThread->cNsTotalReqQueued - cNsPrevTotalReqQueued;
+            cNsPrevTotalReqQueued         = pThread->cNsTotalReqQueued;
+        }
 
         /* Recheck the todo request pointer after entering the critsect. */
@@ -329 +378 @@
             RTCritSectLeave(&pPool->CritSect);
 
-            rtReqPoolThreadProcessRequest(pThread, pReq);
+            rtReqPoolThreadProcessRequest(pPool, pThread, pReq);
             continue;
         }
@@ -340 +389 @@
             if (pReq->pNext == NULL)
                 pPool->ppPendingRequests = &pPool->pPendingRequests;
+            Assert(pPool->cCurPendingRequests > 0);
+            pPool->cCurPendingRequests--;
 
             /* Un-idle ourselves and process the request. */
@@ -351 +402 @@
             RTCritSectLeave(&pPool->CritSect);
 
-            rtReqPoolThreadProcessRequest(pThread, pReq);
+            rtReqPoolThreadProcessRequest(pPool, pThread, pReq);
             continue;
         }
@@ -358 +409 @@
          * Nothing to do, go idle.
          */
-        if (cPrevReqProcessed != pThread->cReqProcessed)
-        {
-            pThread->cReqProcessed = cPrevReqProcessed;
-            pThread->uIdleNanoTs   = RTTimeNanoTS();
+        if (cReqPrevProcessedIdle != pThread->cReqProcessed)
+        {
+            cReqPrevProcessedIdle = pThread->cReqProcessed;
+            pThread->uIdleNanoTs  = RTTimeNanoTS();
         }
         else if (pPool->cCurThreads > pPool->cMinThreads)
@@ -475 +526 @@
     RTCritSectEnter(&pPool->CritSect);
 
+    pPool->cReqSubmitted++;
+
     /*
      * Try schedule the request to a thread that's currently idle.
@@ -481 +534 @@
     if (pThread)
     {
-        /** @todo CPU affinity... */
+        /** @todo CPU affinity??? */
         ASMAtomicWritePtr(&pThread->pTodoReq, pReq);
 
@@ -501 +554 @@
     *pPool->ppPendingRequests = pReq;
     pPool->ppPendingRequests = (PRTREQINT*)&pReq->pNext;
+    pPool->cCurPendingRequests++;
 
     /*
@@ -516 +570 @@
      * Push back before creating a new worker thread.
      */
-    if (   pPool->cCurThreads > pPool->cThreadsThreshold
+    if (   pPool->cCurThreads > pPool->cThreadsPushBackThreshold
         && (RTTimeNanoTS() - pReq->uSubmitNanoTs) / RT_NS_1MS >= pPool->cMsCurPushBack )
     {
@@ -563 +617 @@
 }
 
-#if 0 /* later */
-
-typedef enum RTREQPOOLCFGVAR
-{
-    RTREQPOOLCFGVAR_INVALID = 0,
-    RTREQPOOLCFGVAR_END,
-    RTREQPOOLCFGVAR_32BIT_HACK = 0x7fffffff
-} RTREQPOOLCFGVAR;
-
 
 RTDECL(int) RTReqPoolSetCfgVar(RTREQPOOL hPool, RTREQPOOLCFGVAR enmVar, uint64_t uValue)
 {
-    return VERR_NOT_SUPPORTED;
-}
+    PRTREQPOOLINT pPool = hPool;
+    AssertPtrReturn(pPool, VERR_INVALID_HANDLE);
+    AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, VERR_INVALID_HANDLE);
+    AssertReturn(enmVar > RTREQPOOLCFGVAR_INVALID && enmVar < RTREQPOOLCFGVAR_END, VERR_INVALID_PARAMETER);
+
+    RTCritSectEnter(&pPool->CritSect);
+
+    bool fWakeUpIdleThreads = false;
+    int  rc = VINF_SUCCESS;
+    switch (enmVar)
+    {
+        case RTREQPOOLCFGVAR_THREAD_TYPE:
+            AssertMsgBreakStmt(uValue > (uint64_t)RTTHREADTYPE_INVALID && uValue < (uint64_t)RTTHREADTYPE_END,
+                               ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
+
+            pPool->enmThreadType = (RTTHREADTYPE)uValue;
+            break;
+
+        case RTREQPOOLCFGVAR_MIN_THREADS:
+            AssertMsgBreakStmt(uValue <= RTREQPOOL_MAX_THREADS, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
+            fWakeUpIdleThreads = pPool->cMinThreads > (uint32_t)uValue;
+            pPool->cMinThreads = (uint32_t)uValue;
+            if (pPool->cMinThreads > pPool->cMaxThreads)
+                pPool->cMaxThreads = pPool->cMinThreads;
+            if (   pPool->cThreadsPushBackThreshold < pPool->cMinThreads
+                || pPool->cThreadsPushBackThreshold > pPool->cMaxThreads)
+                pPool->cThreadsPushBackThreshold = pPool->cMinThreads + (pPool->cMaxThreads - pPool->cMinThreads) / 2;
+            rtReqPoolRecalcPushBack(pPool);
+            break;
+
+        case RTREQPOOLCFGVAR_MAX_THREADS:
+            AssertMsgBreakStmt(uValue <= RTREQPOOL_MAX_THREADS && uValue >= 1, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
+            pPool->cMaxThreads = (uint32_t)uValue;
+            if (pPool->cMaxThreads < pPool->cMinThreads)
+            {
+                pPool->cMinThreads = pPool->cMaxThreads;
+                fWakeUpIdleThreads = true;
+            }
+            if (pPool->cMaxThreads < pPool->cThreadsPushBackThreshold)
+                pPool->cThreadsPushBackThreshold = pPool->cMinThreads + (pPool->cMaxThreads - pPool->cMinThreads) / 2;
+            rtReqPoolRecalcPushBack(pPool);
+            break;
+
+        case RTREQPOOLCFGVAR_MS_MIN_IDLE:
+            AssertMsgBreakStmt(uValue < UINT32_MAX || uValue == RT_INDEFINITE_WAIT, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
+            if (uValue < UINT32_MAX && uValue != RT_INDEFINITE_WAIT)
+            {
+                fWakeUpIdleThreads = pPool->cMsMinIdle != (uint32_t)uValue;
+                pPool->cMsMinIdle = (uint32_t)uValue;
+                pPool->cNsMinIdle = pPool->cMsMinIdle * RT_NS_1MS_64;
+                if (pPool->cMsIdleSleep > pPool->cMsMinIdle)
+                    pPool->cMsIdleSleep = RT_MAX(RT_MS_1SEC, pPool->cMsMinIdle);
+            }
+            else
+            {
+                pPool->cMsMinIdle   = UINT32_MAX;
+                pPool->cNsMinIdle   = UINT64_MAX;
+                pPool->cMsIdleSleep = RT_INDEFINITE_WAIT;
+            }
+            break;
+
+        case RTREQPOOLCFGVAR_MS_IDLE_SLEEP:
+            AssertMsgBreakStmt(uValue <= RT_INDEFINITE_WAIT, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
+            fWakeUpIdleThreads = pPool->cMsMinIdle > (RTMSINTERVAL)uValue;
+            pPool->cMsIdleSleep = (RTMSINTERVAL)uValue;
+            if (pPool->cMsIdleSleep == RT_INDEFINITE_WAIT)
+            {
+                pPool->cMsMinIdle = UINT32_MAX;
+                pPool->cNsMinIdle = UINT64_MAX;
+            }
+            break;
+
+        case RTREQPOOLCFGVAR_PUSH_BACK_THRESHOLD:
+            if (uValue == UINT64_MAX)
+                pPool->cThreadsPushBackThreshold = pPool->cMaxThreads;
+            else if (uValue == 0)
+                pPool->cThreadsPushBackThreshold = pPool->cMinThreads;
+            else
+            {
+                AssertMsgBreakStmt(uValue <= pPool->cMaxThreads, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
+                AssertMsgBreakStmt(uValue >= pPool->cMinThreads, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
+                pPool->cThreadsPushBackThreshold = (uint32_t)uValue;
+            }
+            break;
+
+        case RTREQPOOLCFGVAR_PUSH_BACK_MIN_MS:
+            AssertMsgBreakStmt(uValue <= RTREQPOOL_PUSH_BACK_MAX_MS, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
+            pPool->cMsMinPushBack = (uint32_t)uValue;
+            if (pPool->cMsMaxPushBack < pPool->cMsMinPushBack)
+                pPool->cMsMaxPushBack = pPool->cMsMinPushBack;
+            rtReqPoolRecalcPushBack(pPool);
+            break;
+
+        case RTREQPOOLCFGVAR_PUSH_BACK_MAX_MS:
+            AssertMsgBreakStmt(uValue <= RTREQPOOL_PUSH_BACK_MAX_MS, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
+            pPool->cMsMaxPushBack = (uint32_t)uValue;
+            if (pPool->cMsMinPushBack < pPool->cMsMaxPushBack)
+                pPool->cMsMinPushBack = pPool->cMsMaxPushBack;
+            rtReqPoolRecalcPushBack(pPool);
+            break;
+
+        case RTREQPOOLCFGVAR_MAX_FREE_REQUESTS:
+            if (uValue == UINT64_MAX)
+            {
+                pPool->cMaxFreeRequests = pPool->cMaxThreads * 2;
+                if (pPool->cMaxFreeRequests < 16)
+                    pPool->cMaxFreeRequests = 16;
+            }
+            else
+            {
+                AssertMsgBreakStmt(uValue <= RTREQPOOL_MAX_FREE_REQUESTS, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
+                pPool->cMaxFreeRequests = (uint32_t)uValue;
+            }
+
+            while (pPool->cCurFreeRequests > pPool->cMaxFreeRequests)
+            {
+                PRTREQINT pReq = pPool->pFreeRequests;
+                pPool->pFreeRequests = pReq->pNext;
+                ASMAtomicDecU32(&pPool->cCurFreeRequests);
+                rtReqFreeIt(pReq);
+            }
+            break;
+
+        default:
+            AssertFailed();
+            rc = VERR_IPE_NOT_REACHED_DEFAULT_CASE;
+    }
+
+    /* Wake up all idle threads if required. */
+    if (fWakeUpIdleThreads)
+    {
+        Assert(rc == VINF_SUCCESS);
+        PRTREQPOOLTHREAD pThread;
+        RTListForEach(&pPool->WorkerThreads, pThread, RTREQPOOLTHREAD, ListNode)
+        {
+            RTThreadUserSignal(pThread->hThread);
+        }
+    }
+
+    RTCritSectLeave(&pPool->CritSect);
+
+    return rc;
+}
+RT_EXPORT_SYMBOL(RTReqPoolSetCfgVar);
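The setter clamps and cross-adjusts related variables instead of failing where it can: PUSH_BACK_THRESHOLD treats 0 and UINT64_MAX as shorthand for cMinThreads and cMaxThreads, while genuinely out-of-range values return VERR_OUT_OF_RANGE. A hedged usage sketch follows; configureWorkerPool is a hypothetical helper and the handle is assumed to come from the pool creation API elsewhere.

#include <iprt/req.h>
#include <iprt/err.h>

/* Hypothetical helper: bound the pool at 2..16 workers and start pushing
 * submitters back once 8 workers exist, with at most 250 ms of delay. */
static int configureWorkerPool(RTREQPOOL hPool)
{
    int rc = RTReqPoolSetCfgVar(hPool, RTREQPOOLCFGVAR_MIN_THREADS, 2);
    if (RT_SUCCESS(rc))
        rc = RTReqPoolSetCfgVar(hPool, RTREQPOOLCFGVAR_MAX_THREADS, 16);
    if (RT_SUCCESS(rc))
        rc = RTReqPoolSetCfgVar(hPool, RTREQPOOLCFGVAR_PUSH_BACK_THRESHOLD, 8);
    if (RT_SUCCESS(rc))
        rc = RTReqPoolSetCfgVar(hPool, RTREQPOOLCFGVAR_PUSH_BACK_MAX_MS, 250);
    return rc;
}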
@@ -579 +766 @@
 
 
 RTDECL(int) RTReqPoolQueryCfgVar(RTREQPOOL hPool, RTREQPOOLCFGVAR enmVar, uint64_t *puValue)
 {
-    return VERR_NOT_SUPPORTED;
-}
-
-
-typedef enum RTREQPOOLSTAT
-{
-    RTREQPOOLSTAT_INVALID = 0,
-    RTREQPOOLSTAT_END,
-    RTREQPOOLSTAT_32BIT_HACK = 0x7fffffff
-} RTREQPOOLSTAT;
+    PRTREQPOOLINT pPool = hPool;
+    AssertPtrReturn(pPool, VERR_INVALID_HANDLE);
+    AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, VERR_INVALID_HANDLE);
+    AssertReturn(enmVar > RTREQPOOLCFGVAR_INVALID && enmVar < RTREQPOOLCFGVAR_END, VERR_INVALID_PARAMETER);
+
+    RTCritSectEnter(&pPool->CritSect);
+
+    int rc = VINF_SUCCESS;
+    switch (enmVar)
+    {
+        case RTREQPOOLCFGVAR_THREAD_TYPE:
+            *puValue = pPool->enmThreadType;
+            break;
+
+        case RTREQPOOLCFGVAR_MIN_THREADS:
+            *puValue = pPool->cMinThreads;
+            break;
+
+        case RTREQPOOLCFGVAR_MAX_THREADS:
+            *puValue = pPool->cMaxThreads;
+            break;
+
+        case RTREQPOOLCFGVAR_MS_MIN_IDLE:
+            *puValue = pPool->cMsMinIdle;
+            break;
+
+        case RTREQPOOLCFGVAR_MS_IDLE_SLEEP:
+            *puValue = pPool->cMsIdleSleep;
+            break;
+
+        case RTREQPOOLCFGVAR_PUSH_BACK_THRESHOLD:
+            *puValue = pPool->cThreadsPushBackThreshold;
+            break;
+
+        case RTREQPOOLCFGVAR_PUSH_BACK_MIN_MS:
+            *puValue = pPool->cMsMinPushBack;
+            break;
+
+        case RTREQPOOLCFGVAR_PUSH_BACK_MAX_MS:
+            *puValue = pPool->cMsMaxPushBack;
+            break;
+
+        case RTREQPOOLCFGVAR_MAX_FREE_REQUESTS:
+            *puValue = pPool->cMaxFreeRequests;
+            break;
+
+        default:
+            AssertFailed();
+            rc = VERR_IPE_NOT_REACHED_DEFAULT_CASE;
+            *puValue = UINT64_MAX;
+            break;
+    }
+
+    RTCritSectLeave(&pPool->CritSect);
+
+    return rc;
+}
+RT_EXPORT_SYMBOL(RTReqPoolQueryCfgVar);
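Because the setter silently raises cMaxThreads when cMinThreads is pushed above it (see the MIN_THREADS case), the getter is how callers observe the effective configuration. Another hedged sketch, again assuming a valid pool handle:

#include <iprt/req.h>
#include <iprt/err.h>
#include <iprt/stream.h>

/* Raising the minimum above the current maximum drags the maximum along;
 * query the value back to see the effective limit. */
static void showEffectiveMaxThreads(RTREQPOOL hPool)
{
    uint64_t uMaxThreads = 0;
    int rc = RTReqPoolSetCfgVar(hPool, RTREQPOOLCFGVAR_MIN_THREADS, 32);
    if (RT_SUCCESS(rc))
        rc = RTReqPoolQueryCfgVar(hPool, RTREQPOOLCFGVAR_MAX_THREADS, &uMaxThreads);
    if (RT_SUCCESS(rc))
        RTPrintf("effective max threads: %RU64\n", uMaxThreads);
}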
@@ -593 +828 @@
 
 
 RTDECL(uint64_t) RTReqPoolGetStat(RTREQPOOL hPool, RTREQPOOLSTAT enmStat)
 {
-    return UINT64_MAX;
-}
-
-#endif /* later */
+    PRTREQPOOLINT pPool = hPool;
+    AssertPtrReturn(pPool, UINT64_MAX);
+    AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT64_MAX);
+    AssertReturn(enmStat > RTREQPOOLSTAT_INVALID && enmStat < RTREQPOOLSTAT_END, UINT64_MAX);
+
+    RTCritSectEnter(&pPool->CritSect);
+
+    uint64_t u64;
+    PRTREQPOOLTHREAD pThread;
+    switch (enmStat)
+    {
+        case RTREQPOOLSTAT_THREADS:                   u64 = pPool->cCurThreads; break;
+        case RTREQPOOLSTAT_THREADS_CREATED:           u64 = pPool->cThreadsCreated; break;
+        case RTREQPOOLSTAT_REQUESTS_PROCESSED:        u64 = pPool->cReqProcessed; break;
+        case RTREQPOOLSTAT_REQUESTS_SUBMITTED:        u64 = pPool->cReqSubmitted; break;
+        case RTREQPOOLSTAT_REQUESTS_PENDING:          u64 = pPool->cCurPendingRequests; break;
+        case RTREQPOOLSTAT_REQUESTS_ACTIVE:           u64 = pPool->cCurActiveRequests; break;
+        case RTREQPOOLSTAT_REQUESTS_FREE:             u64 = pPool->cCurFreeRequests; break;
+        case RTREQPOOLSTAT_NS_TOTAL_REQ_PROCESSING:   u64 = pPool->cNsTotalReqProcessing; break;
+        case RTREQPOOLSTAT_NS_TOTAL_REQ_QUEUED:       u64 = pPool->cNsTotalReqQueued; break;
+        case RTREQPOOLSTAT_NS_AVERAGE_REQ_PROCESSING: u64 = pPool->cNsTotalReqProcessing / RT_MAX(pPool->cReqProcessed, 1); break;
+        case RTREQPOOLSTAT_NS_AVERAGE_REQ_QUEUED:     u64 = pPool->cNsTotalReqQueued / RT_MAX(pPool->cReqProcessed, 1); break;
+        default:
+            AssertFailed();
+            u64 = UINT64_MAX;
+            break;
+    }
+
+    RTCritSectLeave(&pPool->CritSect);
+
+    return u64;
+}
+RT_EXPORT_SYMBOL(RTReqPoolGetStat);
+
 
 RTDECL(uint32_t) RTReqPoolRetain(RTREQPOOL hPool)
@@ -653 +918 @@
     }
     pPool->ppPendingRequests = NULL;
+    pPool->cCurPendingRequests = 0;
 
     /* Wait for the workers to shut down. */
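The counters wired up in the worker loop surface through RTReqPoolGetStat. Note that totals and averages only include requests whose worker has folded its per-thread counters into the pool, which per the work loop above happens when a thread is about to go idle, and that each getter call takes the pool critsect. A hedged monitoring sketch:

#include <iprt/req.h>
#include <iprt/stream.h>

/* Dump a few of the new pool statistics. */
static void logPoolStats(RTREQPOOL hPool)
{
    uint64_t const cSubmitted = RTReqPoolGetStat(hPool, RTREQPOOLSTAT_REQUESTS_SUBMITTED);
    uint64_t const cProcessed = RTReqPoolGetStat(hPool, RTREQPOOLSTAT_REQUESTS_PROCESSED);
    uint64_t const cPending   = RTReqPoolGetStat(hPool, RTREQPOOLSTAT_REQUESTS_PENDING);
    uint64_t const cNsAvg     = RTReqPoolGetStat(hPool, RTREQPOOLSTAT_NS_AVERAGE_REQ_PROCESSING);
    RTPrintf("reqpool: %RU64 submitted, %RU64 processed, %RU64 pending, avg %RU64 ns/req\n",
             cSubmitted, cProcessed, cPending, cNsAvg);
}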