VirtualBox

source: vbox/trunk/src/VBox/Runtime/common/misc/reqpool.cpp@39621

Last change on this file since 39621 was 39621, checked in by vboxsync, 13 years ago

Removed unused variable (c&p).

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 33.4 KB
/* $Id: reqpool.cpp 39621 2011-12-15 09:56:54Z vboxsync $ */
/** @file
 * IPRT - Request Pool.
 */

/*
 * Copyright (C) 2006-2011 Oracle Corporation
 *
 * This file is part of VirtualBox Open Source Edition (OSE), as
 * available from http://www.virtualbox.org. This file is free software;
 * you can redistribute it and/or modify it under the terms of the GNU
 * General Public License (GPL) as published by the Free Software
 * Foundation, in version 2 as it comes in the "COPYING" file of the
 * VirtualBox OSE distribution. VirtualBox OSE is distributed in the
 * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
 *
 * The contents of this file may alternatively be used under the terms
 * of the Common Development and Distribution License Version 1.0
 * (CDDL) only, as it comes in the "COPYING.CDDL" file of the
 * VirtualBox OSE distribution, in which case the provisions of the
 * CDDL are applicable instead of those of the GPL.
 *
 * You may elect to license modified versions of this file under the
 * terms and conditions of either the GPL or the CDDL or both.
 */


/*******************************************************************************
*   Header Files                                                               *
*******************************************************************************/
#include <iprt/req.h>
#include "internal/iprt.h"

#include <iprt/assert.h>
#include <iprt/asm.h>
#include <iprt/critsect.h>
#include <iprt/list.h>
#include <iprt/log.h>
#include <iprt/mem.h>
#include <iprt/string.h>
#include <iprt/time.h>
#include <iprt/semaphore.h>
#include <iprt/thread.h>

#include "internal/req.h"
#include "internal/magics.h"


/*******************************************************************************
*   Defined Constants And Macros                                               *
*******************************************************************************/
/** The max number of worker threads. */
#define RTREQPOOL_MAX_THREADS           UINT32_C(16384)
/** The max number of milliseconds to push back. */
#define RTREQPOOL_PUSH_BACK_MAX_MS      RT_MS_1MIN
/** The max number of free requests to keep around. */
#define RTREQPOOL_MAX_FREE_REQUESTS     (RTREQPOOL_MAX_THREADS * 2U)


/*******************************************************************************
*   Structures and Typedefs                                                    *
*******************************************************************************/
typedef struct RTREQPOOLTHREAD
{
    /** Node in the RTREQPOOLINT::IdleThreads list. */
    RTLISTNODE          IdleNode;
    /** Node in the RTREQPOOLINT::WorkerThreads list. */
    RTLISTNODE          ListNode;

    /** The submit timestamp of the pending request. */
    uint64_t            uPendingNanoTs;
    /** The submit timestamp of the request processing. */
    uint64_t            uProcessingNanoTs;
    /** When this thread went idle the last time. */
    uint64_t            uIdleNanoTs;
    /** The number of requests processed by this thread. */
    uint64_t            cReqProcessed;
    /** Total time the requests processed by this thread took to process. */
    uint64_t            cNsTotalReqProcessing;
    /** Total time the requests processed by this thread had to wait in
     * the queue before being scheduled. */
    uint64_t            cNsTotalReqQueued;
    /** The CPU this was scheduled last time we checked. */
    RTCPUID             idLastCpu;

    /** The submitter will put an incoming request here when scheduling an idle
     * thread. */
    PRTREQINT volatile  pTodoReq;
    /** The request the thread is currently processing. */
    PRTREQINT volatile  pPendingReq;

    /** The thread handle. */
    RTTHREAD            hThread;
    /** Nano seconds timestamp representing the birth time of the thread. */
    uint64_t            uBirthNanoTs;
    /** Pointer to the request thread pool instance the thread is associated
     *  with. */
    struct RTREQPOOLINT *pPool;
} RTREQPOOLTHREAD;
/** Pointer to a worker thread. */
typedef RTREQPOOLTHREAD *PRTREQPOOLTHREAD;

/**
 * Request thread pool instance data.
 */
typedef struct RTREQPOOLINT
{
    /** Magic value (RTREQPOOL_MAGIC). */
    uint32_t            u32Magic;

    /** @name Config
     * @{ */
    /** The worker thread type. */
    RTTHREADTYPE        enmThreadType;
    /** The maximum number of worker threads. */
    uint32_t            cMaxThreads;
    /** The minimum number of worker threads. */
    uint32_t            cMinThreads;
    /** The number of milliseconds a thread needs to be idle before it is
     * considered for retirement. */
    uint32_t            cMsMinIdle;
    /** cMsMinIdle in nano seconds. */
    uint64_t            cNsMinIdle;
    /** The idle thread sleep interval in milliseconds. */
    RTMSINTERVAL        cMsIdleSleep;
    /** The number of threads which should be spawned before throttling kicks
     * in. */
    uint32_t            cThreadsPushBackThreshold;
    /** The max number of milliseconds to push back a submitter before creating
     * a new worker thread once the threshold has been reached. */
    uint32_t            cMsMaxPushBack;
    /** The minimum number of milliseconds to push back a submitter before
     * creating a new worker thread once the threshold has been reached. */
    uint32_t            cMsMinPushBack;
    /** The max number of free requests in the recycle LIFO. */
    uint32_t            cMaxFreeRequests;
    /** @} */

    /** Signaled by terminating worker threads. */
    RTSEMEVENTMULTI     hThreadTermEvt;

    /** Destruction indicator.  The worker threads check this in their loop. */
    bool volatile       fDestructing;

    /** The current submitter push back in milliseconds.
     * This is recalculated when worker threads come and go. */
    uint32_t            cMsCurPushBack;
    /** The current number of worker threads. */
    uint32_t            cCurThreads;
    /** Statistics: The total number of threads created. */
    uint32_t            cThreadsCreated;
    /** Statistics: The timestamp when the last thread was created. */
    uint64_t            uLastThreadCreateNanoTs;
    /** Linked list of worker threads. */
    RTLISTANCHOR        WorkerThreads;

    /** The number of requests processed and counted in the time totals. */
    uint64_t            cReqProcessed;
    /** Total time the requests processed by this pool took to process. */
    uint64_t            cNsTotalReqProcessing;
    /** Total time the requests processed by this pool had to wait in
     * the queue before being scheduled. */
    uint64_t            cNsTotalReqQueued;

    /** Reference counter. */
    uint32_t volatile   cRefs;
    /** The number of idle threads or threads in the process of becoming
     * idle.  This is increased before the to-be-idle thread tries to enter
     * the critical section and add itself to the list. */
    uint32_t volatile   cIdleThreads;
    /** Linked list of idle threads. */
    RTLISTANCHOR        IdleThreads;

    /** Head of the request FIFO. */
    PRTREQINT           pPendingRequests;
    /** Where to insert the next request. */
    PRTREQINT          *ppPendingRequests;
    /** The number of requests currently pending. */
    uint32_t            cCurPendingRequests;
    /** The number of requests currently being executed. */
    uint32_t volatile   cCurActiveRequests;
    /** The number of requests submitted. */
    uint64_t            cReqSubmitted;

    /** Head of the request recycling LIFO. */
    PRTREQINT           pFreeRequests;
    /** The number of requests in the recycling LIFO.  This is read without
     * entering the critical section, thus volatile. */
    uint32_t volatile   cCurFreeRequests;

    /** Critical section serializing access to members of this structure. */
    RTCRITSECT          CritSect;

} RTREQPOOLINT;


/**
 * Used by exiting thread and the pool destruction code to cancel unexpected
 * requests.
 *
 * @param   pReq            The request.
 */
static void rtReqPoolCancelReq(PRTREQINT pReq)
{
    pReq->uOwner.hPool = NIL_RTREQPOOL; /* force free */
    pReq->enmState     = RTREQSTATE_COMPLETED;
    ASMAtomicWriteS32(&pReq->iStatusX, VERR_CANCELLED);
    if (pReq->hPushBackEvt != NIL_RTSEMEVENTMULTI)
        RTSemEventMultiSignal(pReq->hPushBackEvt);
    RTSemEventSignal(pReq->EventSem);

    RTReqRelease(pReq);
}


/**
 * Recalculate the max pushback interval when adding or removing worker threads.
 *
 * @param   pPool           The pool. cMsCurPushBack will be changed.
 */
static void rtReqPoolRecalcPushBack(PRTREQPOOLINT pPool)
{
    uint32_t const cMsRange = pPool->cMsMaxPushBack - pPool->cMsMinPushBack;
    uint32_t const cSteps   = pPool->cMaxThreads - pPool->cThreadsPushBackThreshold;
    uint32_t const iStep    = pPool->cCurThreads - pPool->cThreadsPushBackThreshold;

    uint32_t cMsCurPushBack;
    if ((cMsRange >> 2) >= cSteps)
        cMsCurPushBack = cMsRange / cSteps * iStep;
    else
        cMsCurPushBack = (uint32_t)( (uint64_t)cMsRange * RT_NS_1MS / cSteps * iStep / RT_NS_1MS );
    cMsCurPushBack += pPool->cMsMinPushBack;

    pPool->cMsCurPushBack = cMsCurPushBack;
}
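/*
 * Illustrative numbers (not from the original source): with cMsMinPushBack=5,
 * cMsMaxPushBack=1000, cMaxThreads=64 and cThreadsPushBackThreshold=32 we get
 * cMsRange=995 and cSteps=32, so the first branch is taken and each worker
 * beyond the threshold adds 995/32 = 31 ms; at cCurThreads=48 (iStep=16) the
 * submitter is pushed back 31*16 + 5 = 501 ms.  Note that both branches
 * divide by cSteps, so this presumably relies on cThreadsPushBackThreshold
 * staying below cMaxThreads (and cCurThreads at or above the threshold)
 * whenever it is called.
 */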


/**
 * Performs thread exit.
 *
 * @returns Thread termination status code (VINF_SUCCESS).
 * @param   pPool           The pool.
 * @param   pThread         The thread.
 * @param   fLocked         Whether we are inside the critical section
 *                          already.
 */
static int rtReqPoolThreadExit(PRTREQPOOLINT pPool, PRTREQPOOLTHREAD pThread, bool fLocked)
{
    if (!fLocked)
        RTCritSectEnter(&pPool->CritSect);

    /* Get out of the idle list. */
    if (!RTListIsEmpty(&pThread->IdleNode))
    {
        RTListNodeRemove(&pThread->IdleNode);
        Assert(pPool->cIdleThreads > 0);
        ASMAtomicDecU32(&pPool->cIdleThreads);
    }

    /* Get out of the thread list. */
    RTListNodeRemove(&pThread->ListNode);
    Assert(pPool->cCurThreads > 0);
    pPool->cCurThreads--;
    rtReqPoolRecalcPushBack(pPool);

    /* This shouldn't happen... */
    PRTREQINT pReq = pThread->pTodoReq;
    if (pReq)
    {
        AssertFailed();
        pThread->pTodoReq = NULL;
        rtReqPoolCancelReq(pReq);
    }

    /* If we're the last thread terminating, ping the destruction thread before
       we leave the critical section. */
    if (   RTListIsEmpty(&pPool->WorkerThreads)
        && pPool->hThreadTermEvt != NIL_RTSEMEVENTMULTI)
        RTSemEventMultiSignal(pPool->hThreadTermEvt);

    RTCritSectLeave(&pPool->CritSect);

    return VINF_SUCCESS;
}


/**
 * Process one request.
 *
 * @param   pPool           The pool.
 * @param   pThread         The worker thread.
 * @param   pReq            The request to process.
 */
static void rtReqPoolThreadProcessRequest(PRTREQPOOLINT pPool, PRTREQPOOLTHREAD pThread, PRTREQINT pReq)
{
    /*
     * Update thread state.
     */
    pThread->uProcessingNanoTs = RTTimeNanoTS();
    pThread->uPendingNanoTs    = pReq->uSubmitNanoTs;
    pThread->pPendingReq       = pReq;
    ASMAtomicIncU32(&pPool->cCurActiveRequests);
    Assert(pReq->u32Magic == RTREQ_MAGIC);

    /*
     * Do the actual processing.
     */
    rtReqProcessOne(pReq);

    /*
     * Update thread statistics and state.
     */
    ASMAtomicDecU32(&pPool->cCurActiveRequests);
    pThread->pPendingReq    = NULL;
    uint64_t const uNsTsEnd = RTTimeNanoTS();
    pThread->cNsTotalReqProcessing += uNsTsEnd - pThread->uProcessingNanoTs;
    pThread->cNsTotalReqQueued     += uNsTsEnd - pThread->uPendingNanoTs;
    pThread->cReqProcessed++;
}


/**
 * The Worker Thread Procedure.
 *
 * @returns VINF_SUCCESS.
 * @param   hThreadSelf     The thread handle.
 * @param   pvArg           Pointer to the thread data.
 */
static DECLCALLBACK(int) rtReqPoolThreadProc(RTTHREAD hThreadSelf, void *pvArg)
{
    PRTREQPOOLTHREAD    pThread = (PRTREQPOOLTHREAD)pvArg;
    PRTREQPOOLINT       pPool   = pThread->pPool;

    /*
     * The work loop.
     */
    uint64_t cReqPrevProcessedIdle     = UINT64_MAX;
    uint64_t cReqPrevProcessedStat     = 0;
    uint64_t cNsPrevTotalReqProcessing = 0;
    uint64_t cNsPrevTotalReqQueued     = 0;
    while (!pPool->fDestructing)
    {
        /*
         * Process pending work.
         */

        /* Check if anything is scheduled directly to us. */
        PRTREQINT pReq = ASMAtomicXchgPtrT(&pThread->pTodoReq, NULL, PRTREQINT);
        if (pReq)
        {
            Assert(RTListIsEmpty(&pThread->IdleNode)); /* Must not be in the idle list. */
            rtReqPoolThreadProcessRequest(pPool, pThread, pReq);
            continue;
        }

        ASMAtomicIncU32(&pPool->cIdleThreads);
        RTCritSectEnter(&pPool->CritSect);

        /* Update the global statistics. */
        if (cReqPrevProcessedStat != pThread->cReqProcessed)
        {
            pPool->cReqProcessed += pThread->cReqProcessed - cReqPrevProcessedStat;
            cReqPrevProcessedStat = pThread->cReqProcessed;
            pPool->cNsTotalReqProcessing += pThread->cNsTotalReqProcessing - cNsPrevTotalReqProcessing;
            cNsPrevTotalReqProcessing = pThread->cNsTotalReqProcessing;
            pPool->cNsTotalReqQueued  += pThread->cNsTotalReqQueued - cNsPrevTotalReqQueued;
            cNsPrevTotalReqQueued = pThread->cNsTotalReqQueued;
        }

        /* Recheck the todo request pointer after entering the critsect. */
        pReq = ASMAtomicXchgPtrT(&pThread->pTodoReq, NULL, PRTREQINT);
        if (pReq)
        {
            Assert(RTListIsEmpty(&pThread->IdleNode)); /* Must not be in the idle list. */
            ASMAtomicDecU32(&pPool->cIdleThreads);
            RTCritSectLeave(&pPool->CritSect);

            rtReqPoolThreadProcessRequest(pPool, pThread, pReq);
            continue;
        }

        /* Any pending requests in the queue? */
        pReq = pPool->pPendingRequests;
        if (pReq)
        {
            pPool->pPendingRequests = pReq->pNext;
            if (pReq->pNext == NULL)
                pPool->ppPendingRequests = &pPool->pPendingRequests;
            Assert(pPool->cCurPendingRequests > 0);
            pPool->cCurPendingRequests--;

            /* Un-idle ourselves and process the request. */
            if (!RTListIsEmpty(&pThread->IdleNode))
            {
                RTListNodeRemove(&pThread->IdleNode);
                RTListInit(&pThread->IdleNode);
                ASMAtomicDecU32(&pPool->cIdleThreads);
            }
            ASMAtomicDecU32(&pPool->cIdleThreads);
            RTCritSectLeave(&pPool->CritSect);

            rtReqPoolThreadProcessRequest(pPool, pThread, pReq);
            continue;
        }

        /*
         * Nothing to do, go idle.
         */
        if (cReqPrevProcessedIdle != pThread->cReqProcessed)
        {
            cReqPrevProcessedIdle = pThread->cReqProcessed;
            pThread->uIdleNanoTs  = RTTimeNanoTS();
        }
        else if (pPool->cCurThreads > pPool->cMinThreads)
        {
            uint64_t cNsIdle = RTTimeNanoTS() - pThread->uIdleNanoTs;
            if (cNsIdle >= pPool->cNsMinIdle)
                return rtReqPoolThreadExit(pPool, pThread, true /*fLocked*/);
        }

        if (RTListIsEmpty(&pThread->IdleNode))
            RTListPrepend(&pPool->IdleThreads, &pThread->IdleNode);
        else
            ASMAtomicDecU32(&pPool->cIdleThreads);
        RTThreadUserReset(hThreadSelf);
        uint32_t const cMsSleep = pPool->cMsIdleSleep;

        RTCritSectLeave(&pPool->CritSect);

        RTThreadUserWait(hThreadSelf, cMsSleep);
    }

    return rtReqPoolThreadExit(pPool, pThread, false /*fLocked*/);
}
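/*
 * Reader's note (not in the original file): a worker increments cIdleThreads
 * *before* taking the critical section, and undoes that increment on every
 * path that does not leave it parked on the idle list: once when it takes a
 * direct or queued request (plus a second decrement if it was still on the
 * idle list from an earlier iteration), and once when it finds itself already
 * listed.  The net effect is that cIdleThreads counts listed idle threads
 * plus threads about to become idle, which is what rtReqPoolSubmit relies on
 * when deciding whether a queued request will be picked up without spawning
 * a new worker.
 */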


/**
 * Create a new worker thread.
 *
 * @param   pPool           The pool needing new worker thread.
 * @remarks Caller owns the critical section.
 */
static void rtReqPoolCreateNewWorker(RTREQPOOL pPool)
{
    PRTREQPOOLTHREAD pThread = (PRTREQPOOLTHREAD)RTMemAllocZ(sizeof(RTREQPOOLTHREAD));
    if (!pThread)
        return;

    pThread->uBirthNanoTs = RTTimeNanoTS();
    pThread->pPool        = pPool;
    pThread->idLastCpu    = NIL_RTCPUID;
    pThread->hThread      = NIL_RTTHREAD;
    RTListInit(&pThread->IdleNode);
    RTListAppend(&pPool->WorkerThreads, &pThread->ListNode);
    pPool->cCurThreads++;
    pPool->cThreadsCreated++;

    static uint32_t s_idThread = 0;
    int rc = RTThreadCreateF(&pThread->hThread, rtReqPoolThreadProc, pThread, 0 /*default stack size*/,
                             pPool->enmThreadType, 0 /*fFlags*/, "REQPT%02u", ++s_idThread);
    if (RT_SUCCESS(rc))
        pPool->uLastThreadCreateNanoTs = pThread->uBirthNanoTs;
    else
    {
        pPool->cCurThreads--;
        RTListNodeRemove(&pThread->ListNode);
        RTMemFree(pThread);
    }
}


/**
 * Repel the submitter, giving the worker threads a chance to process the
 * incoming request.
 *
 * @returns Success if a worker picked up the request, failure if not.  The
 *          critical section has been left on success, while we'll be inside it
 *          on failure.
 * @param   pPool           The pool.
 * @param   pReq            The incoming request.
 */
static int rtReqPoolPushBack(PRTREQPOOLINT pPool, PRTREQINT pReq)
{
    /*
     * Lazily create the push back semaphore that we'll be blocking on.
     */
    int rc;
    RTSEMEVENTMULTI hEvt = pReq->hPushBackEvt;
    if (hEvt == NIL_RTSEMEVENTMULTI)
    {
        rc = RTSemEventMultiCreate(&hEvt);
        if (RT_FAILURE(rc))
            return rc;
        pReq->hPushBackEvt = hEvt;
    }

    /*
     * Prepare the request and semaphore.
     */
    uint32_t const cMsTimeout = pPool->cMsCurPushBack;
    pReq->fSignalPushBack = true;
    RTReqRetain(pReq);
    RTSemEventMultiReset(hEvt);

    RTCritSectLeave(&pPool->CritSect);

    /*
     * Block.
     */
    rc = RTSemEventMultiWait(hEvt, cMsTimeout);
    if (RT_FAILURE(rc))
    {
        AssertMsg(rc == VERR_TIMEOUT, ("%Rrc\n", rc));
        RTCritSectEnter(&pPool->CritSect);
    }
    RTReqRelease(pReq);
    return rc;
}
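/*
 * Reader's note (not in the original file): fSignalPushBack asks the request
 * completion path (presumably the generic request code in req.cpp at this
 * revision) to signal hPushBackEvt when the request finishes, which releases
 * the submitter blocked above before the timeout expires.  The extra
 * RTReqRetain/RTReqRelease pair keeps the request, and with it the
 * semaphore, alive while we block without holding the critical section.
 */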


DECLHIDDEN(void) rtReqPoolSubmit(PRTREQPOOLINT pPool, PRTREQINT pReq)
{
    RTCritSectEnter(&pPool->CritSect);

    pPool->cReqSubmitted++;

    /*
     * Try to schedule the request to a thread that's currently idle.
     */
    PRTREQPOOLTHREAD pThread = RTListGetFirst(&pPool->IdleThreads, RTREQPOOLTHREAD, IdleNode);
    if (pThread)
    {
        /** @todo CPU affinity??? */
        ASMAtomicWritePtr(&pThread->pTodoReq, pReq);

        RTListNodeRemove(&pThread->IdleNode);
        RTListInit(&pThread->IdleNode);
        ASMAtomicDecU32(&pPool->cIdleThreads);

        RTThreadUserSignal(pThread->hThread);

        RTCritSectLeave(&pPool->CritSect);
        return;
    }
    Assert(RTListIsEmpty(&pPool->IdleThreads));

    /*
     * Put the request in the pending queue.
     */
    pReq->pNext = NULL;
    *pPool->ppPendingRequests = pReq;
    pPool->ppPendingRequests = (PRTREQINT *)&pReq->pNext;
    pPool->cCurPendingRequests++;

    /*
     * If there is an incoming worker thread already or we've reached the
     * maximum number of worker threads, we're done.
     */
    if (   pPool->cIdleThreads > 0
        || pPool->cCurThreads >= pPool->cMaxThreads)
    {
        RTCritSectLeave(&pPool->CritSect);
        return;
    }

    /*
     * Push back before creating a new worker thread.
     */
    if (   pPool->cCurThreads > pPool->cThreadsPushBackThreshold
        && (RTTimeNanoTS() - pReq->uSubmitNanoTs) / RT_NS_1MS >= pPool->cMsCurPushBack)
    {
        int rc = rtReqPoolPushBack(pPool, pReq);
        if (RT_SUCCESS(rc))
            return;
    }

    /*
     * Create a new thread for processing the request.
     * For simplicity, we don't bother leaving the critical section while doing so.
     */
    rtReqPoolCreateNewWorker(pPool);

    RTCritSectLeave(&pPool->CritSect);
    return;
}
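/*
 * Reader's note (not in the original file): the submission path above tries
 * four strategies in order.  The request is handed directly to an idle
 * worker via pTodoReq when one exists; otherwise it is appended to the
 * pending FIFO, after which the submitter returns immediately if a worker is
 * about to go idle or the pool is already at cMaxThreads, is stalled by
 * rtReqPoolPushBack once the thread count exceeds cThreadsPushBackThreshold,
 * and as a last resort a new worker is created to pick the request up.
 */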


/**
 * Recycles a request.
 *
 * @returns true if recycled, false if not.
 * @param   pPool           The request thread pool.
 * @param   pReq            The request.
 */
DECLHIDDEN(bool) rtReqPoolRecycle(PRTREQPOOLINT pPool, PRTREQINT pReq)
{
    if (   pPool
        && ASMAtomicReadU32(&pPool->cCurFreeRequests) < pPool->cMaxFreeRequests)
    {
        RTCritSectEnter(&pPool->CritSect);
        if (pPool->cCurFreeRequests < pPool->cMaxFreeRequests)
        {
            pReq->pNext = pPool->pFreeRequests;
            pPool->pFreeRequests = pReq;
            ASMAtomicIncU32(&pPool->cCurFreeRequests);

            RTCritSectLeave(&pPool->CritSect);
            return true;
        }

        RTCritSectLeave(&pPool->CritSect);
    }
    return false;
}


RTDECL(int) RTReqPoolSetCfgVar(RTREQPOOL hPool, RTREQPOOLCFGVAR enmVar, uint64_t uValue)
{
    PRTREQPOOLINT pPool = hPool;
    AssertPtrReturn(pPool, VERR_INVALID_HANDLE);
    AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, VERR_INVALID_HANDLE);
    AssertReturn(enmVar > RTREQPOOLCFGVAR_INVALID && enmVar < RTREQPOOLCFGVAR_END, VERR_INVALID_PARAMETER);

    RTCritSectEnter(&pPool->CritSect);

    bool fWakeUpIdleThreads = false;
    int  rc                 = VINF_SUCCESS;
    switch (enmVar)
    {
        case RTREQPOOLCFGVAR_THREAD_TYPE:
            AssertMsgBreakStmt(uValue > (uint64_t)RTTHREADTYPE_INVALID && uValue < (uint64_t)RTTHREADTYPE_END,
                               ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);

            pPool->enmThreadType = (RTTHREADTYPE)uValue;
            break;

        case RTREQPOOLCFGVAR_MIN_THREADS:
            AssertMsgBreakStmt(uValue <= RTREQPOOL_MAX_THREADS, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
            fWakeUpIdleThreads = pPool->cMinThreads > (uint32_t)uValue;
            pPool->cMinThreads = (uint32_t)uValue;
            if (pPool->cMinThreads > pPool->cMaxThreads)
                pPool->cMaxThreads = pPool->cMinThreads;
            if (   pPool->cThreadsPushBackThreshold < pPool->cMinThreads
                || pPool->cThreadsPushBackThreshold > pPool->cMaxThreads)
                pPool->cThreadsPushBackThreshold = pPool->cMinThreads + (pPool->cMaxThreads - pPool->cMinThreads) / 2;
            rtReqPoolRecalcPushBack(pPool);
            break;

        case RTREQPOOLCFGVAR_MAX_THREADS:
            AssertMsgBreakStmt(uValue <= RTREQPOOL_MAX_THREADS && uValue >= 1, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
            pPool->cMaxThreads = (uint32_t)uValue;
            if (pPool->cMaxThreads < pPool->cMinThreads)
            {
                pPool->cMinThreads = pPool->cMaxThreads;
                fWakeUpIdleThreads = true;
            }
            if (pPool->cMaxThreads < pPool->cThreadsPushBackThreshold)
                pPool->cThreadsPushBackThreshold = pPool->cMinThreads + (pPool->cMaxThreads - pPool->cMinThreads) / 2;
            rtReqPoolRecalcPushBack(pPool);
            break;

        case RTREQPOOLCFGVAR_MS_MIN_IDLE:
            AssertMsgBreakStmt(uValue < UINT32_MAX || uValue == RT_INDEFINITE_WAIT, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
            if (uValue < UINT32_MAX && uValue != RT_INDEFINITE_WAIT)
            {
                fWakeUpIdleThreads = pPool->cMsMinIdle != (uint32_t)uValue;
                pPool->cMsMinIdle = (uint32_t)uValue;
                pPool->cNsMinIdle = pPool->cMsMinIdle * RT_NS_1MS_64;
                if (pPool->cMsIdleSleep > pPool->cMsMinIdle)
                    pPool->cMsIdleSleep = RT_MAX(RT_MS_1SEC, pPool->cMsMinIdle);
            }
            else
            {
                pPool->cMsMinIdle   = UINT32_MAX;
                pPool->cNsMinIdle   = UINT64_MAX;
                pPool->cMsIdleSleep = RT_INDEFINITE_WAIT;
            }
            break;

        case RTREQPOOLCFGVAR_MS_IDLE_SLEEP:
            AssertMsgBreakStmt(uValue <= RT_INDEFINITE_WAIT, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
            fWakeUpIdleThreads = pPool->cMsMinIdle > (RTMSINTERVAL)uValue;
            pPool->cMsIdleSleep = (RTMSINTERVAL)uValue;
            if (pPool->cMsIdleSleep == RT_INDEFINITE_WAIT)
            {
                pPool->cMsMinIdle = UINT32_MAX;
                pPool->cNsMinIdle = UINT64_MAX;
            }
            break;

        case RTREQPOOLCFGVAR_PUSH_BACK_THRESHOLD:
            if (uValue == UINT64_MAX)
                pPool->cThreadsPushBackThreshold = pPool->cMaxThreads;
            else if (uValue == 0)
                pPool->cThreadsPushBackThreshold = pPool->cMinThreads;
            else
            {
                AssertMsgBreakStmt(uValue <= pPool->cMaxThreads, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
                AssertMsgBreakStmt(uValue >= pPool->cMinThreads, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
                pPool->cThreadsPushBackThreshold = (uint32_t)uValue;
            }
            break;

        case RTREQPOOLCFGVAR_PUSH_BACK_MIN_MS:
            AssertMsgBreakStmt(uValue <= RTREQPOOL_PUSH_BACK_MAX_MS, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
            pPool->cMsMinPushBack = (uint32_t)uValue;
            if (pPool->cMsMaxPushBack < pPool->cMsMinPushBack)
                pPool->cMsMaxPushBack = pPool->cMsMinPushBack;
            rtReqPoolRecalcPushBack(pPool);
            break;

        case RTREQPOOLCFGVAR_PUSH_BACK_MAX_MS:
            AssertMsgBreakStmt(uValue <= RTREQPOOL_PUSH_BACK_MAX_MS, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
            pPool->cMsMaxPushBack = (uint32_t)uValue;
            if (pPool->cMsMinPushBack > pPool->cMsMaxPushBack)
                pPool->cMsMinPushBack = pPool->cMsMaxPushBack;
            rtReqPoolRecalcPushBack(pPool);
            break;

        case RTREQPOOLCFGVAR_MAX_FREE_REQUESTS:
            if (uValue == UINT64_MAX)
            {
                pPool->cMaxFreeRequests = pPool->cMaxThreads * 2;
                if (pPool->cMaxFreeRequests < 16)
                    pPool->cMaxFreeRequests = 16;
            }
            else
            {
                AssertMsgBreakStmt(uValue <= RTREQPOOL_MAX_FREE_REQUESTS, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
                pPool->cMaxFreeRequests = (uint32_t)uValue;
            }

            while (pPool->cCurFreeRequests > pPool->cMaxFreeRequests)
            {
                PRTREQINT pReq = pPool->pFreeRequests;
                pPool->pFreeRequests = pReq->pNext;
                ASMAtomicDecU32(&pPool->cCurFreeRequests);
                rtReqFreeIt(pReq);
            }
            break;

        default:
            AssertFailed();
            rc = VERR_IPE_NOT_REACHED_DEFAULT_CASE;
    }

    /* Wake up all idle threads if required. */
    if (fWakeUpIdleThreads)
    {
        Assert(rc == VINF_SUCCESS);
        PRTREQPOOLTHREAD pThread;
        RTListForEach(&pPool->WorkerThreads, pThread, RTREQPOOLTHREAD, ListNode)
        {
            RTThreadUserSignal(pThread->hThread);
        }
    }

    RTCritSectLeave(&pPool->CritSect);

    return rc;
}
RT_EXPORT_SYMBOL(RTReqPoolSetCfgVar);
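/*
 * Illustrative usage sketch (not part of the original file): configuring a
 * pool with the API above.  How the hPool handle is obtained is outside this
 * file's scope, so that part is left to the caller.
 */
#if 0 /* sketch only */
static int configurePoolExample(RTREQPOOL hPool)
{
    /* Keep two workers around, allow up to sixteen, and start pushing back
       submitters once eight exist. */
    int rc = RTReqPoolSetCfgVar(hPool, RTREQPOOLCFGVAR_MIN_THREADS, 2);
    if (RT_SUCCESS(rc))
        rc = RTReqPoolSetCfgVar(hPool, RTREQPOOLCFGVAR_MAX_THREADS, 16);
    if (RT_SUCCESS(rc))
        rc = RTReqPoolSetCfgVar(hPool, RTREQPOOLCFGVAR_PUSH_BACK_THRESHOLD, 8);
    /* Push back submitters for between 5 ms and 250 ms depending on load. */
    if (RT_SUCCESS(rc))
        rc = RTReqPoolSetCfgVar(hPool, RTREQPOOLCFGVAR_PUSH_BACK_MIN_MS, 5);
    if (RT_SUCCESS(rc))
        rc = RTReqPoolSetCfgVar(hPool, RTREQPOOLCFGVAR_PUSH_BACK_MAX_MS, 250);
    return rc;
}
#endif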


RTDECL(int) RTReqPoolQueryCfgVar(RTREQPOOL hPool, RTREQPOOLCFGVAR enmVar, uint64_t *puValue)
{
    PRTREQPOOLINT pPool = hPool;
    AssertPtrReturn(pPool, VERR_INVALID_HANDLE);
    AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, VERR_INVALID_HANDLE);
    AssertReturn(enmVar > RTREQPOOLCFGVAR_INVALID && enmVar < RTREQPOOLCFGVAR_END, VERR_INVALID_PARAMETER);

    RTCritSectEnter(&pPool->CritSect);

    int rc = VINF_SUCCESS;
    switch (enmVar)
    {
        case RTREQPOOLCFGVAR_THREAD_TYPE:
            *puValue = pPool->enmThreadType;
            break;

        case RTREQPOOLCFGVAR_MIN_THREADS:
            *puValue = pPool->cMinThreads;
            break;

        case RTREQPOOLCFGVAR_MAX_THREADS:
            *puValue = pPool->cMaxThreads;
            break;

        case RTREQPOOLCFGVAR_MS_MIN_IDLE:
            *puValue = pPool->cMsMinIdle;
            break;

        case RTREQPOOLCFGVAR_MS_IDLE_SLEEP:
            *puValue = pPool->cMsIdleSleep;
            break;

        case RTREQPOOLCFGVAR_PUSH_BACK_THRESHOLD:
            *puValue = pPool->cThreadsPushBackThreshold;
            break;

        case RTREQPOOLCFGVAR_PUSH_BACK_MIN_MS:
            *puValue = pPool->cMsMinPushBack;
            break;

        case RTREQPOOLCFGVAR_PUSH_BACK_MAX_MS:
            *puValue = pPool->cMsMaxPushBack;
            break;

        case RTREQPOOLCFGVAR_MAX_FREE_REQUESTS:
            *puValue = pPool->cMaxFreeRequests;
            break;

        default:
            AssertFailed();
            rc = VERR_IPE_NOT_REACHED_DEFAULT_CASE;
            *puValue = UINT64_MAX;
            break;
    }

    RTCritSectLeave(&pPool->CritSect);

    return rc;
}
RT_EXPORT_SYMBOL(RTReqPoolQueryCfgVar);


RTDECL(uint64_t) RTReqPoolGetStat(RTREQPOOL hPool, RTREQPOOLSTAT enmStat)
{
    PRTREQPOOLINT pPool = hPool;
    AssertPtrReturn(pPool, UINT64_MAX);
    AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT64_MAX);
    AssertReturn(enmStat > RTREQPOOLSTAT_INVALID && enmStat < RTREQPOOLSTAT_END, UINT64_MAX);

    RTCritSectEnter(&pPool->CritSect);

    uint64_t u64;
    switch (enmStat)
    {
        case RTREQPOOLSTAT_THREADS:                     u64 = pPool->cCurThreads; break;
        case RTREQPOOLSTAT_THREADS_CREATED:             u64 = pPool->cThreadsCreated; break;
        case RTREQPOOLSTAT_REQUESTS_PROCESSED:          u64 = pPool->cReqProcessed; break;
        case RTREQPOOLSTAT_REQUESTS_SUBMITTED:          u64 = pPool->cReqSubmitted; break;
        case RTREQPOOLSTAT_REQUESTS_PENDING:            u64 = pPool->cCurPendingRequests; break;
        case RTREQPOOLSTAT_REQUESTS_ACTIVE:             u64 = pPool->cCurActiveRequests; break;
        case RTREQPOOLSTAT_REQUESTS_FREE:               u64 = pPool->cCurFreeRequests; break;
        case RTREQPOOLSTAT_NS_TOTAL_REQ_PROCESSING:     u64 = pPool->cNsTotalReqProcessing; break;
        case RTREQPOOLSTAT_NS_TOTAL_REQ_QUEUED:         u64 = pPool->cNsTotalReqQueued; break;
        case RTREQPOOLSTAT_NS_AVERAGE_REQ_PROCESSING:   u64 = pPool->cNsTotalReqProcessing / RT_MAX(pPool->cReqProcessed, 1); break;
        case RTREQPOOLSTAT_NS_AVERAGE_REQ_QUEUED:       u64 = pPool->cNsTotalReqQueued / RT_MAX(pPool->cReqProcessed, 1); break;
        default:
            AssertFailed();
            u64 = UINT64_MAX;
            break;
    }

    RTCritSectLeave(&pPool->CritSect);

    return u64;
}
RT_EXPORT_SYMBOL(RTReqPoolGetStat);
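/*
 * Illustrative usage sketch (not part of the original file): sampling the
 * statistics exposed above, e.g. for a periodic log line.  Each call takes
 * the critical section separately, so the four values are not one consistent
 * snapshot.
 */
#if 0 /* sketch only */
static void logPoolLoadExample(RTREQPOOL hPool)
{
    uint64_t cPending   = RTReqPoolGetStat(hPool, RTREQPOOLSTAT_REQUESTS_PENDING);
    uint64_t cActive    = RTReqPoolGetStat(hPool, RTREQPOOLSTAT_REQUESTS_ACTIVE);
    uint64_t cNsAvgProc = RTReqPoolGetStat(hPool, RTREQPOOLSTAT_NS_AVERAGE_REQ_PROCESSING);
    uint64_t cNsAvgWait = RTReqPoolGetStat(hPool, RTREQPOOLSTAT_NS_AVERAGE_REQ_QUEUED);
    LogRel(("reqpool: %llu pending, %llu active, avg %llu ns processing, %llu ns queued\n",
            (unsigned long long)cPending, (unsigned long long)cActive,
            (unsigned long long)cNsAvgProc, (unsigned long long)cNsAvgWait));
}
#endif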


RTDECL(uint32_t) RTReqPoolRetain(RTREQPOOL hPool)
{
    PRTREQPOOLINT pPool = hPool;
    AssertPtrReturn(pPool, UINT32_MAX);
    AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT32_MAX);

    return ASMAtomicIncU32(&pPool->cRefs);
}
RT_EXPORT_SYMBOL(RTReqPoolRetain);


RTDECL(uint32_t) RTReqPoolRelease(RTREQPOOL hPool)
{
    /*
     * Ignore NULL and validate the request.
     */
    if (!hPool)
        return 0;
    PRTREQPOOLINT pPool = hPool;
    AssertPtrReturn(pPool, UINT32_MAX);
    AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT32_MAX);

    /*
     * Drop a reference, free it when it reaches zero.
     */
    uint32_t cRefs = ASMAtomicDecU32(&pPool->cRefs);
    if (cRefs == 0)
    {
        AssertReturn(ASMAtomicCmpXchgU32(&pPool->u32Magic, RTREQPOOL_MAGIC_DEAD, RTREQPOOL_MAGIC), UINT32_MAX);

        RTCritSectEnter(&pPool->CritSect);
#ifdef RT_STRICT
        RTTHREAD const hSelf = RTThreadSelf();
#endif

        /* Indicate to the worker threads that we're shutting down. */
        ASMAtomicWriteBool(&pPool->fDestructing, true);
        PRTREQPOOLTHREAD pThread;
        RTListForEach(&pPool->WorkerThreads, pThread, RTREQPOOLTHREAD, ListNode)
        {
            Assert(pThread->hThread != hSelf);
            RTThreadUserSignal(pThread->hThread);
        }

        /* Cancel pending requests. */
        Assert(!pPool->pPendingRequests);
        while (pPool->pPendingRequests)
        {
            PRTREQINT pReq = pPool->pPendingRequests;
            pPool->pPendingRequests = pReq->pNext;
            rtReqPoolCancelReq(pReq);
        }
        pPool->ppPendingRequests = NULL;
        pPool->cCurPendingRequests = 0;

        /* Wait for the workers to shut down. */
        while (!RTListIsEmpty(&pPool->WorkerThreads))
        {
            RTCritSectLeave(&pPool->CritSect);
            RTSemEventMultiWait(pPool->hThreadTermEvt, RT_MS_1MIN);
            RTCritSectEnter(&pPool->CritSect);
            /** @todo should we wait forever here? */
        }

        /* Free recycled requests. */
        for (;;)
        {
            PRTREQINT pReq = pPool->pFreeRequests;
            if (!pReq)
                break;
            pPool->pFreeRequests = pReq->pNext;
            pPool->cCurFreeRequests--;
            rtReqFreeIt(pReq);
        }

        /* Finally, free the handle. */
        RTMemFree(pPool);
    }

    return cRefs;
}
RT_EXPORT_SYMBOL(RTReqPoolRelease);


RTDECL(int) RTReqPoolAlloc(RTREQPOOL hPool, RTREQTYPE enmType, PRTREQ *phReq)
{
    PRTREQPOOLINT pPool = hPool;
    AssertPtrReturn(pPool, VERR_INVALID_HANDLE);
    AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, VERR_INVALID_HANDLE);

    /*
     * Try recycle old requests.
     */
    if (ASMAtomicReadU32(&pPool->cCurFreeRequests) > 0)
    {
        RTCritSectEnter(&pPool->CritSect);
        PRTREQINT pReq = pPool->pFreeRequests;
        if (pReq)
        {
            ASMAtomicDecU32(&pPool->cCurFreeRequests);
            pPool->pFreeRequests = pReq->pNext;

            RTCritSectLeave(&pPool->CritSect);

            Assert(pReq->fPoolOrQueue);
            Assert(pReq->uOwner.hPool == pPool);

            int rc = rtReqReInit(pReq, enmType);
            if (RT_SUCCESS(rc))
            {
                *phReq = pReq;
                LogFlow(("RTReqPoolAlloc: returns VINF_SUCCESS *phReq=%p recycled\n", pReq));
                return rc;
            }
        }
        else
            RTCritSectLeave(&pPool->CritSect);
    }

    /*
     * Allocate a new request.
     */
    int rc = rtReqAlloc(enmType, true /*fPoolOrQueue*/, pPool, phReq);
    LogFlow(("RTReqPoolAlloc: returns %Rrc *phReq=%p\n", rc, *phReq));
    return rc;
}
RT_EXPORT_SYMBOL(RTReqPoolAlloc);
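/*
 * Illustrative usage sketch (not part of the original file): allocating a
 * request from the pool and releasing it again.  Setting up and submitting
 * the request is handled by the generic request code outside this file, so
 * that part is elided here; the release should hand the request back to the
 * pool's recycle LIFO via rtReqPoolRecycle.
 */
#if 0 /* sketch only */
static int allocAndReleaseExample(RTREQPOOL hPool)
{
    PRTREQ hReq;
    int rc = RTReqPoolAlloc(hPool, RTREQTYPE_INTERNAL, &hReq);
    if (RT_SUCCESS(rc))
    {
        /* ... set up and submit the request here ... */
        RTReqRelease(hReq);
    }
    return rc;
}
#endif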