VirtualBox

source: vbox/trunk/src/VBox/Runtime/common/misc/reqpool.cpp@ 96014

Last change on this file since 96014 was 93115, checked in by vboxsync, 3 years ago

scm --update-copyright-year

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 43.3 KB
Line 
1/* $Id: reqpool.cpp 93115 2022-01-01 11:31:46Z vboxsync $ */
2/** @file
3 * IPRT - Request Pool.
4 */
5
6/*
7 * Copyright (C) 2006-2022 Oracle Corporation
8 *
9 * This file is part of VirtualBox Open Source Edition (OSE), as
10 * available from http://www.virtualbox.org. This file is free software;
11 * you can redistribute it and/or modify it under the terms of the GNU
12 * General Public License (GPL) as published by the Free Software
13 * Foundation, in version 2 as it comes in the "COPYING" file of the
14 * VirtualBox OSE distribution. VirtualBox OSE is distributed in the
15 * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
16 *
17 * The contents of this file may alternatively be used under the terms
18 * of the Common Development and Distribution License Version 1.0
19 * (CDDL) only, as it comes in the "COPYING.CDDL" file of the
20 * VirtualBox OSE distribution, in which case the provisions of the
21 * CDDL are applicable instead of those of the GPL.
22 *
23 * You may elect to license modified versions of this file under the
24 * terms and conditions of either the GPL or the CDDL or both.
25 */
26
27
28/*********************************************************************************************************************************
29* Header Files *
30*********************************************************************************************************************************/
31#include <iprt/req.h>
32#include "internal/iprt.h"
33
34#include <iprt/assert.h>
35#include <iprt/asm.h>
36#include <iprt/critsect.h>
37#include <iprt/err.h>
38#include <iprt/list.h>
39#include <iprt/log.h>
40#include <iprt/mem.h>
41#include <iprt/string.h>
42#include <iprt/time.h>
43#include <iprt/semaphore.h>
44#include <iprt/thread.h>
45
46#include "internal/req.h"
47#include "internal/magics.h"
48
49
50/*********************************************************************************************************************************
51* Defined Constants And Macros *
52*********************************************************************************************************************************/
53/** The max number of worker threads. */
54#define RTREQPOOL_MAX_THREADS UINT32_C(16384)
55/** The max number of milliseconds to push back. */
56#define RTREQPOOL_PUSH_BACK_MAX_MS RT_MS_1MIN
57/** The max number of free requests to keep around. */
58#define RTREQPOOL_MAX_FREE_REQUESTS (RTREQPOOL_MAX_THREADS * 2U)
59
60
61/*********************************************************************************************************************************
62* Structures and Typedefs *
63*********************************************************************************************************************************/
64typedef struct RTREQPOOLTHREAD
65{
66 /** Node in the RTREQPOOLINT::IdleThreads list. */
67 RTLISTNODE IdleNode;
68 /** Node in the RTREQPOOLINT::WorkerThreads list. */
69 RTLISTNODE ListNode;
70
71 /** The submit timestamp of the pending request. */
72 uint64_t uPendingNanoTs;
73 /** The submit timestamp of the request processing. */
74 uint64_t uProcessingNanoTs;
75 /** When this CPU went idle the last time. */
76 uint64_t uIdleNanoTs;
77 /** The number of requests processed by this thread. */
78 uint64_t cReqProcessed;
79 /** Total time the requests processed by this thread took to process. */
80 uint64_t cNsTotalReqProcessing;
81 /** Total time the requests processed by this thread had to wait in
82 * the queue before being scheduled. */
83 uint64_t cNsTotalReqQueued;
84 /** The CPU this was scheduled last time we checked. */
85 RTCPUID idLastCpu;
86
87 /** The submitter will put an incoming request here when scheduling an idle
88 * thread. */
89 PRTREQINT volatile pTodoReq;
90 /** The request the thread is currently processing. */
91 PRTREQINT volatile pPendingReq;
92
93 /** The thread handle. */
94 RTTHREAD hThread;
95 /** Nano seconds timestamp representing the birth time of the thread. */
96 uint64_t uBirthNanoTs;
97 /** Pointer to the request thread pool instance the thread is associated
98 * with. */
99 struct RTREQPOOLINT *pPool;
100} RTREQPOOLTHREAD;
101/** Pointer to a worker thread. */
102typedef RTREQPOOLTHREAD *PRTREQPOOLTHREAD;
103
104/**
105 * Request thread pool instance data.
106 */
107typedef struct RTREQPOOLINT
108{
109 /** Magic value (RTREQPOOL_MAGIC). */
110 uint32_t u32Magic;
111 /** The request pool name. */
112 char szName[12];
113
114 /** @name Config
115 * @{ */
116 /** The worker thread type. */
117 RTTHREADTYPE enmThreadType;
118 /** The work thread flags (RTTHREADFLAGS). */
119 uint32_t fThreadFlags;
120 /** The maximum number of worker threads. */
121 uint32_t cMaxThreads;
122 /** The minimum number of worker threads. */
123 uint32_t cMinThreads;
124 /** The number of milliseconds a thread needs to be idle before it is
125 * considered for retirement. */
126 uint32_t cMsMinIdle;
127 /** cMsMinIdle in nano seconds. */
128 uint64_t cNsMinIdle;
129 /** The idle thread sleep interval in milliseconds. */
130 RTMSINTERVAL cMsIdleSleep;
131 /** The number of threads which should be spawned before throttling kicks
132 * in. */
133 uint32_t cThreadsPushBackThreshold;
134 /** The max number of milliseconds to push back a submitter before creating
135 * a new worker thread once the threshold has been reached. */
136 uint32_t cMsMaxPushBack;
137 /** The minimum number of milliseconds to push back a submitter before
138 * creating a new worker thread once the threshold has been reached. */
139 uint32_t cMsMinPushBack;
140 /** The max number of free requests in the recycle LIFO. */
141 uint32_t cMaxFreeRequests;
142 /** @} */
143
144 /** Signaled by terminating worker threads. */
145 RTSEMEVENTMULTI hThreadTermEvt;
146
147 /** Destruction indicator. The worker threads checks in their loop. */
148 bool volatile fDestructing;
149
150 /** The current submitter push back in milliseconds.
151 * This is recalculated when worker threads come and go. */
152 uint32_t cMsCurPushBack;
153 /** The current number of worker threads. */
154 uint32_t cCurThreads;
155 /** Statistics: The total number of threads created. */
156 uint32_t cThreadsCreated;
157 /** Statistics: The timestamp when the last thread was created. */
158 uint64_t uLastThreadCreateNanoTs;
159 /** Linked list of worker threads. */
160 RTLISTANCHOR WorkerThreads;
161
162 /** The number of requests processed and counted in the time totals. */
163 uint64_t cReqProcessed;
164 /** Total time the requests processed by this thread took to process. */
165 uint64_t cNsTotalReqProcessing;
166 /** Total time the requests processed by this thread had to wait in
167 * the queue before being scheduled. */
168 uint64_t cNsTotalReqQueued;
169
170 /** Reference counter. */
171 uint32_t volatile cRefs;
172 /** The number of idle thread or threads in the process of becoming
173 * idle. This is increased before the to-be-idle thread tries to enter
174 * the critical section and add itself to the list. */
175 uint32_t volatile cIdleThreads;
176 /** Linked list of idle threads. */
177 RTLISTANCHOR IdleThreads;
178
179 /** Head of the request FIFO. */
180 PRTREQINT pPendingRequests;
181 /** Where to insert the next request. */
182 PRTREQINT *ppPendingRequests;
183 /** The number of requests currently pending. */
184 uint32_t cCurPendingRequests;
185 /** The number of requests currently being executed. */
186 uint32_t volatile cCurActiveRequests;
187 /** The number of requests submitted. */
188 uint64_t cReqSubmitted;
189 /** The number of cancelled. */
190 uint64_t cReqCancelled;
191
192 /** Head of the request recycling LIFO. */
193 PRTREQINT pFreeRequests;
194 /** The number of requests in the recycling LIFO. This is read without
195 * entering the critical section, thus volatile. */
196 uint32_t volatile cCurFreeRequests;
197
198 /** Critical section serializing access to members of this structure. */
199 RTCRITSECT CritSect;
200
201} RTREQPOOLINT;
202
203
204/**
205 * Used by exiting thread and the pool destruction code to cancel unexpected
206 * requests.
207 *
208 * @param pReq The request.
209 */
210static void rtReqPoolCancelReq(PRTREQINT pReq)
211{
212 pReq->uOwner.hPool = NIL_RTREQPOOL; /* force free */
213 pReq->enmState = RTREQSTATE_COMPLETED;
214 ASMAtomicWriteS32(&pReq->iStatusX, VERR_CANCELLED);
215 if (pReq->hPushBackEvt != NIL_RTSEMEVENTMULTI)
216 RTSemEventMultiSignal(pReq->hPushBackEvt);
217 RTSemEventSignal(pReq->EventSem);
218
219 RTReqRelease(pReq);
220}
221
222
223/**
224 * Recalculate the max pushback interval when adding or removing worker threads.
225 *
226 * @param pPool The pool. cMsCurPushBack will be changed.
227 */
228static void rtReqPoolRecalcPushBack(PRTREQPOOLINT pPool)
229{
230 uint32_t const cMsRange = pPool->cMsMaxPushBack - pPool->cMsMinPushBack;
231 uint32_t const cSteps = pPool->cMaxThreads - pPool->cThreadsPushBackThreshold;
232 uint32_t const iStep = pPool->cCurThreads - pPool->cThreadsPushBackThreshold;
233
234 uint32_t cMsCurPushBack;
235 if (cSteps == 0 /* disabled */)
236 cMsCurPushBack = 0;
237 else if ((cMsRange >> 2) >= cSteps)
238 cMsCurPushBack = cMsRange / cSteps * iStep;
239 else
240 cMsCurPushBack = (uint32_t)( (uint64_t)cMsRange * RT_NS_1MS / cSteps * iStep / RT_NS_1MS );
241 cMsCurPushBack += pPool->cMsMinPushBack;
242
243 pPool->cMsCurPushBack = cMsCurPushBack;
244}
245
246
247
248/**
249 * Performs thread exit.
250 *
251 * @returns Thread termination status code (VINF_SUCCESS).
252 * @param pPool The pool.
253 * @param pThread The thread.
254 * @param fLocked Whether we are inside the critical section
255 * already.
256 */
257static int rtReqPoolThreadExit(PRTREQPOOLINT pPool, PRTREQPOOLTHREAD pThread, bool fLocked)
258{
259 if (!fLocked)
260 RTCritSectEnter(&pPool->CritSect);
261
262 /* Get out of the idle list. */
263 if (!RTListIsEmpty(&pThread->IdleNode))
264 {
265 RTListNodeRemove(&pThread->IdleNode);
266 Assert(pPool->cIdleThreads > 0);
267 ASMAtomicDecU32(&pPool->cIdleThreads);
268 }
269
270 /* Get out of the thread list. */
271 RTListNodeRemove(&pThread->ListNode);
272 Assert(pPool->cCurThreads > 0);
273 pPool->cCurThreads--;
274 rtReqPoolRecalcPushBack(pPool);
275
276 /* This shouldn't happen... */
277 PRTREQINT pReq = pThread->pTodoReq;
278 if (pReq)
279 {
280 AssertFailed();
281 pThread->pTodoReq = NULL;
282 rtReqPoolCancelReq(pReq);
283 }
284
285 /* If we're the last thread terminating, ping the destruction thread before
286 we leave the critical section. */
287 if ( RTListIsEmpty(&pPool->WorkerThreads)
288 && pPool->hThreadTermEvt != NIL_RTSEMEVENT)
289 RTSemEventMultiSignal(pPool->hThreadTermEvt);
290
291 RTCritSectLeave(&pPool->CritSect);
292
293 RTMemFree(pThread);
294 return VINF_SUCCESS;
295}
296
297
298
299/**
300 * Process one request.
301 *
302 * @param pPool The pool.
303 * @param pThread The worker thread.
304 * @param pReq The request to process.
305 */
306static void rtReqPoolThreadProcessRequest(PRTREQPOOLINT pPool, PRTREQPOOLTHREAD pThread, PRTREQINT pReq)
307{
308 /*
309 * Update thread state.
310 */
311 pThread->uProcessingNanoTs = RTTimeNanoTS();
312 pThread->uPendingNanoTs = pReq->uSubmitNanoTs;
313 pThread->pPendingReq = pReq;
314 ASMAtomicIncU32(&pPool->cCurActiveRequests);
315 Assert(pReq->u32Magic == RTREQ_MAGIC);
316
317 /*
318 * Do the actual processing.
319 */
320 rtReqProcessOne(pReq);
321
322 /*
323 * Update thread statistics and state.
324 */
325 ASMAtomicDecU32(&pPool->cCurActiveRequests);
326 pThread->pPendingReq = NULL;
327 uint64_t const uNsTsEnd = RTTimeNanoTS();
328 pThread->cNsTotalReqProcessing += uNsTsEnd - pThread->uProcessingNanoTs;
329 pThread->cNsTotalReqQueued += pThread->uProcessingNanoTs - pThread->uPendingNanoTs;
330 pThread->cReqProcessed++;
331}
332
333
334
335/**
336 * The Worker Thread Procedure.
337 *
338 * @returns VINF_SUCCESS.
339 * @param hThreadSelf The thread handle (unused).
340 * @param pvArg Pointer to the thread data.
341 */
342static DECLCALLBACK(int) rtReqPoolThreadProc(RTTHREAD hThreadSelf, void *pvArg)
343{
344 PRTREQPOOLTHREAD pThread = (PRTREQPOOLTHREAD)pvArg;
345 PRTREQPOOLINT pPool = pThread->pPool;
346
347 /*
348 * The work loop.
349 */
350 uint64_t cReqPrevProcessedIdle = UINT64_MAX;
351 uint64_t cReqPrevProcessedStat = 0;
352 uint64_t cNsPrevTotalReqProcessing = 0;
353 uint64_t cNsPrevTotalReqQueued = 0;
354 while (!pPool->fDestructing)
355 {
356 /*
357 * Process pending work.
358 */
359
360 /* Check if anything is scheduled directly to us. */
361 PRTREQINT pReq = ASMAtomicXchgPtrT(&pThread->pTodoReq, NULL, PRTREQINT);
362 if (pReq)
363 {
364 Assert(RTListIsEmpty(&pThread->IdleNode)); /* Must not be in the idle list. */
365 rtReqPoolThreadProcessRequest(pPool, pThread, pReq);
366 continue;
367 }
368
369 ASMAtomicIncU32(&pPool->cIdleThreads);
370 RTCritSectEnter(&pPool->CritSect);
371
372 /* Update the global statistics. */
373 if (cReqPrevProcessedStat != pThread->cReqProcessed)
374 {
375 pPool->cReqProcessed += pThread->cReqProcessed - cReqPrevProcessedStat;
376 cReqPrevProcessedStat = pThread->cReqProcessed;
377 pPool->cNsTotalReqProcessing += pThread->cNsTotalReqProcessing - cNsPrevTotalReqProcessing;
378 cNsPrevTotalReqProcessing = pThread->cNsTotalReqProcessing;
379 pPool->cNsTotalReqQueued += pThread->cNsTotalReqQueued - cNsPrevTotalReqQueued;
380 cNsPrevTotalReqQueued = pThread->cNsTotalReqQueued;
381 }
382
383 /* Recheck the todo request pointer after entering the critsect. */
384 pReq = ASMAtomicXchgPtrT(&pThread->pTodoReq, NULL, PRTREQINT);
385 if (pReq)
386 {
387 Assert(RTListIsEmpty(&pThread->IdleNode)); /* Must not be in the idle list. */
388 RTCritSectLeave(&pPool->CritSect);
389
390 rtReqPoolThreadProcessRequest(pPool, pThread, pReq);
391 continue;
392 }
393
394 /* Any pending requests in the queue? */
395 pReq = pPool->pPendingRequests;
396 if (pReq)
397 {
398 pPool->pPendingRequests = pReq->pNext;
399 if (pReq->pNext == NULL)
400 pPool->ppPendingRequests = &pPool->pPendingRequests;
401 Assert(pPool->cCurPendingRequests > 0);
402 pPool->cCurPendingRequests--;
403
404 /* Un-idle ourselves and process the request. */
405 if (!RTListIsEmpty(&pThread->IdleNode))
406 {
407 RTListNodeRemove(&pThread->IdleNode);
408 RTListInit(&pThread->IdleNode);
409 ASMAtomicDecU32(&pPool->cIdleThreads);
410 }
411 ASMAtomicDecU32(&pPool->cIdleThreads);
412 RTCritSectLeave(&pPool->CritSect);
413
414 rtReqPoolThreadProcessRequest(pPool, pThread, pReq);
415 continue;
416 }
417
418 /*
419 * Nothing to do, go idle.
420 */
421 if (cReqPrevProcessedIdle != pThread->cReqProcessed)
422 {
423 cReqPrevProcessedIdle = pThread->cReqProcessed;
424 pThread->uIdleNanoTs = RTTimeNanoTS();
425 }
426 else if (pPool->cCurThreads > pPool->cMinThreads)
427 {
428 uint64_t cNsIdle = RTTimeNanoTS() - pThread->uIdleNanoTs;
429 if (cNsIdle >= pPool->cNsMinIdle)
430 return rtReqPoolThreadExit(pPool, pThread, true /*fLocked*/);
431 }
432
433 if (RTListIsEmpty(&pThread->IdleNode))
434 RTListPrepend(&pPool->IdleThreads, &pThread->IdleNode);
435 else
436 ASMAtomicDecU32(&pPool->cIdleThreads);
437 RTThreadUserReset(hThreadSelf);
438 uint32_t const cMsSleep = pPool->cMsIdleSleep;
439
440 RTCritSectLeave(&pPool->CritSect);
441
442 RTThreadUserWait(hThreadSelf, cMsSleep);
443 }
444
445 return rtReqPoolThreadExit(pPool, pThread, false /*fLocked*/);
446}
447
448
449/**
450 * Create a new worker thread.
451 *
452 * @param pPool The pool needing new worker thread.
453 * @remarks Caller owns the critical section
454 */
455static void rtReqPoolCreateNewWorker(RTREQPOOL pPool)
456{
457 PRTREQPOOLTHREAD pThread = (PRTREQPOOLTHREAD)RTMemAllocZ(sizeof(RTREQPOOLTHREAD));
458 if (!pThread)
459 return;
460
461 pThread->uBirthNanoTs = RTTimeNanoTS();
462 pThread->pPool = pPool;
463 pThread->idLastCpu = NIL_RTCPUID;
464 pThread->hThread = NIL_RTTHREAD;
465 RTListInit(&pThread->IdleNode);
466 RTListAppend(&pPool->WorkerThreads, &pThread->ListNode);
467 pPool->cCurThreads++;
468 pPool->cThreadsCreated++;
469
470 int rc = RTThreadCreateF(&pThread->hThread, rtReqPoolThreadProc, pThread, 0 /*default stack size*/,
471 pPool->enmThreadType, pPool->fThreadFlags, "%s%02u", pPool->szName, pPool->cThreadsCreated);
472 if (RT_SUCCESS(rc))
473 pPool->uLastThreadCreateNanoTs = pThread->uBirthNanoTs;
474 else
475 {
476 pPool->cCurThreads--;
477 RTListNodeRemove(&pThread->ListNode);
478 RTMemFree(pThread);
479 }
480}
481
482
483/**
484 * Repel the submitter, giving the worker threads a chance to process the
485 * incoming request.
486 *
487 * @returns Success if a worker picked up the request, failure if not. The
488 * critical section has been left on success, while we'll be inside it
489 * on failure.
490 * @param pPool The pool.
491 * @param pReq The incoming request.
492 */
493static int rtReqPoolPushBack(PRTREQPOOLINT pPool, PRTREQINT pReq)
494{
495 /*
496 * Lazily create the push back semaphore that we'll be blociing on.
497 */
498 int rc;
499 RTSEMEVENTMULTI hEvt = pReq->hPushBackEvt;
500 if (hEvt == NIL_RTSEMEVENTMULTI)
501 {
502 rc = RTSemEventMultiCreate(&hEvt);
503 if (RT_FAILURE(rc))
504 return rc;
505 pReq->hPushBackEvt = hEvt;
506 }
507
508 /*
509 * Prepare the request and semaphore.
510 */
511 uint32_t const cMsTimeout = pPool->cMsCurPushBack;
512 pReq->fSignalPushBack = true;
513 RTReqRetain(pReq);
514 RTSemEventMultiReset(hEvt);
515
516 RTCritSectLeave(&pPool->CritSect);
517
518 /*
519 * Block.
520 */
521 rc = RTSemEventMultiWait(hEvt, cMsTimeout);
522 if (RT_FAILURE(rc))
523 {
524 AssertMsg(rc == VERR_TIMEOUT, ("%Rrc\n", rc));
525 RTCritSectEnter(&pPool->CritSect);
526 }
527 RTReqRelease(pReq);
528 return rc;
529}
530
531
532
533DECLHIDDEN(void) rtReqPoolSubmit(PRTREQPOOLINT pPool, PRTREQINT pReq)
534{
535 RTCritSectEnter(&pPool->CritSect);
536
537 pPool->cReqSubmitted++;
538
539 /*
540 * Try schedule the request to a thread that's currently idle.
541 */
542 PRTREQPOOLTHREAD pThread = RTListGetFirst(&pPool->IdleThreads, RTREQPOOLTHREAD, IdleNode);
543 if (pThread)
544 {
545 /** @todo CPU affinity??? */
546 ASMAtomicWritePtr(&pThread->pTodoReq, pReq);
547
548 RTListNodeRemove(&pThread->IdleNode);
549 RTListInit(&pThread->IdleNode);
550 ASMAtomicDecU32(&pPool->cIdleThreads);
551
552 RTThreadUserSignal(pThread->hThread);
553
554 RTCritSectLeave(&pPool->CritSect);
555 return;
556 }
557 Assert(RTListIsEmpty(&pPool->IdleThreads));
558
559 /*
560 * Put the request in the pending queue.
561 */
562 pReq->pNext = NULL;
563 *pPool->ppPendingRequests = pReq;
564 pPool->ppPendingRequests = (PRTREQINT *)&pReq->pNext;
565 pPool->cCurPendingRequests++;
566
567 /*
568 * If there is an incoming worker thread already or we've reached the
569 * maximum number of worker threads, we're done.
570 */
571 if ( pPool->cIdleThreads > 0
572 || pPool->cCurThreads >= pPool->cMaxThreads)
573 {
574 RTCritSectLeave(&pPool->CritSect);
575 return;
576 }
577
578 /*
579 * Push back before creating a new worker thread.
580 */
581 if ( pPool->cCurThreads > pPool->cThreadsPushBackThreshold
582 && (RTTimeNanoTS() - pReq->uSubmitNanoTs) / RT_NS_1MS >= pPool->cMsCurPushBack )
583 {
584 int rc = rtReqPoolPushBack(pPool, pReq);
585 if (RT_SUCCESS(rc))
586 return;
587 }
588
589 /*
590 * Create a new thread for processing the request.
591 * For simplicity, we don't bother leaving the critical section while doing so.
592 */
593 rtReqPoolCreateNewWorker(pPool);
594
595 RTCritSectLeave(&pPool->CritSect);
596 return;
597}
598
599
600/**
601 * Worker for RTReqCancel that looks for the request in the pending list and
602 * completes it if found there.
603 *
604 * @param pPool The request thread pool.
605 * @param pReq The request.
606 */
607DECLHIDDEN(void) rtReqPoolCancel(PRTREQPOOLINT pPool, PRTREQINT pReq)
608{
609 RTCritSectEnter(&pPool->CritSect);
610
611 pPool->cReqCancelled++;
612
613 /*
614 * Check if the request is in the pending list.
615 */
616 PRTREQINT pPrev = NULL;
617 PRTREQINT pCur = pPool->pPendingRequests;
618 while (pCur)
619 if (pCur != pReq)
620 {
621 pPrev = pCur;
622 pCur = pCur->pNext;
623 }
624 else
625 {
626 /*
627 * Unlink it and process it.
628 */
629 if (!pPrev)
630 {
631 pPool->pPendingRequests = pReq->pNext;
632 if (!pReq->pNext)
633 pPool->ppPendingRequests = &pPool->pPendingRequests;
634 }
635 else
636 {
637 pPrev->pNext = pReq->pNext;
638 if (!pReq->pNext)
639 pPool->ppPendingRequests = (PRTREQINT *)&pPrev->pNext;
640 }
641 Assert(pPool->cCurPendingRequests > 0);
642 pPool->cCurPendingRequests--;
643
644 rtReqProcessOne(pReq);
645 break;
646 }
647
648 RTCritSectLeave(&pPool->CritSect);
649 return;
650}
651
652
653/**
654 * Frees a requst.
655 *
656 * @returns true if recycled, false if not.
657 * @param pPool The request thread pool.
658 * @param pReq The request.
659 */
660DECLHIDDEN(bool) rtReqPoolRecycle(PRTREQPOOLINT pPool, PRTREQINT pReq)
661{
662 if ( pPool
663 && ASMAtomicReadU32(&pPool->cCurFreeRequests) < pPool->cMaxFreeRequests)
664 {
665 RTCritSectEnter(&pPool->CritSect);
666 if (pPool->cCurFreeRequests < pPool->cMaxFreeRequests)
667 {
668 pReq->pNext = pPool->pFreeRequests;
669 pPool->pFreeRequests = pReq;
670 ASMAtomicIncU32(&pPool->cCurFreeRequests);
671
672 RTCritSectLeave(&pPool->CritSect);
673 return true;
674 }
675
676 RTCritSectLeave(&pPool->CritSect);
677 }
678 return false;
679}
680
681
682RTDECL(int) RTReqPoolCreate(uint32_t cMaxThreads, RTMSINTERVAL cMsMinIdle,
683 uint32_t cThreadsPushBackThreshold, uint32_t cMsMaxPushBack,
684 const char *pszName, PRTREQPOOL phPool)
685{
686 /*
687 * Validate and massage the config.
688 */
689 if (cMaxThreads == UINT32_MAX)
690 cMaxThreads = RTREQPOOL_MAX_THREADS;
691 AssertMsgReturn(cMaxThreads > 0 && cMaxThreads <= RTREQPOOL_MAX_THREADS, ("%u\n", cMaxThreads), VERR_OUT_OF_RANGE);
692 uint32_t const cMinThreads = cMaxThreads > 2 ? 2 : cMaxThreads - 1;
693
694 if (cThreadsPushBackThreshold == 0)
695 cThreadsPushBackThreshold = cMinThreads;
696 else if (cThreadsPushBackThreshold == UINT32_MAX)
697 cThreadsPushBackThreshold = cMaxThreads;
698 AssertMsgReturn(cThreadsPushBackThreshold <= cMaxThreads, ("%u/%u\n", cThreadsPushBackThreshold, cMaxThreads), VERR_OUT_OF_RANGE);
699
700 if (cMsMaxPushBack == UINT32_MAX)
701 cMsMaxPushBack = RTREQPOOL_PUSH_BACK_MAX_MS;
702 AssertMsgReturn(cMsMaxPushBack <= RTREQPOOL_PUSH_BACK_MAX_MS, ("%llu\n", cMsMaxPushBack), VERR_OUT_OF_RANGE);
703 uint32_t const cMsMinPushBack = cMsMaxPushBack >= 200 ? 100 : cMsMaxPushBack / 2;
704
705 AssertPtrReturn(pszName, VERR_INVALID_POINTER);
706 size_t cchName = strlen(pszName);
707 AssertReturn(cchName > 0, VERR_INVALID_PARAMETER);
708 Assert(cchName <= 10);
709
710 AssertPtrReturn(phPool, VERR_INVALID_POINTER);
711
712 /*
713 * Create and initialize the pool.
714 */
715 PRTREQPOOLINT pPool = (PRTREQPOOLINT)RTMemAlloc(sizeof(*pPool));
716 if (!pPool)
717 return VERR_NO_MEMORY;
718
719 pPool->u32Magic = RTREQPOOL_MAGIC;
720 RTStrCopy(pPool->szName, sizeof(pPool->szName), pszName);
721
722 pPool->enmThreadType = RTTHREADTYPE_DEFAULT;
723 pPool->fThreadFlags = 0;
724 pPool->cMaxThreads = cMaxThreads;
725 pPool->cMinThreads = cMinThreads;
726 pPool->cMsMinIdle = cMsMinIdle == RT_INDEFINITE_WAIT || cMsMinIdle >= UINT32_MAX ? UINT32_MAX : cMsMinIdle;
727 pPool->cNsMinIdle = pPool->cMsMinIdle == UINT32_MAX ? UINT64_MAX : cMsMinIdle * RT_NS_1MS_64;
728 pPool->cMsIdleSleep = pPool->cMsMinIdle == UINT32_MAX ? RT_INDEFINITE_WAIT : RT_MAX(RT_MS_1SEC, pPool->cMsMinIdle);
729 pPool->cThreadsPushBackThreshold = cThreadsPushBackThreshold;
730 pPool->cMsMaxPushBack = cMsMaxPushBack;
731 pPool->cMsMinPushBack = cMsMinPushBack;
732 pPool->cMaxFreeRequests = cMaxThreads * 2;
733 pPool->hThreadTermEvt = NIL_RTSEMEVENTMULTI;
734 pPool->fDestructing = false;
735 pPool->cMsCurPushBack = 0;
736 pPool->cCurThreads = 0;
737 pPool->cThreadsCreated = 0;
738 pPool->uLastThreadCreateNanoTs = 0;
739 RTListInit(&pPool->WorkerThreads);
740 pPool->cReqProcessed = 0;
741 pPool->cNsTotalReqProcessing= 0;
742 pPool->cNsTotalReqQueued = 0;
743 pPool->cRefs = 1;
744 pPool->cIdleThreads = 0;
745 RTListInit(&pPool->IdleThreads);
746 pPool->pPendingRequests = NULL;
747 pPool->ppPendingRequests = &pPool->pPendingRequests;
748 pPool->cCurPendingRequests = 0;
749 pPool->cCurActiveRequests = 0;
750 pPool->cReqSubmitted = 0;
751 pPool->cReqCancelled = 0;
752 pPool->pFreeRequests = NULL;
753 pPool->cCurFreeRequests = 0;
754
755 int rc = RTSemEventMultiCreate(&pPool->hThreadTermEvt);
756 if (RT_SUCCESS(rc))
757 {
758 rc = RTCritSectInit(&pPool->CritSect);
759 if (RT_SUCCESS(rc))
760 {
761 *phPool = pPool;
762 return VINF_SUCCESS;
763 }
764
765 RTSemEventMultiDestroy(pPool->hThreadTermEvt);
766 }
767 pPool->u32Magic = RTREQPOOL_MAGIC_DEAD;
768 RTMemFree(pPool);
769 return rc;
770}
771
772
773
774RTDECL(int) RTReqPoolSetCfgVar(RTREQPOOL hPool, RTREQPOOLCFGVAR enmVar, uint64_t uValue)
775{
776 PRTREQPOOLINT pPool = hPool;
777 AssertPtrReturn(pPool, VERR_INVALID_HANDLE);
778 AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, VERR_INVALID_HANDLE);
779 AssertReturn(enmVar > RTREQPOOLCFGVAR_INVALID && enmVar < RTREQPOOLCFGVAR_END, VERR_INVALID_PARAMETER);
780
781 RTCritSectEnter(&pPool->CritSect);
782
783 bool fWakeUpIdleThreads = false;
784 int rc = VINF_SUCCESS;
785 switch (enmVar)
786 {
787 case RTREQPOOLCFGVAR_THREAD_TYPE:
788 AssertMsgBreakStmt(uValue > (uint64_t)RTTHREADTYPE_INVALID && uValue < (uint64_t)RTTHREADTYPE_END,
789 ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
790
791 pPool->enmThreadType = (RTTHREADTYPE)uValue;
792 break;
793
794 case RTREQPOOLCFGVAR_THREAD_FLAGS:
795 AssertMsgBreakStmt(!(uValue & ~(uint64_t)RTTHREADFLAGS_MASK) && !(uValue & RTTHREADFLAGS_WAITABLE),
796 ("%#llx\n", uValue), rc = VERR_INVALID_FLAGS);
797
798 pPool->fThreadFlags = (uint32_t)uValue;
799 break;
800
801 case RTREQPOOLCFGVAR_MIN_THREADS:
802 AssertMsgBreakStmt(uValue <= RTREQPOOL_MAX_THREADS, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
803 fWakeUpIdleThreads = pPool->cMinThreads > (uint32_t)uValue;
804 pPool->cMinThreads = (uint32_t)uValue;
805 if (pPool->cMinThreads > pPool->cMaxThreads)
806 pPool->cMaxThreads = pPool->cMinThreads;
807 if ( pPool->cThreadsPushBackThreshold < pPool->cMinThreads
808 || pPool->cThreadsPushBackThreshold > pPool->cMaxThreads)
809 pPool->cThreadsPushBackThreshold = pPool->cMinThreads + (pPool->cMaxThreads - pPool->cMinThreads) / 2;
810 rtReqPoolRecalcPushBack(pPool);
811 break;
812
813 case RTREQPOOLCFGVAR_MAX_THREADS:
814 AssertMsgBreakStmt(uValue <= RTREQPOOL_MAX_THREADS && uValue >= 1, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
815 pPool->cMaxThreads = (uint32_t)uValue;
816 if (pPool->cMaxThreads < pPool->cMinThreads)
817 {
818 pPool->cMinThreads = pPool->cMaxThreads;
819 fWakeUpIdleThreads = true;
820 }
821 if (pPool->cMaxThreads < pPool->cThreadsPushBackThreshold)
822 pPool->cThreadsPushBackThreshold = pPool->cMinThreads + (pPool->cMaxThreads - pPool->cMinThreads) / 2;
823 rtReqPoolRecalcPushBack(pPool);
824 break;
825
826 case RTREQPOOLCFGVAR_MS_MIN_IDLE:
827 AssertMsgBreakStmt(uValue < UINT32_MAX || uValue == RT_INDEFINITE_WAIT, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
828 if (uValue < UINT32_MAX && uValue != RT_INDEFINITE_WAIT)
829 {
830 fWakeUpIdleThreads = pPool->cMsMinIdle != (uint32_t)uValue;
831 pPool->cMsMinIdle = (uint32_t)uValue;
832 pPool->cNsMinIdle = pPool->cMsMinIdle * RT_NS_1MS_64;
833 if (pPool->cMsIdleSleep > pPool->cMsMinIdle)
834 pPool->cMsIdleSleep = RT_MAX(RT_MS_1SEC, pPool->cMsMinIdle);
835 }
836 else
837 {
838 pPool->cMsMinIdle = UINT32_MAX;
839 pPool->cNsMinIdle = UINT64_MAX;
840 pPool->cMsIdleSleep = RT_INDEFINITE_WAIT;
841 }
842 break;
843
844 case RTREQPOOLCFGVAR_MS_IDLE_SLEEP:
845 AssertMsgBreakStmt(uValue <= RT_INDEFINITE_WAIT, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
846 fWakeUpIdleThreads = pPool->cMsMinIdle > (RTMSINTERVAL)uValue;
847 pPool->cMsIdleSleep = (RTMSINTERVAL)uValue;
848 if (pPool->cMsIdleSleep == RT_INDEFINITE_WAIT)
849 {
850 pPool->cMsMinIdle = UINT32_MAX;
851 pPool->cNsMinIdle = UINT64_MAX;
852 }
853 break;
854
855 case RTREQPOOLCFGVAR_PUSH_BACK_THRESHOLD:
856 if (uValue == UINT64_MAX)
857 pPool->cThreadsPushBackThreshold = pPool->cMaxThreads;
858 else if (uValue == 0)
859 pPool->cThreadsPushBackThreshold = pPool->cMinThreads;
860 else
861 {
862 AssertMsgBreakStmt(uValue <= pPool->cMaxThreads, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
863 AssertMsgBreakStmt(uValue >= pPool->cMinThreads, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
864 pPool->cThreadsPushBackThreshold = (uint32_t)uValue;
865 }
866 break;
867
868 case RTREQPOOLCFGVAR_PUSH_BACK_MIN_MS:
869 if (uValue == UINT32_MAX || uValue == UINT64_MAX)
870 uValue = RTREQPOOL_PUSH_BACK_MAX_MS;
871 else
872 AssertMsgBreakStmt(uValue <= RTREQPOOL_PUSH_BACK_MAX_MS, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
873 pPool->cMsMinPushBack = (uint32_t)uValue;
874 if (pPool->cMsMaxPushBack < pPool->cMsMinPushBack)
875 pPool->cMsMaxPushBack = pPool->cMsMinPushBack;
876 rtReqPoolRecalcPushBack(pPool);
877 break;
878
879 case RTREQPOOLCFGVAR_PUSH_BACK_MAX_MS:
880 if (uValue == UINT32_MAX || uValue == UINT64_MAX)
881 uValue = RTREQPOOL_PUSH_BACK_MAX_MS;
882 else
883 AssertMsgBreakStmt(uValue <= RTREQPOOL_PUSH_BACK_MAX_MS, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
884 pPool->cMsMaxPushBack = (uint32_t)uValue;
885 if (pPool->cMsMinPushBack < pPool->cMsMaxPushBack)
886 pPool->cMsMinPushBack = pPool->cMsMaxPushBack;
887 rtReqPoolRecalcPushBack(pPool);
888 break;
889
890 case RTREQPOOLCFGVAR_MAX_FREE_REQUESTS:
891 if (uValue == UINT64_MAX)
892 {
893 pPool->cMaxFreeRequests = pPool->cMaxThreads * 2;
894 if (pPool->cMaxFreeRequests < 16)
895 pPool->cMaxFreeRequests = 16;
896 }
897 else
898 {
899 AssertMsgBreakStmt(uValue <= RTREQPOOL_MAX_FREE_REQUESTS, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
900 pPool->cMaxFreeRequests = (uint32_t)uValue;
901 }
902
903 while (pPool->cCurFreeRequests > pPool->cMaxFreeRequests)
904 {
905 PRTREQINT pReq = pPool->pFreeRequests;
906 pPool->pFreeRequests = pReq->pNext;
907 ASMAtomicDecU32(&pPool->cCurFreeRequests);
908 rtReqFreeIt(pReq);
909 }
910 break;
911
912 default:
913 AssertFailed();
914 rc = VERR_IPE_NOT_REACHED_DEFAULT_CASE;
915 }
916
917 /* Wake up all idle threads if required. */
918 if (fWakeUpIdleThreads)
919 {
920 Assert(rc == VINF_SUCCESS);
921 PRTREQPOOLTHREAD pThread;
922 RTListForEach(&pPool->WorkerThreads, pThread, RTREQPOOLTHREAD, ListNode)
923 {
924 RTThreadUserSignal(pThread->hThread);
925 }
926 }
927
928 RTCritSectLeave(&pPool->CritSect);
929
930 return rc;
931}
932RT_EXPORT_SYMBOL(RTReqPoolSetCfgVar);
933
934
935RTDECL(uint64_t) RTReqPoolGetCfgVar(RTREQPOOL hPool, RTREQPOOLCFGVAR enmVar)
936{
937 PRTREQPOOLINT pPool = hPool;
938 AssertPtrReturn(pPool, UINT64_MAX);
939 AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT64_MAX);
940 AssertReturn(enmVar > RTREQPOOLCFGVAR_INVALID && enmVar < RTREQPOOLCFGVAR_END, UINT64_MAX);
941
942 RTCritSectEnter(&pPool->CritSect);
943
944 uint64_t u64;
945 switch (enmVar)
946 {
947 case RTREQPOOLCFGVAR_THREAD_TYPE:
948 u64 = pPool->enmThreadType;
949 break;
950
951 case RTREQPOOLCFGVAR_THREAD_FLAGS:
952 u64 = pPool->fThreadFlags;
953 break;
954
955 case RTREQPOOLCFGVAR_MIN_THREADS:
956 u64 = pPool->cMinThreads;
957 break;
958
959 case RTREQPOOLCFGVAR_MAX_THREADS:
960 u64 = pPool->cMaxThreads;
961 break;
962
963 case RTREQPOOLCFGVAR_MS_MIN_IDLE:
964 u64 = pPool->cMsMinIdle;
965 break;
966
967 case RTREQPOOLCFGVAR_MS_IDLE_SLEEP:
968 u64 = pPool->cMsIdleSleep;
969 break;
970
971 case RTREQPOOLCFGVAR_PUSH_BACK_THRESHOLD:
972 u64 = pPool->cThreadsPushBackThreshold;
973 break;
974
975 case RTREQPOOLCFGVAR_PUSH_BACK_MIN_MS:
976 u64 = pPool->cMsMinPushBack;
977 break;
978
979 case RTREQPOOLCFGVAR_PUSH_BACK_MAX_MS:
980 u64 = pPool->cMsMaxPushBack;
981 break;
982
983 case RTREQPOOLCFGVAR_MAX_FREE_REQUESTS:
984 u64 = pPool->cMaxFreeRequests;
985 break;
986
987 default:
988 AssertFailed();
989 u64 = UINT64_MAX;
990 break;
991 }
992
993 RTCritSectLeave(&pPool->CritSect);
994
995 return u64;
996}
997RT_EXPORT_SYMBOL(RTReqGetQueryCfgVar);
998
999
1000RTDECL(uint64_t) RTReqPoolGetStat(RTREQPOOL hPool, RTREQPOOLSTAT enmStat)
1001{
1002 PRTREQPOOLINT pPool = hPool;
1003 AssertPtrReturn(pPool, UINT64_MAX);
1004 AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT64_MAX);
1005 AssertReturn(enmStat > RTREQPOOLSTAT_INVALID && enmStat < RTREQPOOLSTAT_END, UINT64_MAX);
1006
1007 RTCritSectEnter(&pPool->CritSect);
1008
1009 uint64_t u64;
1010 switch (enmStat)
1011 {
1012 case RTREQPOOLSTAT_THREADS: u64 = pPool->cCurThreads; break;
1013 case RTREQPOOLSTAT_THREADS_CREATED: u64 = pPool->cThreadsCreated; break;
1014 case RTREQPOOLSTAT_REQUESTS_PROCESSED: u64 = pPool->cReqProcessed; break;
1015 case RTREQPOOLSTAT_REQUESTS_SUBMITTED: u64 = pPool->cReqSubmitted; break;
1016 case RTREQPOOLSTAT_REQUESTS_CANCELLED: u64 = pPool->cReqCancelled; break;
1017 case RTREQPOOLSTAT_REQUESTS_PENDING: u64 = pPool->cCurPendingRequests; break;
1018 case RTREQPOOLSTAT_REQUESTS_ACTIVE: u64 = pPool->cCurActiveRequests; break;
1019 case RTREQPOOLSTAT_REQUESTS_FREE: u64 = pPool->cCurFreeRequests; break;
1020 case RTREQPOOLSTAT_NS_TOTAL_REQ_PROCESSING: u64 = pPool->cNsTotalReqProcessing; break;
1021 case RTREQPOOLSTAT_NS_TOTAL_REQ_QUEUED: u64 = pPool->cNsTotalReqQueued; break;
1022 case RTREQPOOLSTAT_NS_AVERAGE_REQ_PROCESSING: u64 = pPool->cNsTotalReqProcessing / RT_MAX(pPool->cReqProcessed, 1); break;
1023 case RTREQPOOLSTAT_NS_AVERAGE_REQ_QUEUED: u64 = pPool->cNsTotalReqQueued / RT_MAX(pPool->cReqProcessed, 1); break;
1024 default:
1025 AssertFailed();
1026 u64 = UINT64_MAX;
1027 break;
1028 }
1029
1030 RTCritSectLeave(&pPool->CritSect);
1031
1032 return u64;
1033}
1034RT_EXPORT_SYMBOL(RTReqPoolGetStat);
1035
1036
1037RTDECL(uint32_t) RTReqPoolRetain(RTREQPOOL hPool)
1038{
1039 PRTREQPOOLINT pPool = hPool;
1040 AssertPtrReturn(pPool, UINT32_MAX);
1041 AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT32_MAX);
1042
1043 return ASMAtomicIncU32(&pPool->cRefs);
1044}
1045RT_EXPORT_SYMBOL(RTReqPoolRetain);
1046
1047
1048RTDECL(uint32_t) RTReqPoolRelease(RTREQPOOL hPool)
1049{
1050 /*
1051 * Ignore NULL and validate the request.
1052 */
1053 if (!hPool)
1054 return 0;
1055 PRTREQPOOLINT pPool = hPool;
1056 AssertPtrReturn(pPool, UINT32_MAX);
1057 AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT32_MAX);
1058
1059 /*
1060 * Drop a reference, free it when it reaches zero.
1061 */
1062 uint32_t cRefs = ASMAtomicDecU32(&pPool->cRefs);
1063 if (cRefs == 0)
1064 {
1065 AssertReturn(ASMAtomicCmpXchgU32(&pPool->u32Magic, RTREQPOOL_MAGIC_DEAD, RTREQPOOL_MAGIC), UINT32_MAX);
1066
1067 RTCritSectEnter(&pPool->CritSect);
1068#ifdef RT_STRICT
1069 RTTHREAD const hSelf = RTThreadSelf();
1070#endif
1071
1072 /* Indicate to the worker threads that we're shutting down. */
1073 ASMAtomicWriteBool(&pPool->fDestructing, true);
1074 PRTREQPOOLTHREAD pThread;
1075 RTListForEach(&pPool->WorkerThreads, pThread, RTREQPOOLTHREAD, ListNode)
1076 {
1077 Assert(pThread->hThread != hSelf);
1078 RTThreadUserSignal(pThread->hThread);
1079 }
1080
1081 /* Cancel pending requests. */
1082 Assert(!pPool->pPendingRequests);
1083 while (pPool->pPendingRequests)
1084 {
1085 PRTREQINT pReq = pPool->pPendingRequests;
1086 pPool->pPendingRequests = pReq->pNext;
1087 rtReqPoolCancelReq(pReq);
1088 }
1089 pPool->ppPendingRequests = NULL;
1090 pPool->cCurPendingRequests = 0;
1091
1092 /* Wait for the workers to shut down. */
1093 while (!RTListIsEmpty(&pPool->WorkerThreads))
1094 {
1095 RTCritSectLeave(&pPool->CritSect);
1096 RTSemEventMultiWait(pPool->hThreadTermEvt, RT_MS_1MIN);
1097 RTCritSectEnter(&pPool->CritSect);
1098 /** @todo should we wait forever here? */
1099 }
1100
1101 /* Free recycled requests. */
1102 for (;;)
1103 {
1104 PRTREQINT pReq = pPool->pFreeRequests;
1105 if (!pReq)
1106 break;
1107 pPool->pFreeRequests = pReq->pNext;
1108 pPool->cCurFreeRequests--;
1109 rtReqFreeIt(pReq);
1110 }
1111
1112 /* Finally, free the critical section and pool instance. */
1113 RTSemEventMultiDestroy(pPool->hThreadTermEvt);
1114 RTCritSectLeave(&pPool->CritSect);
1115 RTCritSectDelete(&pPool->CritSect);
1116 RTMemFree(pPool);
1117 }
1118
1119 return cRefs;
1120}
1121RT_EXPORT_SYMBOL(RTReqPoolRelease);
1122
1123
1124RTDECL(int) RTReqPoolAlloc(RTREQPOOL hPool, RTREQTYPE enmType, PRTREQ *phReq)
1125{
1126 PRTREQPOOLINT pPool = hPool;
1127 AssertPtrReturn(pPool, VERR_INVALID_HANDLE);
1128 AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, VERR_INVALID_HANDLE);
1129
1130 /*
1131 * Try recycle old requests.
1132 */
1133 if (ASMAtomicReadU32(&pPool->cCurFreeRequests) > 0)
1134 {
1135 RTCritSectEnter(&pPool->CritSect);
1136 PRTREQINT pReq = pPool->pFreeRequests;
1137 if (pReq)
1138 {
1139 ASMAtomicDecU32(&pPool->cCurFreeRequests);
1140 pPool->pFreeRequests = pReq->pNext;
1141
1142 RTCritSectLeave(&pPool->CritSect);
1143
1144 Assert(pReq->fPoolOrQueue);
1145 Assert(pReq->uOwner.hPool == pPool);
1146
1147 int rc = rtReqReInit(pReq, enmType);
1148 if (RT_SUCCESS(rc))
1149 {
1150 *phReq = pReq;
1151 LogFlow(("RTReqPoolAlloc: returns VINF_SUCCESS *phReq=%p recycled\n", pReq));
1152 return rc;
1153 }
1154 }
1155 else
1156 RTCritSectLeave(&pPool->CritSect);
1157 }
1158
1159 /*
1160 * Allocate a new request.
1161 */
1162 int rc = rtReqAlloc(enmType, true /*fPoolOrQueue*/, pPool, phReq);
1163 LogFlow(("RTReqPoolAlloc: returns %Rrc *phReq=%p\n", rc, *phReq));
1164 return rc;
1165}
1166RT_EXPORT_SYMBOL(RTReqPoolAlloc);
1167
1168
1169RTDECL(int) RTReqPoolCallEx( RTREQPOOL hPool, RTMSINTERVAL cMillies, PRTREQ *phReq, uint32_t fFlags, PFNRT pfnFunction, unsigned cArgs, ...)
1170{
1171 va_list va;
1172 va_start(va, cArgs);
1173 int rc = RTReqPoolCallExV(hPool, cMillies, phReq, fFlags, pfnFunction, cArgs, va);
1174 va_end(va);
1175 return rc;
1176}
1177RT_EXPORT_SYMBOL(RTReqPoolCallEx);
1178
1179
1180RTDECL(int) RTReqPoolCallExV(RTREQPOOL hPool, RTMSINTERVAL cMillies, PRTREQ *phReq, uint32_t fFlags, PFNRT pfnFunction, unsigned cArgs, va_list va)
1181{
1182 /*
1183 * Check input.
1184 */
1185 AssertPtrReturn(pfnFunction, VERR_INVALID_POINTER);
1186 AssertMsgReturn(!((uint32_t)fFlags & ~(uint32_t)(RTREQFLAGS_NO_WAIT | RTREQFLAGS_RETURN_MASK)), ("%#x\n", (uint32_t)fFlags), VERR_INVALID_PARAMETER);
1187 if (!(fFlags & RTREQFLAGS_NO_WAIT) || phReq)
1188 {
1189 AssertPtrReturn(phReq, VERR_INVALID_POINTER);
1190 *phReq = NIL_RTREQ;
1191 }
1192
1193 PRTREQINT pReq = NULL;
1194 AssertMsgReturn(cArgs * sizeof(uintptr_t) <= sizeof(pReq->u.Internal.aArgs), ("cArgs=%u\n", cArgs), VERR_TOO_MUCH_DATA);
1195
1196 /*
1197 * Allocate and initialize the request.
1198 */
1199 int rc = RTReqPoolAlloc(hPool, RTREQTYPE_INTERNAL, &pReq);
1200 if (RT_FAILURE(rc))
1201 return rc;
1202 pReq->fFlags = fFlags;
1203 pReq->u.Internal.pfn = pfnFunction;
1204 pReq->u.Internal.cArgs = cArgs;
1205 for (unsigned iArg = 0; iArg < cArgs; iArg++)
1206 pReq->u.Internal.aArgs[iArg] = va_arg(va, uintptr_t);
1207
1208 /*
1209 * Submit the request.
1210 */
1211 rc = RTReqSubmit(pReq, cMillies);
1212 if ( rc != VINF_SUCCESS
1213 && rc != VERR_TIMEOUT)
1214 {
1215 Assert(rc != VERR_INTERRUPTED);
1216 RTReqRelease(pReq);
1217 pReq = NULL;
1218 }
1219
1220 if (phReq)
1221 {
1222 *phReq = pReq;
1223 LogFlow(("RTReqPoolCallExV: returns %Rrc *phReq=%p\n", rc, pReq));
1224 }
1225 else
1226 {
1227 RTReqRelease(pReq);
1228 LogFlow(("RTReqPoolCallExV: returns %Rrc\n", rc));
1229 }
1230 return rc;
1231}
1232RT_EXPORT_SYMBOL(RTReqPoolCallExV);
1233
1234
1235RTDECL(int) RTReqPoolCallWait(RTREQPOOL hPool, PFNRT pfnFunction, unsigned cArgs, ...)
1236{
1237 PRTREQINT pReq;
1238 va_list va;
1239 va_start(va, cArgs);
1240 int rc = RTReqPoolCallExV(hPool, RT_INDEFINITE_WAIT, &pReq, RTREQFLAGS_IPRT_STATUS,
1241 pfnFunction, cArgs, va);
1242 va_end(va);
1243 if (RT_SUCCESS(rc))
1244 rc = pReq->iStatusX;
1245 RTReqRelease(pReq);
1246 return rc;
1247}
1248RT_EXPORT_SYMBOL(RTReqPoolCallWait);
1249
1250
1251RTDECL(int) RTReqPoolCallNoWait(RTREQPOOL hPool, PFNRT pfnFunction, unsigned cArgs, ...)
1252{
1253 va_list va;
1254 va_start(va, cArgs);
1255 int rc = RTReqPoolCallExV(hPool, 0, NULL, RTREQFLAGS_IPRT_STATUS | RTREQFLAGS_NO_WAIT,
1256 pfnFunction, cArgs, va);
1257 va_end(va);
1258 return rc;
1259}
1260RT_EXPORT_SYMBOL(RTReqPoolCallNoWait);
1261
1262
1263RTDECL(int) RTReqPoolCallVoidWait(RTREQPOOL hPool, PFNRT pfnFunction, unsigned cArgs, ...)
1264{
1265 PRTREQINT pReq;
1266 va_list va;
1267 va_start(va, cArgs);
1268 int rc = RTReqPoolCallExV(hPool, RT_INDEFINITE_WAIT, &pReq, RTREQFLAGS_VOID,
1269 pfnFunction, cArgs, va);
1270 va_end(va);
1271 if (RT_SUCCESS(rc))
1272 rc = pReq->iStatusX;
1273 RTReqRelease(pReq);
1274 return rc;
1275}
1276RT_EXPORT_SYMBOL(RTReqPoolCallVoidWait);
1277
1278
1279RTDECL(int) RTReqPoolCallVoidNoWait(RTREQPOOL hPool, PFNRT pfnFunction, unsigned cArgs, ...)
1280{
1281 va_list va;
1282 va_start(va, cArgs);
1283 int rc = RTReqPoolCallExV(hPool, 0, NULL, RTREQFLAGS_VOID | RTREQFLAGS_NO_WAIT,
1284 pfnFunction, cArgs, va);
1285 va_end(va);
1286 return rc;
1287}
1288RT_EXPORT_SYMBOL(RTReqPoolCallVoidNoWait);
1289
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette