VirtualBox

source: vbox/trunk/src/VBox/Runtime/common/misc/reqpool.cpp@ 78052

Last change on this file since 78052 was 77257, checked in by vboxsync, 6 years ago

Runtime/RTReqPool: Memory leak fixes during destruction

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 41.2 KB
Line 
1/* $Id: reqpool.cpp 77257 2019-02-11 12:50:00Z vboxsync $ */
2/** @file
3 * IPRT - Request Pool.
4 */
5
6/*
7 * Copyright (C) 2006-2019 Oracle Corporation
8 *
9 * This file is part of VirtualBox Open Source Edition (OSE), as
10 * available from http://www.virtualbox.org. This file is free software;
11 * you can redistribute it and/or modify it under the terms of the GNU
12 * General Public License (GPL) as published by the Free Software
13 * Foundation, in version 2 as it comes in the "COPYING" file of the
14 * VirtualBox OSE distribution. VirtualBox OSE is distributed in the
15 * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
16 *
17 * The contents of this file may alternatively be used under the terms
18 * of the Common Development and Distribution License Version 1.0
19 * (CDDL) only, as it comes in the "COPYING.CDDL" file of the
20 * VirtualBox OSE distribution, in which case the provisions of the
21 * CDDL are applicable instead of those of the GPL.
22 *
23 * You may elect to license modified versions of this file under the
24 * terms and conditions of either the GPL or the CDDL or both.
25 */
26
27
28/*********************************************************************************************************************************
29* Header Files *
30*********************************************************************************************************************************/
31#include <iprt/req.h>
32#include "internal/iprt.h"
33
34#include <iprt/assert.h>
35#include <iprt/asm.h>
36#include <iprt/critsect.h>
37#include <iprt/err.h>
38#include <iprt/list.h>
39#include <iprt/log.h>
40#include <iprt/mem.h>
41#include <iprt/string.h>
42#include <iprt/time.h>
43#include <iprt/semaphore.h>
44#include <iprt/thread.h>
45
46#include "internal/req.h"
47#include "internal/magics.h"
48
49
50/*********************************************************************************************************************************
51* Defined Constants And Macros *
52*********************************************************************************************************************************/
53/** The maximum number of worker threads a pool can be configured with. */
54#define RTREQPOOL_MAX_THREADS UINT32_C(16384)
55/** The maximum push back interval in milliseconds. */
56#define RTREQPOOL_PUSH_BACK_MAX_MS RT_MS_1MIN
57/** The upper limit for the number of free requests kept around for recycling. */
58#define RTREQPOOL_MAX_FREE_REQUESTS (RTREQPOOL_MAX_THREADS * 2U)
59
60
61/*********************************************************************************************************************************
62* Structures and Typedefs *
63*********************************************************************************************************************************/
/**
 * Request pool worker thread data.
 *
 * One instance is allocated per worker thread when it is created and freed by
 * the thread itself when it exits.
 */
64typedef struct RTREQPOOLTHREAD
65{
66 /** Node in the RTREQPOOLINT::IdleThreads list.  An empty (self-linked) node
 * means the thread is not on the idle list. */
67 RTLISTNODE IdleNode;
68 /** Node in the RTREQPOOLINT::WorkerThreads list. */
69 RTLISTNODE ListNode;
70
71 /** The submit timestamp of the request currently being processed (copied
 * from the request when processing starts). */
72 uint64_t uPendingNanoTs;
73 /** The timestamp taken when processing of the current request started. */
74 uint64_t uProcessingNanoTs;
75 /** When this thread went idle the last time. */
76 uint64_t uIdleNanoTs;
77 /** The number of requests processed by this thread. */
78 uint64_t cReqProcessed;
79 /** Total time the requests processed by this thread took to process. */
80 uint64_t cNsTotalReqProcessing;
81 /** Total time the requests processed by this thread had to wait in
82 * the queue before being scheduled. */
83 uint64_t cNsTotalReqQueued;
84 /** The CPU this was scheduled last time we checked (initialized to
 * NIL_RTCPUID when the thread is created). */
85 RTCPUID idLastCpu;
86
87 /** The submitter will put an incoming request here when scheduling an idle
88 * thread.  The worker fetches it with an atomic exchange. */
89 PRTREQINT volatile pTodoReq;
90 /** The request the thread is currently processing, NULL if none. */
91 PRTREQINT volatile pPendingReq;
92
93 /** The thread handle. */
94 RTTHREAD hThread;
95 /** Nano seconds timestamp representing the birth time of the thread. */
96 uint64_t uBirthNanoTs;
97 /** Pointer to the request thread pool instance the thread is associated
98 * with. */
99 struct RTREQPOOLINT *pPool;
100} RTREQPOOLTHREAD;
101/** Pointer to a worker thread. */
102typedef RTREQPOOLTHREAD *PRTREQPOOLTHREAD;
103
104/**
105 * Request thread pool instance data.
 *
 * Unless noted otherwise the members are protected by RTREQPOOLINT::CritSect.
106 */
107typedef struct RTREQPOOLINT
108{
109 /** Magic value (RTREQPOOL_MAGIC). */
110 uint32_t u32Magic;
111 /** The request pool name (also used as prefix for the worker thread names). */
112 char szName[12];
113
114 /** @name Config
115 * @{ */
116 /** The worker thread type. */
117 RTTHREADTYPE enmThreadType;
118 /** The maximum number of worker threads. */
119 uint32_t cMaxThreads;
120 /** The minimum number of worker threads. */
121 uint32_t cMinThreads;
122 /** The number of milliseconds a thread needs to be idle before it is
123 * considered for retirement. */
124 uint32_t cMsMinIdle;
125 /** cMsMinIdle in nano seconds. */
126 uint64_t cNsMinIdle;
127 /** The idle thread sleep interval in milliseconds. */
128 RTMSINTERVAL cMsIdleSleep;
129 /** The number of threads which should be spawned before throttling kicks
130 * in. */
131 uint32_t cThreadsPushBackThreshold;
132 /** The max number of milliseconds to push back a submitter before creating
133 * a new worker thread once the threshold has been reached. */
134 uint32_t cMsMaxPushBack;
135 /** The minimum number of milliseconds to push back a submitter before
136 * creating a new worker thread once the threshold has been reached. */
137 uint32_t cMsMinPushBack;
138 /** The max number of free requests in the recycle LIFO. */
139 uint32_t cMaxFreeRequests;
140 /** @} */
141
142 /** Signaled by terminating worker threads. */
143 RTSEMEVENTMULTI hThreadTermEvt;
144
145 /** Destruction indicator.  The worker threads check this in their loop. */
146 bool volatile fDestructing;
147
148 /** The current submitter push back in milliseconds.
149 * This is recalculated when worker threads come and go. */
150 uint32_t cMsCurPushBack;
151 /** The current number of worker threads. */
152 uint32_t cCurThreads;
153 /** Statistics: The total number of threads created. */
154 uint32_t cThreadsCreated;
155 /** Statistics: The timestamp when the last thread was created. */
156 uint64_t uLastThreadCreateNanoTs;
157 /** Linked list of worker threads. */
158 RTLISTANCHOR WorkerThreads;
159
160 /** The number of requests processed and counted in the time totals. */
161 uint64_t cReqProcessed;
162 /** Total time the requests processed by the worker threads took to process
 * (accumulated from the per-thread totals). */
163 uint64_t cNsTotalReqProcessing;
164 /** Total time the requests processed by the worker threads had to wait in
165 * the queue before being scheduled (accumulated from the per-thread totals). */
166 uint64_t cNsTotalReqQueued;
167
168 /** Reference counter. */
169 uint32_t volatile cRefs;
170 /** The number of idle thread or threads in the process of becoming
171 * idle. This is increased before the to-be-idle thread tries to enter
172 * the critical section and add itself to the list. */
173 uint32_t volatile cIdleThreads;
174 /** Linked list of idle threads. */
175 RTLISTANCHOR IdleThreads;
176
177 /** Head of the request FIFO. */
178 PRTREQINT pPendingRequests;
179 /** Where to insert the next request (points at pPendingRequests when the
 * FIFO is empty). */
180 PRTREQINT *ppPendingRequests;
181 /** The number of requests currently pending. */
182 uint32_t cCurPendingRequests;
183 /** The number of requests currently being executed. */
184 uint32_t volatile cCurActiveRequests;
185 /** The number of requests submitted. */
186 uint64_t cReqSubmitted;
187
188 /** Head of the request recycling LIFO. */
189 PRTREQINT pFreeRequests;
190 /** The number of requests in the recycling LIFO. This is read without
191 * entering the critical section, thus volatile. */
192 uint32_t volatile cCurFreeRequests;
193
194 /** Critical section serializing access to members of this structure. */
195 RTCRITSECT CritSect;
196
197} RTREQPOOLINT;
198
199
200/**
201 * Used by exiting thread and the pool destruction code to cancel unexpected
202 * requests.
203 *
204 * @param pReq The request.
205 */
206static void rtReqPoolCancelReq(PRTREQINT pReq)
207{
208 pReq->uOwner.hPool = NIL_RTREQPOOL; /* force free */
209 pReq->enmState = RTREQSTATE_COMPLETED;
210 ASMAtomicWriteS32(&pReq->iStatusX, VERR_CANCELLED);
211 if (pReq->hPushBackEvt != NIL_RTSEMEVENTMULTI)
212 RTSemEventMultiSignal(pReq->hPushBackEvt);
213 RTSemEventSignal(pReq->EventSem);
214
215 RTReqRelease(pReq);
216}
217
218
219/**
220 * Recalculate the max pushback interval when adding or removing worker threads.
221 *
222 * @param pPool The pool. cMsCurPushBack will be changed.
223 */
224static void rtReqPoolRecalcPushBack(PRTREQPOOLINT pPool)
225{
226 uint32_t const cMsRange = pPool->cMsMaxPushBack - pPool->cMsMinPushBack;
227 uint32_t const cSteps = pPool->cMaxThreads - pPool->cThreadsPushBackThreshold;
228 uint32_t const iStep = pPool->cCurThreads - pPool->cThreadsPushBackThreshold;
229
230 uint32_t cMsCurPushBack;
231 if ((cMsRange >> 2) >= cSteps)
232 cMsCurPushBack = cMsRange / cSteps * iStep;
233 else
234 cMsCurPushBack = (uint32_t)( (uint64_t)cMsRange * RT_NS_1MS / cSteps * iStep / RT_NS_1MS );
235 cMsCurPushBack += pPool->cMsMinPushBack;
236
237 pPool->cMsCurPushBack = cMsCurPushBack;
238}
239
240
241
242/**
243 * Performs thread exit.
244 *
245 * @returns Thread termination status code (VINF_SUCCESS).
246 * @param pPool The pool.
247 * @param pThread The thread.
248 * @param fLocked Whether we are inside the critical section
249 * already.
250 */
251static int rtReqPoolThreadExit(PRTREQPOOLINT pPool, PRTREQPOOLTHREAD pThread, bool fLocked)
252{
253 if (!fLocked)
254 RTCritSectEnter(&pPool->CritSect);
255
256 /* Get out of the idle list. */
257 if (!RTListIsEmpty(&pThread->IdleNode))
258 {
259 RTListNodeRemove(&pThread->IdleNode);
260 Assert(pPool->cIdleThreads > 0);
261 ASMAtomicDecU32(&pPool->cIdleThreads);
262 }
263
264 /* Get out of the thread list. */
265 RTListNodeRemove(&pThread->ListNode);
266 Assert(pPool->cCurThreads > 0);
267 pPool->cCurThreads--;
268 rtReqPoolRecalcPushBack(pPool);
269
270 /* This shouldn't happen... */
271 PRTREQINT pReq = pThread->pTodoReq;
272 if (pReq)
273 {
274 AssertFailed();
275 pThread->pTodoReq = NULL;
276 rtReqPoolCancelReq(pReq);
277 }
278
279 /* If we're the last thread terminating, ping the destruction thread before
280 we leave the critical section. */
281 if ( RTListIsEmpty(&pPool->WorkerThreads)
282 && pPool->hThreadTermEvt != NIL_RTSEMEVENT)
283 RTSemEventMultiSignal(pPool->hThreadTermEvt);
284
285 RTCritSectLeave(&pPool->CritSect);
286
287 RTMemFree(pThread);
288 return VINF_SUCCESS;
289}
290
291
292
293/**
294 * Process one request.
295 *
296 * @param pPool The pool.
297 * @param pThread The worker thread.
298 * @param pReq The request to process.
299 */
300static void rtReqPoolThreadProcessRequest(PRTREQPOOLINT pPool, PRTREQPOOLTHREAD pThread, PRTREQINT pReq)
301{
302 /*
303 * Update thread state.
304 */
305 pThread->uProcessingNanoTs = RTTimeNanoTS();
306 pThread->uPendingNanoTs = pReq->uSubmitNanoTs;
307 pThread->pPendingReq = pReq;
308 ASMAtomicIncU32(&pPool->cCurActiveRequests);
309 Assert(pReq->u32Magic == RTREQ_MAGIC);
310
311 /*
312 * Do the actual processing.
313 */
314 rtReqProcessOne(pReq);
315
316 /*
317 * Update thread statistics and state.
318 */
319 ASMAtomicDecU32(&pPool->cCurActiveRequests);
320 pThread->pPendingReq = NULL;
321 uint64_t const uNsTsEnd = RTTimeNanoTS();
322 pThread->cNsTotalReqProcessing += uNsTsEnd - pThread->uProcessingNanoTs;
323 pThread->cNsTotalReqQueued += pThread->uProcessingNanoTs - pThread->uPendingNanoTs;
324 pThread->cReqProcessed++;
325}
326
327
328
329/**
330 * The Worker Thread Procedure.
331 *
332 * @returns VINF_SUCCESS.
333 * @param hThreadSelf The thread handle (unused).
334 * @param pvArg Pointer to the thread data.
335 */
336static DECLCALLBACK(int) rtReqPoolThreadProc(RTTHREAD hThreadSelf, void *pvArg)
337{
338 PRTREQPOOLTHREAD pThread = (PRTREQPOOLTHREAD)pvArg;
339 PRTREQPOOLINT pPool = pThread->pPool;
340
341 /*
342 * The work loop.
343 */
344 uint64_t cReqPrevProcessedIdle = UINT64_MAX;
345 uint64_t cReqPrevProcessedStat = 0;
346 uint64_t cNsPrevTotalReqProcessing = 0;
347 uint64_t cNsPrevTotalReqQueued = 0;
348 while (!pPool->fDestructing)
349 {
350 /*
351 * Process pending work.
352 */
353
354 /* Check if anything is scheduled directly to us. */
355 PRTREQINT pReq = ASMAtomicXchgPtrT(&pThread->pTodoReq, NULL, PRTREQINT);
356 if (pReq)
357 {
358 Assert(RTListIsEmpty(&pThread->IdleNode)); /* Must not be in the idle list. */
359 rtReqPoolThreadProcessRequest(pPool, pThread, pReq);
360 continue;
361 }
362
363 ASMAtomicIncU32(&pPool->cIdleThreads);
364 RTCritSectEnter(&pPool->CritSect);
365
366 /* Update the global statistics. */
367 if (cReqPrevProcessedStat != pThread->cReqProcessed)
368 {
369 pPool->cReqProcessed += pThread->cReqProcessed - cReqPrevProcessedStat;
370 cReqPrevProcessedStat = pThread->cReqProcessed;
371 pPool->cNsTotalReqProcessing += pThread->cNsTotalReqProcessing - cNsPrevTotalReqProcessing;
372 cNsPrevTotalReqProcessing = pThread->cNsTotalReqProcessing;
373 pPool->cNsTotalReqQueued += pThread->cNsTotalReqQueued - cNsPrevTotalReqQueued;
374 cNsPrevTotalReqQueued = pThread->cNsTotalReqQueued;
375 }
376
377 /* Recheck the todo request pointer after entering the critsect. */
378 pReq = ASMAtomicXchgPtrT(&pThread->pTodoReq, NULL, PRTREQINT);
379 if (pReq)
380 {
381 Assert(RTListIsEmpty(&pThread->IdleNode)); /* Must not be in the idle list. */
382 RTCritSectLeave(&pPool->CritSect);
383
384 rtReqPoolThreadProcessRequest(pPool, pThread, pReq);
385 continue;
386 }
387
388 /* Any pending requests in the queue? */
389 pReq = pPool->pPendingRequests;
390 if (pReq)
391 {
392 pPool->pPendingRequests = pReq->pNext;
393 if (pReq->pNext == NULL)
394 pPool->ppPendingRequests = &pPool->pPendingRequests;
395 Assert(pPool->cCurPendingRequests > 0);
396 pPool->cCurPendingRequests--;
397
398 /* Un-idle ourselves and process the request. */
399 if (!RTListIsEmpty(&pThread->IdleNode))
400 {
401 RTListNodeRemove(&pThread->IdleNode);
402 RTListInit(&pThread->IdleNode);
403 ASMAtomicDecU32(&pPool->cIdleThreads);
404 }
405 ASMAtomicDecU32(&pPool->cIdleThreads);
406 RTCritSectLeave(&pPool->CritSect);
407
408 rtReqPoolThreadProcessRequest(pPool, pThread, pReq);
409 continue;
410 }
411
412 /*
413 * Nothing to do, go idle.
414 */
415 if (cReqPrevProcessedIdle != pThread->cReqProcessed)
416 {
417 cReqPrevProcessedIdle = pThread->cReqProcessed;
418 pThread->uIdleNanoTs = RTTimeNanoTS();
419 }
420 else if (pPool->cCurThreads > pPool->cMinThreads)
421 {
422 uint64_t cNsIdle = RTTimeNanoTS() - pThread->uIdleNanoTs;
423 if (cNsIdle >= pPool->cNsMinIdle)
424 return rtReqPoolThreadExit(pPool, pThread, true /*fLocked*/);
425 }
426
427 if (RTListIsEmpty(&pThread->IdleNode))
428 RTListPrepend(&pPool->IdleThreads, &pThread->IdleNode);
429 else
430 ASMAtomicDecU32(&pPool->cIdleThreads);
431 RTThreadUserReset(hThreadSelf);
432 uint32_t const cMsSleep = pPool->cMsIdleSleep;
433
434 RTCritSectLeave(&pPool->CritSect);
435
436 RTThreadUserWait(hThreadSelf, cMsSleep);
437 }
438
439 return rtReqPoolThreadExit(pPool, pThread, false /*fLocked*/);
440}
441
442
443/**
444 * Create a new worker thread.
445 *
446 * @param pPool The pool needing new worker thread.
447 * @remarks Caller owns the critical section
448 */
449static void rtReqPoolCreateNewWorker(RTREQPOOL pPool)
450{
451 PRTREQPOOLTHREAD pThread = (PRTREQPOOLTHREAD)RTMemAllocZ(sizeof(RTREQPOOLTHREAD));
452 if (!pThread)
453 return;
454
455 pThread->uBirthNanoTs = RTTimeNanoTS();
456 pThread->pPool = pPool;
457 pThread->idLastCpu = NIL_RTCPUID;
458 pThread->hThread = NIL_RTTHREAD;
459 RTListInit(&pThread->IdleNode);
460 RTListAppend(&pPool->WorkerThreads, &pThread->ListNode);
461 pPool->cCurThreads++;
462 pPool->cThreadsCreated++;
463
464 int rc = RTThreadCreateF(&pThread->hThread, rtReqPoolThreadProc, pThread, 0 /*default stack size*/,
465 pPool->enmThreadType, 0 /*fFlags*/, "%s%02u", pPool->szName, pPool->cThreadsCreated);
466 if (RT_SUCCESS(rc))
467 pPool->uLastThreadCreateNanoTs = pThread->uBirthNanoTs;
468 else
469 {
470 pPool->cCurThreads--;
471 RTListNodeRemove(&pThread->ListNode);
472 RTMemFree(pThread);
473 }
474}
475
476
477/**
478 * Repel the submitter, giving the worker threads a chance to process the
479 * incoming request.
480 *
481 * @returns Success if a worker picked up the request, failure if not. The
482 * critical section has been left on success, while we'll be inside it
483 * on failure.
484 * @param pPool The pool.
485 * @param pReq The incoming request.
486 */
487static int rtReqPoolPushBack(PRTREQPOOLINT pPool, PRTREQINT pReq)
488{
489 /*
490 * Lazily create the push back semaphore that we'll be blociing on.
491 */
492 int rc;
493 RTSEMEVENTMULTI hEvt = pReq->hPushBackEvt;
494 if (hEvt == NIL_RTSEMEVENTMULTI)
495 {
496 rc = RTSemEventMultiCreate(&hEvt);
497 if (RT_FAILURE(rc))
498 return rc;
499 pReq->hPushBackEvt = hEvt;
500 }
501
502 /*
503 * Prepare the request and semaphore.
504 */
505 uint32_t const cMsTimeout = pPool->cMsCurPushBack;
506 pReq->fSignalPushBack = true;
507 RTReqRetain(pReq);
508 RTSemEventMultiReset(hEvt);
509
510 RTCritSectLeave(&pPool->CritSect);
511
512 /*
513 * Block.
514 */
515 rc = RTSemEventMultiWait(hEvt, cMsTimeout);
516 if (RT_FAILURE(rc))
517 {
518 AssertMsg(rc == VERR_TIMEOUT, ("%Rrc\n", rc));
519 RTCritSectEnter(&pPool->CritSect);
520 }
521 RTReqRelease(pReq);
522 return rc;
523}
524
525
526
/**
 * Submits a request to the pool.
 *
 * The request is handed directly to an idle worker thread if there is one.
 * Otherwise it is appended to the pending FIFO and, unless a worker is already
 * on its way or the thread limit has been reached, the submitter is either
 * pushed back for a while or a new worker thread is created.
 *
 * @param   pPool       The pool.
 * @param   pReq        The request to submit.
 */
527DECLHIDDEN(void) rtReqPoolSubmit(PRTREQPOOLINT pPool, PRTREQINT pReq)
528{
529 RTCritSectEnter(&pPool->CritSect);
530
531 pPool->cReqSubmitted++;
532
533 /*
534 * Try schedule the request to a thread that's currently idle.
535 */
536 PRTREQPOOLTHREAD pThread = RTListGetFirst(&pPool->IdleThreads, RTREQPOOLTHREAD, IdleNode);
537 if (pThread)
538 {
539 /** @todo CPU affinity??? */
540 ASMAtomicWritePtr(&pThread->pTodoReq, pReq);
541
    /* Take the thread off the idle list (re-initializing the node marks it
       as not being on the list) and drop its idle count. */
542 RTListNodeRemove(&pThread->IdleNode);
543 RTListInit(&pThread->IdleNode);
544 ASMAtomicDecU32(&pPool->cIdleThreads);
545
    /* Wake it up. */
546 RTThreadUserSignal(pThread->hThread);
547
548 RTCritSectLeave(&pPool->CritSect);
549 return;
550 }
551 Assert(RTListIsEmpty(&pPool->IdleThreads));
552
553 /*
554 * Put the request in the pending queue (appending at the FIFO tail).
555 */
556 pReq->pNext = NULL;
557 *pPool->ppPendingRequests = pReq;
558 pPool->ppPendingRequests = (PRTREQINT*)&pReq->pNext;
559 pPool->cCurPendingRequests++;
560
561 /*
562 * If there is an incoming worker thread already or we've reached the
563 * maximum number of worker threads, we're done.  (cIdleThreads is bumped
 * by workers before they enter the critical section, so it can be non-zero
 * while the idle list is empty.)
564 */
565 if ( pPool->cIdleThreads > 0
566 || pPool->cCurThreads >= pPool->cMaxThreads)
567 {
568 RTCritSectLeave(&pPool->CritSect);
569 return;
570 }
571
572 /*
573 * Push back before creating a new worker thread.
 * Note that rtReqPoolPushBack leaves the critical section on success and
 * returns inside it on failure.
574 */
575 if ( pPool->cCurThreads > pPool->cThreadsPushBackThreshold
576 && (RTTimeNanoTS() - pReq->uSubmitNanoTs) / RT_NS_1MS >= pPool->cMsCurPushBack )
577 {
578 int rc = rtReqPoolPushBack(pPool, pReq);
579 if (RT_SUCCESS(rc))
580 return;
581 }
582
583 /*
584 * Create a new thread for processing the request.
585 * For simplicity, we don't bother leaving the critical section while doing so.
586 */
587 rtReqPoolCreateNewWorker(pPool);
588
589 RTCritSectLeave(&pPool->CritSect);
590 return;
591}
592
593
594/**
595 * Frees a requst.
596 *
597 * @returns true if recycled, false if not.
598 * @param pPool The request thread pool.
599 * @param pReq The request.
600 */
601DECLHIDDEN(bool) rtReqPoolRecycle(PRTREQPOOLINT pPool, PRTREQINT pReq)
602{
603 if ( pPool
604 && ASMAtomicReadU32(&pPool->cCurFreeRequests) < pPool->cMaxFreeRequests)
605 {
606 RTCritSectEnter(&pPool->CritSect);
607 if (pPool->cCurFreeRequests < pPool->cMaxFreeRequests)
608 {
609 pReq->pNext = pPool->pFreeRequests;
610 pPool->pFreeRequests = pReq;
611 ASMAtomicIncU32(&pPool->cCurFreeRequests);
612
613 RTCritSectLeave(&pPool->CritSect);
614 return true;
615 }
616
617 RTCritSectLeave(&pPool->CritSect);
618 }
619 return false;
620}
621
622
623RTDECL(int) RTReqPoolCreate(uint32_t cMaxThreads, RTMSINTERVAL cMsMinIdle,
624 uint32_t cThreadsPushBackThreshold, uint32_t cMsMaxPushBack,
625 const char *pszName, PRTREQPOOL phPool)
626{
627 /*
628 * Validate and massage the config.
629 */
630 if (cMaxThreads == UINT32_MAX)
631 cMaxThreads = RTREQPOOL_MAX_THREADS;
632 AssertMsgReturn(cMaxThreads > 0 && cMaxThreads <= RTREQPOOL_MAX_THREADS, ("%u\n", cMaxThreads), VERR_OUT_OF_RANGE);
633 uint32_t const cMinThreads = cMaxThreads > 2 ? 2 : cMaxThreads - 1;
634
635 if (cThreadsPushBackThreshold == 0)
636 cThreadsPushBackThreshold = cMinThreads;
637 else if (cThreadsPushBackThreshold == UINT32_MAX)
638 cThreadsPushBackThreshold = cMaxThreads;
639 AssertMsgReturn(cThreadsPushBackThreshold <= cMaxThreads, ("%u/%u\n", cThreadsPushBackThreshold, cMaxThreads), VERR_OUT_OF_RANGE);
640
641 if (cMsMaxPushBack == UINT32_MAX)
642 cMsMaxPushBack = RTREQPOOL_PUSH_BACK_MAX_MS;
643 AssertMsgReturn(cMsMaxPushBack <= RTREQPOOL_PUSH_BACK_MAX_MS, ("%llu\n", cMsMaxPushBack), VERR_OUT_OF_RANGE);
644 uint32_t const cMsMinPushBack = cMsMaxPushBack >= 200 ? 100 : cMsMaxPushBack / 2;
645
646 AssertPtrReturn(pszName, VERR_INVALID_POINTER);
647 size_t cchName = strlen(pszName);
648 AssertReturn(cchName > 0, VERR_INVALID_PARAMETER);
649 Assert(cchName <= 10);
650
651 AssertPtrReturn(phPool, VERR_INVALID_POINTER);
652
653 /*
654 * Create and initialize the pool.
655 */
656 PRTREQPOOLINT pPool = (PRTREQPOOLINT)RTMemAlloc(sizeof(*pPool));
657 if (!pPool)
658 return VERR_NO_MEMORY;
659
660 pPool->u32Magic = RTREQPOOL_MAGIC;
661 RTStrCopy(pPool->szName, sizeof(pPool->szName), pszName);
662
663 pPool->enmThreadType = RTTHREADTYPE_DEFAULT;
664 pPool->cMaxThreads = cMaxThreads;
665 pPool->cMinThreads = cMinThreads;
666 pPool->cMsMinIdle = cMsMinIdle == RT_INDEFINITE_WAIT || cMsMinIdle >= UINT32_MAX ? UINT32_MAX : cMsMinIdle;
667 pPool->cNsMinIdle = pPool->cMsMinIdle == UINT32_MAX ? UINT64_MAX : cMsMinIdle * RT_NS_1MS_64;
668 pPool->cMsIdleSleep = pPool->cMsMinIdle == UINT32_MAX ? RT_INDEFINITE_WAIT : RT_MAX(RT_MS_1SEC, pPool->cMsMinIdle);
669 pPool->cThreadsPushBackThreshold = cThreadsPushBackThreshold;
670 pPool->cMsMaxPushBack = cMsMaxPushBack;
671 pPool->cMsMinPushBack = cMsMinPushBack;
672 pPool->cMaxFreeRequests = cMaxThreads * 2;
673 pPool->hThreadTermEvt = NIL_RTSEMEVENTMULTI;
674 pPool->fDestructing = false;
675 pPool->cMsCurPushBack = 0;
676 pPool->cCurThreads = 0;
677 pPool->cThreadsCreated = 0;
678 pPool->uLastThreadCreateNanoTs = 0;
679 RTListInit(&pPool->WorkerThreads);
680 pPool->cReqProcessed = 0;
681 pPool->cNsTotalReqProcessing= 0;
682 pPool->cNsTotalReqQueued = 0;
683 pPool->cRefs = 1;
684 pPool->cIdleThreads = 0;
685 RTListInit(&pPool->IdleThreads);
686 pPool->pPendingRequests = NULL;
687 pPool->ppPendingRequests = &pPool->pPendingRequests;
688 pPool->cCurPendingRequests = 0;
689 pPool->cCurActiveRequests = 0;
690 pPool->cReqSubmitted = 0;
691 pPool->pFreeRequests = NULL;
692 pPool->cCurFreeRequests = 0;
693
694 int rc = RTSemEventMultiCreate(&pPool->hThreadTermEvt);
695 if (RT_SUCCESS(rc))
696 {
697 rc = RTCritSectInit(&pPool->CritSect);
698 if (RT_SUCCESS(rc))
699 {
700 *phPool = pPool;
701 return VINF_SUCCESS;
702 }
703
704 RTSemEventMultiDestroy(pPool->hThreadTermEvt);
705 }
706 pPool->u32Magic = RTREQPOOL_MAGIC_DEAD;
707 RTMemFree(pPool);
708 return rc;
709}
710
711
712
713RTDECL(int) RTReqPoolSetCfgVar(RTREQPOOL hPool, RTREQPOOLCFGVAR enmVar, uint64_t uValue)
714{
715 PRTREQPOOLINT pPool = hPool;
716 AssertPtrReturn(pPool, VERR_INVALID_HANDLE);
717 AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, VERR_INVALID_HANDLE);
718 AssertReturn(enmVar > RTREQPOOLCFGVAR_INVALID && enmVar < RTREQPOOLCFGVAR_END, VERR_INVALID_PARAMETER);
719
720 RTCritSectEnter(&pPool->CritSect);
721
722 bool fWakeUpIdleThreads = false;
723 int rc = VINF_SUCCESS;
724 switch (enmVar)
725 {
726 case RTREQPOOLCFGVAR_THREAD_TYPE:
727 AssertMsgBreakStmt(uValue > (uint64_t)RTTHREADTYPE_INVALID && uValue < (uint64_t)RTTHREADTYPE_END,
728 ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
729
730 pPool->enmThreadType = (RTTHREADTYPE)uValue;
731 break;
732
733 case RTREQPOOLCFGVAR_MIN_THREADS:
734 AssertMsgBreakStmt(uValue <= RTREQPOOL_MAX_THREADS, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
735 fWakeUpIdleThreads = pPool->cMinThreads > (uint32_t)uValue;
736 pPool->cMinThreads = (uint32_t)uValue;
737 if (pPool->cMinThreads > pPool->cMaxThreads)
738 pPool->cMaxThreads = pPool->cMinThreads;
739 if ( pPool->cThreadsPushBackThreshold < pPool->cMinThreads
740 || pPool->cThreadsPushBackThreshold > pPool->cMaxThreads)
741 pPool->cThreadsPushBackThreshold = pPool->cMinThreads + (pPool->cMaxThreads - pPool->cMinThreads) / 2;
742 rtReqPoolRecalcPushBack(pPool);
743 break;
744
745 case RTREQPOOLCFGVAR_MAX_THREADS:
746 AssertMsgBreakStmt(uValue <= RTREQPOOL_MAX_THREADS && uValue >= 1, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
747 pPool->cMaxThreads = (uint32_t)uValue;
748 if (pPool->cMaxThreads < pPool->cMinThreads)
749 {
750 pPool->cMinThreads = pPool->cMaxThreads;
751 fWakeUpIdleThreads = true;
752 }
753 if (pPool->cMaxThreads < pPool->cThreadsPushBackThreshold)
754 pPool->cThreadsPushBackThreshold = pPool->cMinThreads + (pPool->cMaxThreads - pPool->cMinThreads) / 2;
755 rtReqPoolRecalcPushBack(pPool);
756 break;
757
758 case RTREQPOOLCFGVAR_MS_MIN_IDLE:
759 AssertMsgBreakStmt(uValue < UINT32_MAX || uValue == RT_INDEFINITE_WAIT, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
760 if (uValue < UINT32_MAX && uValue != RT_INDEFINITE_WAIT)
761 {
762 fWakeUpIdleThreads = pPool->cMsMinIdle != (uint32_t)uValue;
763 pPool->cMsMinIdle = (uint32_t)uValue;
764 pPool->cNsMinIdle = pPool->cMsMinIdle * RT_NS_1MS_64;
765 if (pPool->cMsIdleSleep > pPool->cMsMinIdle)
766 pPool->cMsIdleSleep = RT_MAX(RT_MS_1SEC, pPool->cMsMinIdle);
767 }
768 else
769 {
770 pPool->cMsMinIdle = UINT32_MAX;
771 pPool->cNsMinIdle = UINT64_MAX;
772 pPool->cMsIdleSleep = RT_INDEFINITE_WAIT;
773 }
774 break;
775
776 case RTREQPOOLCFGVAR_MS_IDLE_SLEEP:
777 AssertMsgBreakStmt(uValue <= RT_INDEFINITE_WAIT, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
778 fWakeUpIdleThreads = pPool->cMsMinIdle > (RTMSINTERVAL)uValue;
779 pPool->cMsIdleSleep = (RTMSINTERVAL)uValue;
780 if (pPool->cMsIdleSleep == RT_INDEFINITE_WAIT)
781 {
782 pPool->cMsMinIdle = UINT32_MAX;
783 pPool->cNsMinIdle = UINT64_MAX;
784 }
785 break;
786
787 case RTREQPOOLCFGVAR_PUSH_BACK_THRESHOLD:
788 if (uValue == UINT64_MAX)
789 pPool->cThreadsPushBackThreshold = pPool->cMaxThreads;
790 else if (uValue == 0)
791 pPool->cThreadsPushBackThreshold = pPool->cMinThreads;
792 else
793 {
794 AssertMsgBreakStmt(uValue <= pPool->cMaxThreads, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
795 AssertMsgBreakStmt(uValue >= pPool->cMinThreads, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
796 pPool->cThreadsPushBackThreshold = (uint32_t)uValue;
797 }
798 break;
799
800 case RTREQPOOLCFGVAR_PUSH_BACK_MIN_MS:
801 if (uValue == UINT32_MAX || uValue == UINT64_MAX)
802 uValue = RTREQPOOL_PUSH_BACK_MAX_MS;
803 else
804 AssertMsgBreakStmt(uValue <= RTREQPOOL_PUSH_BACK_MAX_MS, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
805 pPool->cMsMinPushBack = (uint32_t)uValue;
806 if (pPool->cMsMaxPushBack < pPool->cMsMinPushBack)
807 pPool->cMsMaxPushBack = pPool->cMsMinPushBack;
808 rtReqPoolRecalcPushBack(pPool);
809 break;
810
811 case RTREQPOOLCFGVAR_PUSH_BACK_MAX_MS:
812 if (uValue == UINT32_MAX || uValue == UINT64_MAX)
813 uValue = RTREQPOOL_PUSH_BACK_MAX_MS;
814 else
815 AssertMsgBreakStmt(uValue <= RTREQPOOL_PUSH_BACK_MAX_MS, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
816 pPool->cMsMaxPushBack = (uint32_t)uValue;
817 if (pPool->cMsMinPushBack < pPool->cMsMaxPushBack)
818 pPool->cMsMinPushBack = pPool->cMsMaxPushBack;
819 rtReqPoolRecalcPushBack(pPool);
820 break;
821
822 case RTREQPOOLCFGVAR_MAX_FREE_REQUESTS:
823 if (uValue == UINT64_MAX)
824 {
825 pPool->cMaxFreeRequests = pPool->cMaxThreads * 2;
826 if (pPool->cMaxFreeRequests < 16)
827 pPool->cMaxFreeRequests = 16;
828 }
829 else
830 {
831 AssertMsgBreakStmt(uValue <= RTREQPOOL_MAX_FREE_REQUESTS, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
832 pPool->cMaxFreeRequests = (uint32_t)uValue;
833 }
834
835 while (pPool->cCurFreeRequests > pPool->cMaxFreeRequests)
836 {
837 PRTREQINT pReq = pPool->pFreeRequests;
838 pPool->pFreeRequests = pReq->pNext;
839 ASMAtomicDecU32(&pPool->cCurFreeRequests);
840 rtReqFreeIt(pReq);
841 }
842 break;
843
844 default:
845 AssertFailed();
846 rc = VERR_IPE_NOT_REACHED_DEFAULT_CASE;
847 }
848
849 /* Wake up all idle threads if required. */
850 if (fWakeUpIdleThreads)
851 {
852 Assert(rc == VINF_SUCCESS);
853 PRTREQPOOLTHREAD pThread;
854 RTListForEach(&pPool->WorkerThreads, pThread, RTREQPOOLTHREAD, ListNode)
855 {
856 RTThreadUserSignal(pThread->hThread);
857 }
858 }
859
860 RTCritSectLeave(&pPool->CritSect);
861
862 return rc;
863}
864RT_EXPORT_SYMBOL(RTReqPoolSetCfgVar);
865
866
867RTDECL(uint64_t) RTReqPoolGetCfgVar(RTREQPOOL hPool, RTREQPOOLCFGVAR enmVar)
868{
869 PRTREQPOOLINT pPool = hPool;
870 AssertPtrReturn(pPool, UINT64_MAX);
871 AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT64_MAX);
872 AssertReturn(enmVar > RTREQPOOLCFGVAR_INVALID && enmVar < RTREQPOOLCFGVAR_END, UINT64_MAX);
873
874 RTCritSectEnter(&pPool->CritSect);
875
876 uint64_t u64;
877 switch (enmVar)
878 {
879 case RTREQPOOLCFGVAR_THREAD_TYPE:
880 u64 = pPool->enmThreadType;
881 break;
882
883 case RTREQPOOLCFGVAR_MIN_THREADS:
884 u64 = pPool->cMinThreads;
885 break;
886
887 case RTREQPOOLCFGVAR_MAX_THREADS:
888 u64 = pPool->cMaxThreads;
889 break;
890
891 case RTREQPOOLCFGVAR_MS_MIN_IDLE:
892 u64 = pPool->cMsMinIdle;
893 break;
894
895 case RTREQPOOLCFGVAR_MS_IDLE_SLEEP:
896 u64 = pPool->cMsIdleSleep;
897 break;
898
899 case RTREQPOOLCFGVAR_PUSH_BACK_THRESHOLD:
900 u64 = pPool->cThreadsPushBackThreshold;
901 break;
902
903 case RTREQPOOLCFGVAR_PUSH_BACK_MIN_MS:
904 u64 = pPool->cMsMinPushBack;
905 break;
906
907 case RTREQPOOLCFGVAR_PUSH_BACK_MAX_MS:
908 u64 = pPool->cMsMaxPushBack;
909 break;
910
911 case RTREQPOOLCFGVAR_MAX_FREE_REQUESTS:
912 u64 = pPool->cMaxFreeRequests;
913 break;
914
915 default:
916 AssertFailed();
917 u64 = UINT64_MAX;
918 break;
919 }
920
921 RTCritSectLeave(&pPool->CritSect);
922
923 return u64;
924}
925RT_EXPORT_SYMBOL(RTReqGetQueryCfgVar);
926
927
928RTDECL(uint64_t) RTReqPoolGetStat(RTREQPOOL hPool, RTREQPOOLSTAT enmStat)
929{
930 PRTREQPOOLINT pPool = hPool;
931 AssertPtrReturn(pPool, UINT64_MAX);
932 AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT64_MAX);
933 AssertReturn(enmStat > RTREQPOOLSTAT_INVALID && enmStat < RTREQPOOLSTAT_END, UINT64_MAX);
934
935 RTCritSectEnter(&pPool->CritSect);
936
937 uint64_t u64;
938 switch (enmStat)
939 {
940 case RTREQPOOLSTAT_THREADS: u64 = pPool->cCurThreads; break;
941 case RTREQPOOLSTAT_THREADS_CREATED: u64 = pPool->cThreadsCreated; break;
942 case RTREQPOOLSTAT_REQUESTS_PROCESSED: u64 = pPool->cReqProcessed; break;
943 case RTREQPOOLSTAT_REQUESTS_SUBMITTED: u64 = pPool->cReqSubmitted; break;
944 case RTREQPOOLSTAT_REQUESTS_PENDING: u64 = pPool->cCurPendingRequests; break;
945 case RTREQPOOLSTAT_REQUESTS_ACTIVE: u64 = pPool->cCurActiveRequests; break;
946 case RTREQPOOLSTAT_REQUESTS_FREE: u64 = pPool->cCurFreeRequests; break;
947 case RTREQPOOLSTAT_NS_TOTAL_REQ_PROCESSING: u64 = pPool->cNsTotalReqProcessing; break;
948 case RTREQPOOLSTAT_NS_TOTAL_REQ_QUEUED: u64 = pPool->cNsTotalReqQueued; break;
949 case RTREQPOOLSTAT_NS_AVERAGE_REQ_PROCESSING: u64 = pPool->cNsTotalReqProcessing / RT_MAX(pPool->cReqProcessed, 1); break;
950 case RTREQPOOLSTAT_NS_AVERAGE_REQ_QUEUED: u64 = pPool->cNsTotalReqQueued / RT_MAX(pPool->cReqProcessed, 1); break;
951 default:
952 AssertFailed();
953 u64 = UINT64_MAX;
954 break;
955 }
956
957 RTCritSectLeave(&pPool->CritSect);
958
959 return u64;
960}
961RT_EXPORT_SYMBOL(RTReqPoolGetStat);
962
963
964RTDECL(uint32_t) RTReqPoolRetain(RTREQPOOL hPool)
965{
966 PRTREQPOOLINT pPool = hPool;
967 AssertPtrReturn(pPool, UINT32_MAX);
968 AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT32_MAX);
969
970 return ASMAtomicIncU32(&pPool->cRefs);
971}
972RT_EXPORT_SYMBOL(RTReqPoolRetain);
973
974
975RTDECL(uint32_t) RTReqPoolRelease(RTREQPOOL hPool)
976{
977 /*
978 * Ignore NULL and validate the request.
979 */
980 if (!hPool)
981 return 0;
982 PRTREQPOOLINT pPool = hPool;
983 AssertPtrReturn(pPool, UINT32_MAX);
984 AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT32_MAX);
985
986 /*
987 * Drop a reference, free it when it reaches zero.
988 */
989 uint32_t cRefs = ASMAtomicDecU32(&pPool->cRefs);
990 if (cRefs == 0)
991 {
992 AssertReturn(ASMAtomicCmpXchgU32(&pPool->u32Magic, RTREQPOOL_MAGIC_DEAD, RTREQPOOL_MAGIC), UINT32_MAX);
993
994 RTCritSectEnter(&pPool->CritSect);
995#ifdef RT_STRICT
996 RTTHREAD const hSelf = RTThreadSelf();
997#endif
998
999 /* Indicate to the worker threads that we're shutting down. */
1000 ASMAtomicWriteBool(&pPool->fDestructing, true);
1001 PRTREQPOOLTHREAD pThread;
1002 RTListForEach(&pPool->WorkerThreads, pThread, RTREQPOOLTHREAD, ListNode)
1003 {
1004 Assert(pThread->hThread != hSelf);
1005 RTThreadUserSignal(pThread->hThread);
1006 }
1007
1008 /* Cancel pending requests. */
1009 Assert(!pPool->pPendingRequests);
1010 while (pPool->pPendingRequests)
1011 {
1012 PRTREQINT pReq = pPool->pPendingRequests;
1013 pPool->pPendingRequests = pReq->pNext;
1014 rtReqPoolCancelReq(pReq);
1015 }
1016 pPool->ppPendingRequests = NULL;
1017 pPool->cCurPendingRequests = 0;
1018
1019 /* Wait for the workers to shut down. */
1020 while (!RTListIsEmpty(&pPool->WorkerThreads))
1021 {
1022 RTCritSectLeave(&pPool->CritSect);
1023 RTSemEventMultiWait(pPool->hThreadTermEvt, RT_MS_1MIN);
1024 RTCritSectEnter(&pPool->CritSect);
1025 /** @todo should we wait forever here? */
1026 }
1027
1028 /* Free recycled requests. */
1029 for (;;)
1030 {
1031 PRTREQINT pReq = pPool->pFreeRequests;
1032 if (!pReq)
1033 break;
1034 pPool->pFreeRequests = pReq->pNext;
1035 pPool->cCurFreeRequests--;
1036 rtReqFreeIt(pReq);
1037 }
1038
1039 /* Finally, free the critical section and pool instance. */
1040 RTSemEventMultiDestroy(pPool->hThreadTermEvt);
1041 RTCritSectLeave(&pPool->CritSect);
1042 RTCritSectDelete(&pPool->CritSect);
1043 RTMemFree(pPool);
1044 }
1045
1046 return cRefs;
1047}
1048RT_EXPORT_SYMBOL(RTReqPoolRelease);
1049
1050
1051RTDECL(int) RTReqPoolAlloc(RTREQPOOL hPool, RTREQTYPE enmType, PRTREQ *phReq)
1052{
1053 PRTREQPOOLINT pPool = hPool;
1054 AssertPtrReturn(pPool, VERR_INVALID_HANDLE);
1055 AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, VERR_INVALID_HANDLE);
1056
1057 /*
1058 * Try recycle old requests.
1059 */
1060 if (ASMAtomicReadU32(&pPool->cCurFreeRequests) > 0)
1061 {
1062 RTCritSectEnter(&pPool->CritSect);
1063 PRTREQINT pReq = pPool->pFreeRequests;
1064 if (pReq)
1065 {
1066 ASMAtomicDecU32(&pPool->cCurFreeRequests);
1067 pPool->pFreeRequests = pReq->pNext;
1068
1069 RTCritSectLeave(&pPool->CritSect);
1070
1071 Assert(pReq->fPoolOrQueue);
1072 Assert(pReq->uOwner.hPool == pPool);
1073
1074 int rc = rtReqReInit(pReq, enmType);
1075 if (RT_SUCCESS(rc))
1076 {
1077 *phReq = pReq;
1078 LogFlow(("RTReqPoolAlloc: returns VINF_SUCCESS *phReq=%p recycled\n", pReq));
1079 return rc;
1080 }
1081 }
1082 else
1083 RTCritSectLeave(&pPool->CritSect);
1084 }
1085
1086 /*
1087 * Allocate a new request.
1088 */
1089 int rc = rtReqAlloc(enmType, true /*fPoolOrQueue*/, pPool, phReq);
1090 LogFlow(("RTReqPoolAlloc: returns %Rrc *phReq=%p\n", rc, *phReq));
1091 return rc;
1092}
1093RT_EXPORT_SYMBOL(RTReqPoolAlloc);
1094
1095
1096RTDECL(int) RTReqPoolCallEx( RTREQPOOL hPool, RTMSINTERVAL cMillies, PRTREQ *phReq, uint32_t fFlags, PFNRT pfnFunction, unsigned cArgs, ...)
1097{
1098 va_list va;
1099 va_start(va, cArgs);
1100 int rc = RTReqPoolCallExV(hPool, cMillies, phReq, fFlags, pfnFunction, cArgs, va);
1101 va_end(va);
1102 return rc;
1103}
1104RT_EXPORT_SYMBOL(RTReqPoolCallEx);
1105
1106
1107RTDECL(int) RTReqPoolCallExV(RTREQPOOL hPool, RTMSINTERVAL cMillies, PRTREQ *phReq, uint32_t fFlags, PFNRT pfnFunction, unsigned cArgs, va_list va)
1108{
1109 /*
1110 * Check input.
1111 */
1112 AssertPtrReturn(pfnFunction, VERR_INVALID_POINTER);
1113 AssertMsgReturn(!((uint32_t)fFlags & ~(uint32_t)(RTREQFLAGS_NO_WAIT | RTREQFLAGS_RETURN_MASK)), ("%#x\n", (uint32_t)fFlags), VERR_INVALID_PARAMETER);
1114 if (!(fFlags & RTREQFLAGS_NO_WAIT))
1115 {
1116 AssertPtrReturn(phReq, VERR_INVALID_POINTER);
1117 *phReq = NIL_RTREQ;
1118 }
1119
1120 PRTREQINT pReq = NULL;
1121 AssertMsgReturn(cArgs * sizeof(uintptr_t) <= sizeof(pReq->u.Internal.aArgs), ("cArgs=%u\n", cArgs), VERR_TOO_MUCH_DATA);
1122
1123 /*
1124 * Allocate and initialize the request.
1125 */
1126 int rc = RTReqPoolAlloc(hPool, RTREQTYPE_INTERNAL, &pReq);
1127 if (RT_FAILURE(rc))
1128 return rc;
1129 pReq->fFlags = fFlags;
1130 pReq->u.Internal.pfn = pfnFunction;
1131 pReq->u.Internal.cArgs = cArgs;
1132 for (unsigned iArg = 0; iArg < cArgs; iArg++)
1133 pReq->u.Internal.aArgs[iArg] = va_arg(va, uintptr_t);
1134
1135 /*
1136 * Submit the request.
1137 */
1138 rc = RTReqSubmit(pReq, cMillies);
1139 if ( rc != VINF_SUCCESS
1140 && rc != VERR_TIMEOUT)
1141 {
1142 Assert(rc != VERR_INTERRUPTED);
1143 RTReqRelease(pReq);
1144 pReq = NULL;
1145 }
1146
1147 if (!(fFlags & RTREQFLAGS_NO_WAIT))
1148 {
1149 *phReq = pReq;
1150 LogFlow(("RTReqPoolCallExV: returns %Rrc *phReq=%p\n", rc, pReq));
1151 }
1152 else
1153 LogFlow(("RTReqPoolCallExV: returns %Rrc\n", rc));
1154 return rc;
1155}
1156RT_EXPORT_SYMBOL(RTReqPoolCallExV);
1157
1158
1159RTDECL(int) RTReqPoolCallWait(RTREQPOOL hPool, PFNRT pfnFunction, unsigned cArgs, ...)
1160{
1161 PRTREQINT pReq;
1162 va_list va;
1163 va_start(va, cArgs);
1164 int rc = RTReqPoolCallExV(hPool, RT_INDEFINITE_WAIT, &pReq, RTREQFLAGS_IPRT_STATUS,
1165 pfnFunction, cArgs, va);
1166 va_end(va);
1167 if (RT_SUCCESS(rc))
1168 rc = pReq->iStatusX;
1169 RTReqRelease(pReq);
1170 return rc;
1171}
1172RT_EXPORT_SYMBOL(RTReqPoolCallWait);
1173
1174
1175RTDECL(int) RTReqPoolCallNoWait(RTREQPOOL hPool, PFNRT pfnFunction, unsigned cArgs, ...)
1176{
1177 va_list va;
1178 va_start(va, cArgs);
1179 int rc = RTReqPoolCallExV(hPool, 0, NULL, RTREQFLAGS_IPRT_STATUS | RTREQFLAGS_NO_WAIT,
1180 pfnFunction, cArgs, va);
1181 va_end(va);
1182 return rc;
1183}
1184RT_EXPORT_SYMBOL(RTReqPoolCallNoWait);
1185
1186
1187RTDECL(int) RTReqPoolCallVoidWait(RTREQPOOL hPool, PFNRT pfnFunction, unsigned cArgs, ...)
1188{
1189 PRTREQINT pReq;
1190 va_list va;
1191 va_start(va, cArgs);
1192 int rc = RTReqPoolCallExV(hPool, RT_INDEFINITE_WAIT, &pReq, RTREQFLAGS_VOID,
1193 pfnFunction, cArgs, va);
1194 va_end(va);
1195 if (RT_SUCCESS(rc))
1196 rc = pReq->iStatusX;
1197 RTReqRelease(pReq);
1198 return rc;
1199}
1200RT_EXPORT_SYMBOL(RTReqPoolCallVoidWait);
1201
1202
1203RTDECL(int) RTReqPoolCallVoidNoWait(RTREQPOOL hPool, PFNRT pfnFunction, unsigned cArgs, ...)
1204{
1205 va_list va;
1206 va_start(va, cArgs);
1207 int rc = RTReqPoolCallExV(hPool, 0, NULL, RTREQFLAGS_VOID | RTREQFLAGS_NO_WAIT,
1208 pfnFunction, cArgs, va);
1209 va_end(va);
1210 return rc;
1211}
1212RT_EXPORT_SYMBOL(RTReqPoolCallVoidNoWait);
1213
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette