VirtualBox

source: vbox/trunk/src/VBox/Runtime/common/misc/reqpool.cpp@ 39632

Last change on this file since 39632 was 39632, checked in by vboxsync, 13 years ago

Most of the reqpool code is there now. The testcase didn't turn up any bad bugs yet, so there must be some pretty bad stuff hiding in there, according to the rules...

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 40.7 KB
/* $Id: reqpool.cpp 39632 2011-12-15 16:37:48Z vboxsync $ */
/** @file
 * IPRT - Request Pool.
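 *
 * An illustrative usage sketch (not part of this file; assumes IPRT is
 * initialized and that MyWorkerFn, pvUser1 and pvUser2 are hypothetical,
 * caller-supplied items with a PFNRT compatible signature):
 * @code
 *      RTREQPOOL hPool;
 *      int rc = RTReqPoolCreate(8,                 // cMaxThreads
 *                               10 * RT_MS_1SEC,   // cMsMinIdle
 *                               UINT32_MAX,        // cThreadsPushBackThreshold: default
 *                               UINT32_MAX,        // cMsMaxPushBack: default
 *                               "mypool", &hPool);
 *      if (RT_SUCCESS(rc))
 *      {
 *          rc = RTReqPoolCallWait(hPool, (PFNRT)MyWorkerFn, 2, pvUser1, pvUser2);
 *          RTReqPoolRelease(hPool);
 *      }
 * @endcode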
 */

/*
 * Copyright (C) 2006-2011 Oracle Corporation
 *
 * This file is part of VirtualBox Open Source Edition (OSE), as
 * available from http://www.virtualbox.org. This file is free software;
 * you can redistribute it and/or modify it under the terms of the GNU
 * General Public License (GPL) as published by the Free Software
 * Foundation, in version 2 as it comes in the "COPYING" file of the
 * VirtualBox OSE distribution. VirtualBox OSE is distributed in the
 * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
 *
 * The contents of this file may alternatively be used under the terms
 * of the Common Development and Distribution License Version 1.0
 * (CDDL) only, as it comes in the "COPYING.CDDL" file of the
 * VirtualBox OSE distribution, in which case the provisions of the
 * CDDL are applicable instead of those of the GPL.
 *
 * You may elect to license modified versions of this file under the
 * terms and conditions of either the GPL or the CDDL or both.
 */


/*******************************************************************************
*   Header Files                                                               *
*******************************************************************************/
#include <iprt/req.h>
#include "internal/iprt.h"

#include <iprt/assert.h>
#include <iprt/asm.h>
#include <iprt/critsect.h>
#include <iprt/list.h>
#include <iprt/log.h>
#include <iprt/mem.h>
#include <iprt/string.h>
#include <iprt/time.h>
#include <iprt/semaphore.h>
#include <iprt/thread.h>

#include "internal/req.h"
#include "internal/magics.h"


/*******************************************************************************
*   Defined Constants And Macros                                               *
*******************************************************************************/
/** The max number of worker threads. */
#define RTREQPOOL_MAX_THREADS           UINT32_C(16384)
/** The max number of milliseconds to push back. */
#define RTREQPOOL_PUSH_BACK_MAX_MS      RT_MS_1MIN
/** The max number of free requests to keep around. */
#define RTREQPOOL_MAX_FREE_REQUESTS     (RTREQPOOL_MAX_THREADS * 2U)


/*******************************************************************************
*   Structures and Typedefs                                                    *
*******************************************************************************/
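/**
 * A request pool worker thread.
 */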
typedef struct RTREQPOOLTHREAD
{
    /** Node in the RTREQPOOLINT::IdleThreads list. */
    RTLISTNODE              IdleNode;
    /** Node in the RTREQPOOLINT::WorkerThreads list. */
    RTLISTNODE              ListNode;

    /** The submit timestamp of the pending request. */
    uint64_t                uPendingNanoTs;
    /** The timestamp at which processing of the current request started. */
    uint64_t                uProcessingNanoTs;
    /** When this thread last went idle. */
    uint64_t                uIdleNanoTs;
    /** The number of requests processed by this thread. */
    uint64_t                cReqProcessed;
    /** Total time the requests processed by this thread took to process. */
    uint64_t                cNsTotalReqProcessing;
    /** Total time the requests processed by this thread had to wait in
     * the queue before being scheduled. */
    uint64_t                cNsTotalReqQueued;
    /** The CPU this thread was running on the last time we checked. */
    RTCPUID                 idLastCpu;

    /** The submitter will put an incoming request here when scheduling an idle
     * thread. */
    PRTREQINT volatile      pTodoReq;
    /** The request the thread is currently processing. */
    PRTREQINT volatile      pPendingReq;

    /** The thread handle. */
    RTTHREAD                hThread;
    /** Nanosecond timestamp representing the birth time of the thread. */
    uint64_t                uBirthNanoTs;
    /** Pointer to the request thread pool instance the thread is associated
     * with. */
    struct RTREQPOOLINT    *pPool;
} RTREQPOOLTHREAD;
/** Pointer to a worker thread. */
typedef RTREQPOOLTHREAD *PRTREQPOOLTHREAD;

/**
 * Request thread pool instance data.
 */
typedef struct RTREQPOOLINT
{
    /** Magic value (RTREQPOOL_MAGIC). */
    uint32_t                u32Magic;
    /** The request pool name. */
    char                    szName[12];

    /** @name Config
     * @{ */
    /** The worker thread type. */
    RTTHREADTYPE            enmThreadType;
    /** The maximum number of worker threads. */
    uint32_t                cMaxThreads;
    /** The minimum number of worker threads. */
    uint32_t                cMinThreads;
    /** The number of milliseconds a thread needs to be idle before it is
     * considered for retirement. */
    uint32_t                cMsMinIdle;
    /** cMsMinIdle in nanoseconds. */
    uint64_t                cNsMinIdle;
    /** The idle thread sleep interval in milliseconds. */
    RTMSINTERVAL            cMsIdleSleep;
    /** The number of threads which should be spawned before throttling kicks
     * in. */
    uint32_t                cThreadsPushBackThreshold;
    /** The max number of milliseconds to push back a submitter before creating
     * a new worker thread once the threshold has been reached. */
    uint32_t                cMsMaxPushBack;
    /** The minimum number of milliseconds to push back a submitter before
     * creating a new worker thread once the threshold has been reached. */
    uint32_t                cMsMinPushBack;
    /** The max number of free requests in the recycle LIFO. */
    uint32_t                cMaxFreeRequests;
    /** @} */

    /** Signaled by terminating worker threads. */
    RTSEMEVENTMULTI         hThreadTermEvt;

    /** Destruction indicator.  The worker threads check this in their loop. */
    bool volatile           fDestructing;

    /** The current submitter push back in milliseconds.
     * This is recalculated when worker threads come and go. */
    uint32_t                cMsCurPushBack;
    /** The current number of worker threads. */
    uint32_t                cCurThreads;
    /** Statistics: The total number of threads created. */
    uint32_t                cThreadsCreated;
    /** Statistics: The timestamp when the last thread was created. */
    uint64_t                uLastThreadCreateNanoTs;
    /** Linked list of worker threads. */
    RTLISTANCHOR            WorkerThreads;

    /** The number of requests processed and counted in the time totals. */
    uint64_t                cReqProcessed;
    /** Total time the requests processed by the worker threads took to
     * process. */
    uint64_t                cNsTotalReqProcessing;
    /** Total time the requests processed by the worker threads had to wait in
     * the queue before being scheduled. */
    uint64_t                cNsTotalReqQueued;

    /** Reference counter. */
    uint32_t volatile       cRefs;
    /** The number of idle threads, or threads in the process of becoming
     * idle.  This is increased before the to-be-idle thread tries to enter
     * the critical section and add itself to the list. */
    uint32_t volatile       cIdleThreads;
    /** Linked list of idle threads. */
    RTLISTANCHOR            IdleThreads;

    /** Head of the request FIFO. */
    PRTREQINT               pPendingRequests;
    /** Where to insert the next request. */
    PRTREQINT              *ppPendingRequests;
    /** The number of requests currently pending. */
    uint32_t                cCurPendingRequests;
    /** The number of requests currently being executed. */
    uint32_t volatile       cCurActiveRequests;
    /** The number of requests submitted. */
    uint64_t                cReqSubmitted;

    /** Head of the request recycling LIFO. */
    PRTREQINT               pFreeRequests;
    /** The number of requests in the recycling LIFO.  This is read without
     * entering the critical section, thus volatile. */
    uint32_t volatile       cCurFreeRequests;

    /** Critical section serializing access to members of this structure. */
    RTCRITSECT              CritSect;

} RTREQPOOLINT;


/**
 * Used by the exiting worker threads and the pool destruction code to cancel
 * unexpected requests.
 *
 * @param   pReq    The request.
 */
static void rtReqPoolCancelReq(PRTREQINT pReq)
{
    pReq->uOwner.hPool = NIL_RTREQPOOL; /* force free */
    pReq->enmState     = RTREQSTATE_COMPLETED;
    ASMAtomicWriteS32(&pReq->iStatusX, VERR_CANCELLED);
    if (pReq->hPushBackEvt != NIL_RTSEMEVENTMULTI)
        RTSemEventMultiSignal(pReq->hPushBackEvt);
    RTSemEventSignal(pReq->EventSem);

    RTReqRelease(pReq);
}


/**
 * Recalculate the max pushback interval when adding or removing worker threads.
 *
 * @param   pPool   The pool.  cMsCurPushBack will be changed.
 */
static void rtReqPoolRecalcPushBack(PRTREQPOOLINT pPool)
{
    uint32_t const cMsRange = pPool->cMsMaxPushBack - pPool->cMsMinPushBack;
    uint32_t const cSteps   = pPool->cMaxThreads - pPool->cThreadsPushBackThreshold;
    uint32_t const iStep    = pPool->cCurThreads - pPool->cThreadsPushBackThreshold;

    uint32_t cMsCurPushBack;
    if (cSteps == 0) /* threshold == max threads; avoid division by zero */
        cMsCurPushBack = 0;
    else if ((cMsRange >> 2) >= cSteps)
        cMsCurPushBack = cMsRange / cSteps * iStep;
    else
        cMsCurPushBack = (uint32_t)((uint64_t)cMsRange * RT_NS_1MS / cSteps * iStep / RT_NS_1MS);
    cMsCurPushBack += pPool->cMsMinPushBack;

    pPool->cMsCurPushBack = cMsCurPushBack;
}


/**
 * Performs thread exit.
 *
 * @returns Thread termination status code (VINF_SUCCESS).
 * @param   pPool       The pool.
 * @param   pThread     The thread.
 * @param   fLocked     Whether we are inside the critical section
 *                      already.
 */
static int rtReqPoolThreadExit(PRTREQPOOLINT pPool, PRTREQPOOLTHREAD pThread, bool fLocked)
{
    if (!fLocked)
        RTCritSectEnter(&pPool->CritSect);

    /* Get out of the idle list. */
    if (!RTListIsEmpty(&pThread->IdleNode))
    {
        RTListNodeRemove(&pThread->IdleNode);
        Assert(pPool->cIdleThreads > 0);
        ASMAtomicDecU32(&pPool->cIdleThreads);
    }

    /* Get out of the thread list. */
    RTListNodeRemove(&pThread->ListNode);
    Assert(pPool->cCurThreads > 0);
    pPool->cCurThreads--;
    rtReqPoolRecalcPushBack(pPool);

    /* This shouldn't happen... */
    PRTREQINT pReq = pThread->pTodoReq;
    if (pReq)
    {
        AssertFailed();
        pThread->pTodoReq = NULL;
        rtReqPoolCancelReq(pReq);
    }

    /* If we're the last thread terminating, ping the destruction thread before
       we leave the critical section. */
    if (   RTListIsEmpty(&pPool->WorkerThreads)
        && pPool->hThreadTermEvt != NIL_RTSEMEVENTMULTI)
        RTSemEventMultiSignal(pPool->hThreadTermEvt);

    RTCritSectLeave(&pPool->CritSect);

    /* Free the thread structure; it is off all the pool's lists by now. */
    RTMemFree(pThread);

    return VINF_SUCCESS;
}


/**
 * Process one request.
 *
 * @param   pPool       The pool.
 * @param   pThread     The worker thread.
 * @param   pReq        The request to process.
 */
static void rtReqPoolThreadProcessRequest(PRTREQPOOLINT pPool, PRTREQPOOLTHREAD pThread, PRTREQINT pReq)
{
    /*
     * Update thread state.
     */
    pThread->uProcessingNanoTs = RTTimeNanoTS();
    pThread->uPendingNanoTs    = pReq->uSubmitNanoTs;
    pThread->pPendingReq       = pReq;
    ASMAtomicIncU32(&pPool->cCurActiveRequests);
    Assert(pReq->u32Magic == RTREQ_MAGIC);

    /*
     * Do the actual processing.
     */
    rtReqProcessOne(pReq);

    /*
     * Update thread statistics and state.
     */
    ASMAtomicDecU32(&pPool->cCurActiveRequests);
    pThread->pPendingReq    = NULL;
    uint64_t const uNsTsEnd = RTTimeNanoTS();
    pThread->cNsTotalReqProcessing += uNsTsEnd - pThread->uProcessingNanoTs;
    pThread->cNsTotalReqQueued     += uNsTsEnd - pThread->uPendingNanoTs;
    pThread->cReqProcessed++;
}


/**
 * The Worker Thread Procedure.
 *
 * @returns VINF_SUCCESS.
 * @param   hThreadSelf     The thread handle.
 * @param   pvArg           Pointer to the thread data.
 */
static DECLCALLBACK(int) rtReqPoolThreadProc(RTTHREAD hThreadSelf, void *pvArg)
{
    PRTREQPOOLTHREAD pThread = (PRTREQPOOLTHREAD)pvArg;
    PRTREQPOOLINT    pPool   = pThread->pPool;

    /*
     * The work loop.
     */
    uint64_t cReqPrevProcessedIdle     = UINT64_MAX;
    uint64_t cReqPrevProcessedStat     = 0;
    uint64_t cNsPrevTotalReqProcessing = 0;
    uint64_t cNsPrevTotalReqQueued     = 0;
    while (!pPool->fDestructing)
    {
        /*
         * Process pending work.
         */

        /* Check if anything is scheduled directly to us. */
        PRTREQINT pReq = ASMAtomicXchgPtrT(&pThread->pTodoReq, NULL, PRTREQINT);
        if (pReq)
        {
            Assert(RTListIsEmpty(&pThread->IdleNode)); /* Must not be in the idle list. */
            rtReqPoolThreadProcessRequest(pPool, pThread, pReq);
            continue;
        }

        ASMAtomicIncU32(&pPool->cIdleThreads);
        RTCritSectEnter(&pPool->CritSect);

        /* Update the global statistics. */
        if (cReqPrevProcessedStat != pThread->cReqProcessed)
        {
            pPool->cReqProcessed         += pThread->cReqProcessed - cReqPrevProcessedStat;
            cReqPrevProcessedStat         = pThread->cReqProcessed;
            pPool->cNsTotalReqProcessing += pThread->cNsTotalReqProcessing - cNsPrevTotalReqProcessing;
            cNsPrevTotalReqProcessing     = pThread->cNsTotalReqProcessing;
            pPool->cNsTotalReqQueued     += pThread->cNsTotalReqQueued - cNsPrevTotalReqQueued;
            cNsPrevTotalReqQueued         = pThread->cNsTotalReqQueued;
        }

        /* Recheck the todo request pointer after entering the critsect. */
        pReq = ASMAtomicXchgPtrT(&pThread->pTodoReq, NULL, PRTREQINT);
        if (pReq)
        {
            Assert(RTListIsEmpty(&pThread->IdleNode)); /* Must not be in the idle list. */
            RTCritSectLeave(&pPool->CritSect);

            rtReqPoolThreadProcessRequest(pPool, pThread, pReq);
            continue;
        }

        /* Any pending requests in the queue? */
        pReq = pPool->pPendingRequests;
        if (pReq)
        {
            pPool->pPendingRequests = pReq->pNext;
            if (pReq->pNext == NULL)
                pPool->ppPendingRequests = &pPool->pPendingRequests;
            Assert(pPool->cCurPendingRequests > 0);
            pPool->cCurPendingRequests--;

            /* Un-idle ourselves and process the request. */
            if (!RTListIsEmpty(&pThread->IdleNode))
            {
                RTListNodeRemove(&pThread->IdleNode);
                RTListInit(&pThread->IdleNode);
                ASMAtomicDecU32(&pPool->cIdleThreads);
            }
            ASMAtomicDecU32(&pPool->cIdleThreads);
            RTCritSectLeave(&pPool->CritSect);

            rtReqPoolThreadProcessRequest(pPool, pThread, pReq);
            continue;
        }

        /*
         * Nothing to do, go idle.
         */
        if (cReqPrevProcessedIdle != pThread->cReqProcessed)
        {
            cReqPrevProcessedIdle = pThread->cReqProcessed;
            pThread->uIdleNanoTs  = RTTimeNanoTS();
        }
        else if (pPool->cCurThreads > pPool->cMinThreads)
        {
            uint64_t cNsIdle = RTTimeNanoTS() - pThread->uIdleNanoTs;
            if (cNsIdle >= pPool->cNsMinIdle)
                return rtReqPoolThreadExit(pPool, pThread, true /*fLocked*/);
        }

        if (RTListIsEmpty(&pThread->IdleNode))
            RTListPrepend(&pPool->IdleThreads, &pThread->IdleNode);
        else
            ASMAtomicDecU32(&pPool->cIdleThreads);
        RTThreadUserReset(hThreadSelf);
        uint32_t const cMsSleep = pPool->cMsIdleSleep;

        RTCritSectLeave(&pPool->CritSect);

        RTThreadUserWait(hThreadSelf, cMsSleep);
    }

    return rtReqPoolThreadExit(pPool, pThread, false /*fLocked*/);
}


/**
 * Create a new worker thread.
 *
 * @param   pPool   The pool needing a new worker thread.
 * @remarks Caller owns the critical section.
 */
static void rtReqPoolCreateNewWorker(RTREQPOOL pPool)
{
    PRTREQPOOLTHREAD pThread = (PRTREQPOOLTHREAD)RTMemAllocZ(sizeof(RTREQPOOLTHREAD));
    if (!pThread)
        return;

    pThread->uBirthNanoTs = RTTimeNanoTS();
    pThread->pPool        = pPool;
    pThread->idLastCpu    = NIL_RTCPUID;
    pThread->hThread      = NIL_RTTHREAD;
    RTListInit(&pThread->IdleNode);
    RTListAppend(&pPool->WorkerThreads, &pThread->ListNode);
    pPool->cCurThreads++;
    pPool->cThreadsCreated++;

    static uint32_t s_idThread = 0;
    int rc = RTThreadCreateF(&pThread->hThread, rtReqPoolThreadProc, pThread, 0 /*default stack size*/,
                             pPool->enmThreadType, 0 /*fFlags*/, "REQPT%02u", ++s_idThread);
    if (RT_SUCCESS(rc))
        pPool->uLastThreadCreateNanoTs = pThread->uBirthNanoTs;
    else
    {
        pPool->cCurThreads--;
        RTListNodeRemove(&pThread->ListNode);
        RTMemFree(pThread);
    }
}


/**
 * Repel the submitter, giving the worker threads a chance to process the
 * incoming request.
 *
 * @returns Success if a worker picked up the request, failure if not.  The
 *          critical section has been left on success, while we'll be inside it
 *          on failure.
 * @param   pPool   The pool.
 * @param   pReq    The incoming request.
 */
static int rtReqPoolPushBack(PRTREQPOOLINT pPool, PRTREQINT pReq)
{
    /*
     * Lazily create the push back semaphore that we'll be blocking on.
     */
    int rc;
    RTSEMEVENTMULTI hEvt = pReq->hPushBackEvt;
    if (hEvt == NIL_RTSEMEVENTMULTI)
    {
        rc = RTSemEventMultiCreate(&hEvt);
        if (RT_FAILURE(rc))
            return rc;
        pReq->hPushBackEvt = hEvt;
    }

    /*
     * Prepare the request and semaphore.
     */
    uint32_t const cMsTimeout = pPool->cMsCurPushBack;
    pReq->fSignalPushBack = true;
    RTReqRetain(pReq);
    RTSemEventMultiReset(hEvt);

    RTCritSectLeave(&pPool->CritSect);

    /*
     * Block.
     */
    rc = RTSemEventMultiWait(hEvt, cMsTimeout);
    if (RT_FAILURE(rc))
    {
        AssertMsg(rc == VERR_TIMEOUT, ("%Rrc\n", rc));
        RTCritSectEnter(&pPool->CritSect);
    }
    RTReqRelease(pReq);
    return rc;
}


DECLHIDDEN(void) rtReqPoolSubmit(PRTREQPOOLINT pPool, PRTREQINT pReq)
{
    RTCritSectEnter(&pPool->CritSect);

    pPool->cReqSubmitted++;

    /*
     * Try to schedule the request on a thread that's currently idle.
     */
    PRTREQPOOLTHREAD pThread = RTListGetFirst(&pPool->IdleThreads, RTREQPOOLTHREAD, IdleNode);
    if (pThread)
    {
        /** @todo CPU affinity??? */
        ASMAtomicWritePtr(&pThread->pTodoReq, pReq);

        RTListNodeRemove(&pThread->IdleNode);
        RTListInit(&pThread->IdleNode);
        ASMAtomicDecU32(&pPool->cIdleThreads);

        RTThreadUserSignal(pThread->hThread);

        RTCritSectLeave(&pPool->CritSect);
        return;
    }
    Assert(RTListIsEmpty(&pPool->IdleThreads));

    /*
     * Put the request in the pending queue.
     */
    pReq->pNext = NULL;
    *pPool->ppPendingRequests = pReq;
    pPool->ppPendingRequests  = (PRTREQINT *)&pReq->pNext;
    pPool->cCurPendingRequests++;

    /*
     * If there is an incoming worker thread already or we've reached the
     * maximum number of worker threads, we're done.
     */
    if (   pPool->cIdleThreads > 0
        || pPool->cCurThreads >= pPool->cMaxThreads)
    {
        RTCritSectLeave(&pPool->CritSect);
        return;
    }

    /*
     * Push back before creating a new worker thread.
     */
    if (   pPool->cCurThreads > pPool->cThreadsPushBackThreshold
        && (RTTimeNanoTS() - pReq->uSubmitNanoTs) / RT_NS_1MS >= pPool->cMsCurPushBack)
    {
        int rc = rtReqPoolPushBack(pPool, pReq);
        if (RT_SUCCESS(rc))
            return;
    }

    /*
     * Create a new thread for processing the request.
     * For simplicity, we don't bother leaving the critical section while doing so.
     */
    rtReqPoolCreateNewWorker(pPool);

    RTCritSectLeave(&pPool->CritSect);
}


/**
 * Recycles a request.
 *
 * @returns true if recycled, false if not.
 * @param   pPool   The request thread pool.
 * @param   pReq    The request.
 */
DECLHIDDEN(bool) rtReqPoolRecycle(PRTREQPOOLINT pPool, PRTREQINT pReq)
{
    if (   pPool
        && ASMAtomicReadU32(&pPool->cCurFreeRequests) < pPool->cMaxFreeRequests)
    {
        RTCritSectEnter(&pPool->CritSect);
        if (pPool->cCurFreeRequests < pPool->cMaxFreeRequests)
        {
            pReq->pNext = pPool->pFreeRequests;
            pPool->pFreeRequests = pReq;
            ASMAtomicIncU32(&pPool->cCurFreeRequests);

            RTCritSectLeave(&pPool->CritSect);
            return true;
        }

        RTCritSectLeave(&pPool->CritSect);
    }
    return false;
}


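/**
 * Creates a new request pool.
 *
 * @returns IPRT status code.
 * @param   cMaxThreads                 Maximum number of worker threads.
 *                                      UINT32_MAX means RTREQPOOL_MAX_THREADS.
 * @param   cMsMinIdle                  How long (in milliseconds) a worker
 *                                      thread must be idle before it is
 *                                      considered for retirement.
 *                                      RT_INDEFINITE_WAIT disables retirement.
 * @param   cThreadsPushBackThreshold   Worker thread count at which submitter
 *                                      push back kicks in.  Zero means the
 *                                      minimum thread count, UINT32_MAX the
 *                                      maximum.
 * @param   cMsMaxPushBack              Maximum number of milliseconds to push
 *                                      back a submitter.  UINT32_MAX means
 *                                      RTREQPOOL_PUSH_BACK_MAX_MS.
 * @param   pszName                     The pool name (max 10 characters).
 * @param   phPool                      Where to return the pool handle.
 */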
RTDECL(int) RTReqPoolCreate(uint32_t cMaxThreads, RTMSINTERVAL cMsMinIdle,
                            uint32_t cThreadsPushBackThreshold, uint32_t cMsMaxPushBack,
                            const char *pszName, PRTREQPOOL phPool)
{
    /*
     * Validate and massage the config.
     */
    if (cMaxThreads == UINT32_MAX)
        cMaxThreads = RTREQPOOL_MAX_THREADS;
    AssertMsgReturn(cMaxThreads > 0 && cMaxThreads <= RTREQPOOL_MAX_THREADS, ("%u\n", cMaxThreads), VERR_OUT_OF_RANGE);
    uint32_t const cMinThreads = cMaxThreads > 2 ? 2 : cMaxThreads - 1;

    if (cThreadsPushBackThreshold == 0)
        cThreadsPushBackThreshold = cMinThreads;
    else if (cThreadsPushBackThreshold == UINT32_MAX)
        cThreadsPushBackThreshold = cMaxThreads;
    AssertMsgReturn(cThreadsPushBackThreshold <= cMaxThreads, ("%u/%u\n", cThreadsPushBackThreshold, cMaxThreads), VERR_OUT_OF_RANGE);

    if (cMsMaxPushBack == UINT32_MAX)
        cMsMaxPushBack = RTREQPOOL_PUSH_BACK_MAX_MS;
    AssertMsgReturn(cMsMaxPushBack <= RTREQPOOL_PUSH_BACK_MAX_MS, ("%u\n", cMsMaxPushBack), VERR_OUT_OF_RANGE);
    uint32_t const cMsMinPushBack = cMsMaxPushBack >= 200 ? 100 : cMsMaxPushBack / 2;

    AssertPtrReturn(pszName, VERR_INVALID_POINTER);
    size_t cchName = strlen(pszName);
    AssertReturn(cchName > 0, VERR_INVALID_PARAMETER);
    Assert(cchName <= 10);

    AssertPtrReturn(phPool, VERR_INVALID_POINTER);

    /*
     * Create and initialize the pool.
     */
    PRTREQPOOLINT pPool = (PRTREQPOOLINT)RTMemAlloc(sizeof(*pPool));
    if (!pPool)
        return VERR_NO_MEMORY;

    pPool->u32Magic = RTREQPOOL_MAGIC;
    RTStrCopy(pPool->szName, sizeof(pPool->szName), pszName);

    pPool->enmThreadType             = RTTHREADTYPE_DEFAULT;
    pPool->cMaxThreads               = cMaxThreads;
    pPool->cMinThreads               = cMinThreads;
    pPool->cMsMinIdle                = cMsMinIdle == RT_INDEFINITE_WAIT || cMsMinIdle >= UINT32_MAX ? UINT32_MAX : cMsMinIdle;
    pPool->cNsMinIdle                = pPool->cMsMinIdle == UINT32_MAX ? UINT64_MAX : cMsMinIdle * RT_NS_1MS_64;
    pPool->cMsIdleSleep              = pPool->cMsMinIdle == UINT32_MAX ? RT_INDEFINITE_WAIT : RT_MAX(RT_MS_1SEC, pPool->cMsMinIdle);
    pPool->cThreadsPushBackThreshold = cThreadsPushBackThreshold;
    pPool->cMsMaxPushBack            = cMsMaxPushBack;
    pPool->cMsMinPushBack            = cMsMinPushBack;
    pPool->cMaxFreeRequests          = cMaxThreads * 2;
    pPool->hThreadTermEvt            = NIL_RTSEMEVENTMULTI;
    pPool->fDestructing              = false;
    pPool->cMsCurPushBack            = 0;
    pPool->cCurThreads               = 0;
    pPool->cThreadsCreated           = 0;
    pPool->uLastThreadCreateNanoTs   = 0;
    RTListInit(&pPool->WorkerThreads);
    pPool->cReqProcessed             = 0;
    pPool->cNsTotalReqProcessing     = 0;
    pPool->cNsTotalReqQueued         = 0;
    pPool->cRefs                     = 1;
    pPool->cIdleThreads              = 0;
    RTListInit(&pPool->IdleThreads);
    pPool->pPendingRequests          = NULL;
    pPool->ppPendingRequests         = &pPool->pPendingRequests;
    pPool->cCurPendingRequests       = 0;
    pPool->cCurActiveRequests        = 0;
    pPool->cReqSubmitted             = 0;
    pPool->pFreeRequests             = NULL;
    pPool->cCurFreeRequests          = 0;

    int rc = RTSemEventMultiCreate(&pPool->hThreadTermEvt);
    if (RT_SUCCESS(rc))
    {
        rc = RTCritSectInit(&pPool->CritSect);
        if (RT_SUCCESS(rc))
        {
            *phPool = pPool;
            return VINF_SUCCESS;
        }

        RTSemEventMultiDestroy(pPool->hThreadTermEvt);
    }
    pPool->u32Magic = RTREQPOOL_MAGIC_DEAD;
    RTMemFree(pPool);
    return rc;
}


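/**
 * Modifies a request pool configuration variable.
 *
 * Changes that may allow idle worker threads to retire will wake them up so
 * they can re-evaluate their situation.
 *
 * @returns IPRT status code.
 * @param   hPool       The pool handle.
 * @param   enmVar      The variable to modify.
 * @param   uValue      The new value.  UINT64_MAX generally selects the
 *                      default.
 */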
RTDECL(int) RTReqPoolSetCfgVar(RTREQPOOL hPool, RTREQPOOLCFGVAR enmVar, uint64_t uValue)
{
    PRTREQPOOLINT pPool = hPool;
    AssertPtrReturn(pPool, VERR_INVALID_HANDLE);
    AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, VERR_INVALID_HANDLE);
    AssertReturn(enmVar > RTREQPOOLCFGVAR_INVALID && enmVar < RTREQPOOLCFGVAR_END, VERR_INVALID_PARAMETER);

    RTCritSectEnter(&pPool->CritSect);

    bool fWakeUpIdleThreads = false;
    int  rc                 = VINF_SUCCESS;
    switch (enmVar)
    {
        case RTREQPOOLCFGVAR_THREAD_TYPE:
            AssertMsgBreakStmt(uValue > (uint64_t)RTTHREADTYPE_INVALID && uValue < (uint64_t)RTTHREADTYPE_END,
                               ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);

            pPool->enmThreadType = (RTTHREADTYPE)uValue;
            break;

        case RTREQPOOLCFGVAR_MIN_THREADS:
            AssertMsgBreakStmt(uValue <= RTREQPOOL_MAX_THREADS, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
            fWakeUpIdleThreads = pPool->cMinThreads > (uint32_t)uValue;
            pPool->cMinThreads = (uint32_t)uValue;
            if (pPool->cMinThreads > pPool->cMaxThreads)
                pPool->cMaxThreads = pPool->cMinThreads;
            if (   pPool->cThreadsPushBackThreshold < pPool->cMinThreads
                || pPool->cThreadsPushBackThreshold > pPool->cMaxThreads)
                pPool->cThreadsPushBackThreshold = pPool->cMinThreads + (pPool->cMaxThreads - pPool->cMinThreads) / 2;
            rtReqPoolRecalcPushBack(pPool);
            break;

        case RTREQPOOLCFGVAR_MAX_THREADS:
            AssertMsgBreakStmt(uValue <= RTREQPOOL_MAX_THREADS && uValue >= 1, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
            pPool->cMaxThreads = (uint32_t)uValue;
            if (pPool->cMaxThreads < pPool->cMinThreads)
            {
                pPool->cMinThreads = pPool->cMaxThreads;
                fWakeUpIdleThreads = true;
            }
            if (pPool->cMaxThreads < pPool->cThreadsPushBackThreshold)
                pPool->cThreadsPushBackThreshold = pPool->cMinThreads + (pPool->cMaxThreads - pPool->cMinThreads) / 2;
            rtReqPoolRecalcPushBack(pPool);
            break;

        case RTREQPOOLCFGVAR_MS_MIN_IDLE:
            AssertMsgBreakStmt(uValue < UINT32_MAX || uValue == RT_INDEFINITE_WAIT, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
            if (uValue < UINT32_MAX && uValue != RT_INDEFINITE_WAIT)
            {
                fWakeUpIdleThreads = pPool->cMsMinIdle != (uint32_t)uValue;
                pPool->cMsMinIdle  = (uint32_t)uValue;
                pPool->cNsMinIdle  = pPool->cMsMinIdle * RT_NS_1MS_64;
                if (pPool->cMsIdleSleep > pPool->cMsMinIdle)
                    pPool->cMsIdleSleep = RT_MAX(RT_MS_1SEC, pPool->cMsMinIdle);
            }
            else
            {
                pPool->cMsMinIdle   = UINT32_MAX;
                pPool->cNsMinIdle   = UINT64_MAX;
                pPool->cMsIdleSleep = RT_INDEFINITE_WAIT;
            }
            break;

        case RTREQPOOLCFGVAR_MS_IDLE_SLEEP:
            AssertMsgBreakStmt(uValue <= RT_INDEFINITE_WAIT, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
            fWakeUpIdleThreads  = pPool->cMsMinIdle > (RTMSINTERVAL)uValue;
            pPool->cMsIdleSleep = (RTMSINTERVAL)uValue;
            if (pPool->cMsIdleSleep == RT_INDEFINITE_WAIT)
            {
                pPool->cMsMinIdle = UINT32_MAX;
                pPool->cNsMinIdle = UINT64_MAX;
            }
            break;

        case RTREQPOOLCFGVAR_PUSH_BACK_THRESHOLD:
            if (uValue == UINT64_MAX)
                pPool->cThreadsPushBackThreshold = pPool->cMaxThreads;
            else if (uValue == 0)
                pPool->cThreadsPushBackThreshold = pPool->cMinThreads;
            else
            {
                AssertMsgBreakStmt(uValue <= pPool->cMaxThreads, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
                AssertMsgBreakStmt(uValue >= pPool->cMinThreads, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
                pPool->cThreadsPushBackThreshold = (uint32_t)uValue;
            }
            break;

        case RTREQPOOLCFGVAR_PUSH_BACK_MIN_MS:
            if (uValue == UINT32_MAX || uValue == UINT64_MAX)
                uValue = RTREQPOOL_PUSH_BACK_MAX_MS;
            else
                AssertMsgBreakStmt(uValue <= RTREQPOOL_PUSH_BACK_MAX_MS, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
            pPool->cMsMinPushBack = (uint32_t)uValue;
            if (pPool->cMsMaxPushBack < pPool->cMsMinPushBack)
                pPool->cMsMaxPushBack = pPool->cMsMinPushBack;
            rtReqPoolRecalcPushBack(pPool);
            break;

        case RTREQPOOLCFGVAR_PUSH_BACK_MAX_MS:
            if (uValue == UINT32_MAX || uValue == UINT64_MAX)
                uValue = RTREQPOOL_PUSH_BACK_MAX_MS;
            else
                AssertMsgBreakStmt(uValue <= RTREQPOOL_PUSH_BACK_MAX_MS, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
            pPool->cMsMaxPushBack = (uint32_t)uValue;
            if (pPool->cMsMinPushBack > pPool->cMsMaxPushBack)
                pPool->cMsMinPushBack = pPool->cMsMaxPushBack;
            rtReqPoolRecalcPushBack(pPool);
            break;

        case RTREQPOOLCFGVAR_MAX_FREE_REQUESTS:
            if (uValue == UINT64_MAX)
            {
                pPool->cMaxFreeRequests = pPool->cMaxThreads * 2;
                if (pPool->cMaxFreeRequests < 16)
                    pPool->cMaxFreeRequests = 16;
            }
            else
            {
                AssertMsgBreakStmt(uValue <= RTREQPOOL_MAX_FREE_REQUESTS, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
                pPool->cMaxFreeRequests = (uint32_t)uValue;
            }

            while (pPool->cCurFreeRequests > pPool->cMaxFreeRequests)
            {
                PRTREQINT pReq = pPool->pFreeRequests;
                pPool->pFreeRequests = pReq->pNext;
                ASMAtomicDecU32(&pPool->cCurFreeRequests);
                rtReqFreeIt(pReq);
            }
            break;

        default:
            AssertFailed();
            rc = VERR_IPE_NOT_REACHED_DEFAULT_CASE;
    }

    /* Wake up all idle threads if required. */
    if (fWakeUpIdleThreads)
    {
        Assert(rc == VINF_SUCCESS);
        PRTREQPOOLTHREAD pThread;
        RTListForEach(&pPool->WorkerThreads, pThread, RTREQPOOLTHREAD, ListNode)
        {
            RTThreadUserSignal(pThread->hThread);
        }
    }

    RTCritSectLeave(&pPool->CritSect);

    return rc;
}
RT_EXPORT_SYMBOL(RTReqPoolSetCfgVar);


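/**
 * Queries a request pool configuration variable.
 *
 * @returns The value, UINT64_MAX on invalid input.
 * @param   hPool       The pool handle.
 * @param   enmVar      The variable to query.
 */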
RTDECL(uint64_t) RTReqPoolGetCfgVar(RTREQPOOL hPool, RTREQPOOLCFGVAR enmVar)
{
    PRTREQPOOLINT pPool = hPool;
    AssertPtrReturn(pPool, UINT64_MAX);
    AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT64_MAX);
    AssertReturn(enmVar > RTREQPOOLCFGVAR_INVALID && enmVar < RTREQPOOLCFGVAR_END, UINT64_MAX);

    RTCritSectEnter(&pPool->CritSect);

    uint64_t u64;
    switch (enmVar)
    {
        case RTREQPOOLCFGVAR_THREAD_TYPE:
            u64 = pPool->enmThreadType;
            break;

        case RTREQPOOLCFGVAR_MIN_THREADS:
            u64 = pPool->cMinThreads;
            break;

        case RTREQPOOLCFGVAR_MAX_THREADS:
            u64 = pPool->cMaxThreads;
            break;

        case RTREQPOOLCFGVAR_MS_MIN_IDLE:
            u64 = pPool->cMsMinIdle;
            break;

        case RTREQPOOLCFGVAR_MS_IDLE_SLEEP:
            u64 = pPool->cMsIdleSleep;
            break;

        case RTREQPOOLCFGVAR_PUSH_BACK_THRESHOLD:
            u64 = pPool->cThreadsPushBackThreshold;
            break;

        case RTREQPOOLCFGVAR_PUSH_BACK_MIN_MS:
            u64 = pPool->cMsMinPushBack;
            break;

        case RTREQPOOLCFGVAR_PUSH_BACK_MAX_MS:
            u64 = pPool->cMsMaxPushBack;
            break;

        case RTREQPOOLCFGVAR_MAX_FREE_REQUESTS:
            u64 = pPool->cMaxFreeRequests;
            break;

        default:
            AssertFailed();
            u64 = UINT64_MAX;
            break;
    }

    RTCritSectLeave(&pPool->CritSect);

    return u64;
}
RT_EXPORT_SYMBOL(RTReqPoolGetCfgVar);


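/**
 * Queries a request pool statistic.
 *
 * @returns The value, UINT64_MAX on invalid input.
 * @param   hPool       The pool handle.
 * @param   enmStat     The statistic to query.
 */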
RTDECL(uint64_t) RTReqPoolGetStat(RTREQPOOL hPool, RTREQPOOLSTAT enmStat)
{
    PRTREQPOOLINT pPool = hPool;
    AssertPtrReturn(pPool, UINT64_MAX);
    AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT64_MAX);
    AssertReturn(enmStat > RTREQPOOLSTAT_INVALID && enmStat < RTREQPOOLSTAT_END, UINT64_MAX);

    RTCritSectEnter(&pPool->CritSect);

    uint64_t u64;
    switch (enmStat)
    {
        case RTREQPOOLSTAT_THREADS:                     u64 = pPool->cCurThreads; break;
        case RTREQPOOLSTAT_THREADS_CREATED:             u64 = pPool->cThreadsCreated; break;
        case RTREQPOOLSTAT_REQUESTS_PROCESSED:          u64 = pPool->cReqProcessed; break;
        case RTREQPOOLSTAT_REQUESTS_SUBMITTED:          u64 = pPool->cReqSubmitted; break;
        case RTREQPOOLSTAT_REQUESTS_PENDING:            u64 = pPool->cCurPendingRequests; break;
        case RTREQPOOLSTAT_REQUESTS_ACTIVE:             u64 = pPool->cCurActiveRequests; break;
        case RTREQPOOLSTAT_REQUESTS_FREE:               u64 = pPool->cCurFreeRequests; break;
        case RTREQPOOLSTAT_NS_TOTAL_REQ_PROCESSING:     u64 = pPool->cNsTotalReqProcessing; break;
        case RTREQPOOLSTAT_NS_TOTAL_REQ_QUEUED:         u64 = pPool->cNsTotalReqQueued; break;
        case RTREQPOOLSTAT_NS_AVERAGE_REQ_PROCESSING:   u64 = pPool->cNsTotalReqProcessing / RT_MAX(pPool->cReqProcessed, 1); break;
        case RTREQPOOLSTAT_NS_AVERAGE_REQ_QUEUED:       u64 = pPool->cNsTotalReqQueued / RT_MAX(pPool->cReqProcessed, 1); break;
        default:
            AssertFailed();
            u64 = UINT64_MAX;
            break;
    }

    RTCritSectLeave(&pPool->CritSect);

    return u64;
}
RT_EXPORT_SYMBOL(RTReqPoolGetStat);


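/**
 * Retains a reference to a request pool.
 *
 * @returns The new reference count, UINT32_MAX on invalid handle.
 * @param   hPool       The pool handle.
 */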
RTDECL(uint32_t) RTReqPoolRetain(RTREQPOOL hPool)
{
    PRTREQPOOLINT pPool = hPool;
    AssertPtrReturn(pPool, UINT32_MAX);
    AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT32_MAX);

    return ASMAtomicIncU32(&pPool->cRefs);
}
RT_EXPORT_SYMBOL(RTReqPoolRetain);


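/**
 * Releases a reference to a request pool.
 *
 * When the reference count reaches zero, the pool is shut down: the worker
 * threads are signalled and waited for, pending requests are cancelled, and
 * the pool structure is freed.
 *
 * @returns The new reference count, 0 if the handle is NIL, UINT32_MAX on
 *          invalid handle.
 * @param   hPool       The pool handle.  NIL is quietly ignored.
 */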
RTDECL(uint32_t) RTReqPoolRelease(RTREQPOOL hPool)
{
    /*
     * Ignore NULL and validate the request.
     */
    if (!hPool)
        return 0;
    PRTREQPOOLINT pPool = hPool;
    AssertPtrReturn(pPool, UINT32_MAX);
    AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT32_MAX);

    /*
     * Drop a reference, free it when it reaches zero.
     */
    uint32_t cRefs = ASMAtomicDecU32(&pPool->cRefs);
    if (cRefs == 0)
    {
        AssertReturn(ASMAtomicCmpXchgU32(&pPool->u32Magic, RTREQPOOL_MAGIC_DEAD, RTREQPOOL_MAGIC), UINT32_MAX);

        RTCritSectEnter(&pPool->CritSect);
#ifdef RT_STRICT
        RTTHREAD const hSelf = RTThreadSelf();
#endif

        /* Indicate to the worker threads that we're shutting down. */
        ASMAtomicWriteBool(&pPool->fDestructing, true);
        PRTREQPOOLTHREAD pThread;
        RTListForEach(&pPool->WorkerThreads, pThread, RTREQPOOLTHREAD, ListNode)
        {
            Assert(pThread->hThread != hSelf);
            RTThreadUserSignal(pThread->hThread);
        }

        /* Cancel pending requests. */
        Assert(!pPool->pPendingRequests);
        while (pPool->pPendingRequests)
        {
            PRTREQINT pReq = pPool->pPendingRequests;
            pPool->pPendingRequests = pReq->pNext;
            rtReqPoolCancelReq(pReq);
        }
        pPool->ppPendingRequests  = NULL;
        pPool->cCurPendingRequests = 0;

        /* Wait for the workers to shut down. */
        while (!RTListIsEmpty(&pPool->WorkerThreads))
        {
            RTCritSectLeave(&pPool->CritSect);
            RTSemEventMultiWait(pPool->hThreadTermEvt, RT_MS_1MIN);
            RTCritSectEnter(&pPool->CritSect);
            /** @todo should we wait forever here? */
        }

        /* Free recycled requests. */
        for (;;)
        {
            PRTREQINT pReq = pPool->pFreeRequests;
            if (!pReq)
                break;
            pPool->pFreeRequests = pReq->pNext;
            pPool->cCurFreeRequests--;
            rtReqFreeIt(pReq);
        }

        /* Finally, free the event semaphore, the critical section and the
           pool instance. */
        RTCritSectLeave(&pPool->CritSect);
        RTCritSectDelete(&pPool->CritSect);
        RTSemEventMultiDestroy(pPool->hThreadTermEvt);
        RTMemFree(pPool);
    }

    return cRefs;
}
RT_EXPORT_SYMBOL(RTReqPoolRelease);


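/**
 * Allocates a request for use with the given pool, reusing a recycled request
 * when one is available.
 *
 * @returns IPRT status code.
 * @param   hPool       The pool handle.
 * @param   enmType     The request type.
 * @param   phReq       Where to return the request handle.
 */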
RTDECL(int) RTReqPoolAlloc(RTREQPOOL hPool, RTREQTYPE enmType, PRTREQ *phReq)
{
    PRTREQPOOLINT pPool = hPool;
    AssertPtrReturn(pPool, VERR_INVALID_HANDLE);
    AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, VERR_INVALID_HANDLE);

    /*
     * Try to recycle an old request.
     */
    if (ASMAtomicReadU32(&pPool->cCurFreeRequests) > 0)
    {
        RTCritSectEnter(&pPool->CritSect);
        PRTREQINT pReq = pPool->pFreeRequests;
        if (pReq)
        {
            ASMAtomicDecU32(&pPool->cCurFreeRequests);
            pPool->pFreeRequests = pReq->pNext;

            RTCritSectLeave(&pPool->CritSect);

            Assert(pReq->fPoolOrQueue);
            Assert(pReq->uOwner.hPool == pPool);

            int rc = rtReqReInit(pReq, enmType);
            if (RT_SUCCESS(rc))
            {
                *phReq = pReq;
                LogFlow(("RTReqPoolAlloc: returns VINF_SUCCESS *phReq=%p recycled\n", pReq));
                return rc;
            }
        }
        else
            RTCritSectLeave(&pPool->CritSect);
    }

    /*
     * Allocate a new request.
     */
    int rc = rtReqAlloc(enmType, true /*fPoolOrQueue*/, pPool, phReq);
    LogFlow(("RTReqPoolAlloc: returns %Rrc *phReq=%p\n", rc, *phReq));
    return rc;
}
RT_EXPORT_SYMBOL(RTReqPoolAlloc);


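/**
 * Calls a function on a pool worker thread, taking the arguments as an
 * ellipsis.  See RTReqPoolCallExV.
 *
 * @returns IPRT status code.
 * @param   hPool           The pool handle.
 * @param   cMillies        The number of milliseconds to wait for the request
 *                          to complete.
 * @param   phReq           Where to return the request handle.  Mandatory
 *                          unless RTREQFLAGS_NO_WAIT is given.
 * @param   fFlags          A combination of RTREQFLAGS_XXX values.
 * @param   pfnFunction     The function to call on the worker thread.
 * @param   cArgs           The number of arguments.
 * @param   ...             The arguments.
 */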
RTDECL(int) RTReqPoolCallEx(RTREQPOOL hPool, RTMSINTERVAL cMillies, PRTREQ *phReq, uint32_t fFlags, PFNRT pfnFunction, unsigned cArgs, ...)
{
    va_list va;
    va_start(va, cArgs);
    int rc = RTReqPoolCallExV(hPool, cMillies, phReq, fFlags, pfnFunction, cArgs, va);
    va_end(va);
    return rc;
}
RT_EXPORT_SYMBOL(RTReqPoolCallEx);


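/**
 * Calls a function on a pool worker thread, taking the arguments as a
 * va_list.
 *
 * @returns IPRT status code.
 * @param   hPool           The pool handle.
 * @param   cMillies        The number of milliseconds to wait for the request
 *                          to complete.
 * @param   phReq           Where to return the request handle.  Mandatory
 *                          unless RTREQFLAGS_NO_WAIT is given.
 * @param   fFlags          A combination of RTREQFLAGS_XXX values.
 * @param   pfnFunction     The function to call on the worker thread.
 * @param   cArgs           The number of arguments.
 * @param   va              The arguments (each pointer sized).
 */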
RTDECL(int) RTReqPoolCallExV(RTREQPOOL hPool, RTMSINTERVAL cMillies, PRTREQ *phReq, uint32_t fFlags, PFNRT pfnFunction, unsigned cArgs, va_list va)
{
    /*
     * Check input.
     */
    AssertPtrReturn(pfnFunction, VERR_INVALID_POINTER);
    AssertMsgReturn(!((uint32_t)fFlags & ~(uint32_t)(RTREQFLAGS_NO_WAIT | RTREQFLAGS_RETURN_MASK)), ("%#x\n", (uint32_t)fFlags), VERR_INVALID_PARAMETER);
    if (!(fFlags & RTREQFLAGS_NO_WAIT))
    {
        AssertPtrReturn(phReq, VERR_INVALID_POINTER);
        *phReq = NIL_RTREQ;
    }

    PRTREQINT pReq = NULL;
    AssertMsgReturn(cArgs * sizeof(uintptr_t) <= sizeof(pReq->u.Internal.aArgs), ("cArgs=%u\n", cArgs), VERR_TOO_MUCH_DATA);

    /*
     * Allocate and initialize the request.
     */
    int rc = RTReqPoolAlloc(hPool, RTREQTYPE_INTERNAL, &pReq);
    if (RT_FAILURE(rc))
        return rc;
    pReq->fFlags           = fFlags;
    pReq->u.Internal.pfn   = pfnFunction;
    pReq->u.Internal.cArgs = cArgs;
    for (unsigned iArg = 0; iArg < cArgs; iArg++)
        pReq->u.Internal.aArgs[iArg] = va_arg(va, uintptr_t);

    /*
     * Submit the request.
     */
    rc = RTReqSubmit(pReq, cMillies);
    if (   rc != VINF_SUCCESS
        && rc != VERR_TIMEOUT)
    {
        Assert(rc != VERR_INTERRUPTED);
        RTReqRelease(pReq);
        pReq = NULL;
    }

    if (!(fFlags & RTREQFLAGS_NO_WAIT))
    {
        *phReq = pReq;
        LogFlow(("RTReqPoolCallExV: returns %Rrc *phReq=%p\n", rc, pReq));
    }
    else
        LogFlow(("RTReqPoolCallExV: returns %Rrc\n", rc));
    return rc;
}
RT_EXPORT_SYMBOL(RTReqPoolCallExV);


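/**
 * Calls a function returning an IPRT status code on a pool worker thread,
 * waiting for it to complete.
 *
 * @returns IPRT status code of the request submission or, on success, of the
 *          function call.
 * @param   hPool           The pool handle.
 * @param   pfnFunction     The function to call.
 * @param   cArgs           The number of arguments.
 * @param   ...             The arguments.
 */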
RTDECL(int) RTReqPoolCallWait(RTREQPOOL hPool, PFNRT pfnFunction, unsigned cArgs, ...)
{
    PRTREQINT pReq;
    va_list   va;
    va_start(va, cArgs);
    int rc = RTReqPoolCallExV(hPool, RT_INDEFINITE_WAIT, &pReq, RTREQFLAGS_IPRT_STATUS,
                              pfnFunction, cArgs, va);
    va_end(va);
    if (RT_SUCCESS(rc))
        rc = pReq->iStatusX;
    RTReqRelease(pReq);
    return rc;
}
RT_EXPORT_SYMBOL(RTReqPoolCallWait);


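/**
 * Calls a function returning an IPRT status code on a pool worker thread
 * without waiting for it to complete.
 *
 * @returns IPRT status code of the request submission.
 * @param   hPool           The pool handle.
 * @param   pfnFunction     The function to call.
 * @param   cArgs           The number of arguments.
 * @param   ...             The arguments.
 */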
RTDECL(int) RTReqPoolCallNoWait(RTREQPOOL hPool, PFNRT pfnFunction, unsigned cArgs, ...)
{
    va_list va;
    va_start(va, cArgs);
    int rc = RTReqPoolCallExV(hPool, 0, NULL, RTREQFLAGS_IPRT_STATUS | RTREQFLAGS_NO_WAIT,
                              pfnFunction, cArgs, va);
    va_end(va);
    return rc;
}
RT_EXPORT_SYMBOL(RTReqPoolCallNoWait);


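/**
 * Calls a void function on a pool worker thread, waiting for it to complete.
 *
 * @returns IPRT status code of the request submission.
 * @param   hPool           The pool handle.
 * @param   pfnFunction     The function to call.
 * @param   cArgs           The number of arguments.
 * @param   ...             The arguments.
 */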
RTDECL(int) RTReqPoolCallVoidWait(RTREQPOOL hPool, PFNRT pfnFunction, unsigned cArgs, ...)
{
    PRTREQINT pReq;
    va_list   va;
    va_start(va, cArgs);
    int rc = RTReqPoolCallExV(hPool, RT_INDEFINITE_WAIT, &pReq, RTREQFLAGS_VOID,
                              pfnFunction, cArgs, va);
    va_end(va);
    if (RT_SUCCESS(rc))
        rc = pReq->iStatusX;
    RTReqRelease(pReq);
    return rc;
}
RT_EXPORT_SYMBOL(RTReqPoolCallVoidWait);


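/**
 * Calls a void function on a pool worker thread without waiting for it to
 * complete.
 *
 * @returns IPRT status code of the request submission.
 * @param   hPool           The pool handle.
 * @param   pfnFunction     The function to call.
 * @param   cArgs           The number of arguments.
 * @param   ...             The arguments.
 */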
RTDECL(int) RTReqPoolCallVoidNoWait(RTREQPOOL hPool, PFNRT pfnFunction, unsigned cArgs, ...)
{
    va_list va;
    va_start(va, cArgs);
    int rc = RTReqPoolCallExV(hPool, 0, NULL, RTREQFLAGS_VOID | RTREQFLAGS_NO_WAIT,
                              pfnFunction, cArgs, va);
    va_end(va);
    return rc;
}
RT_EXPORT_SYMBOL(RTReqPoolCallVoidNoWait);
1211