VirtualBox

source: vbox/trunk/src/VBox/Runtime/common/misc/reqpool.cpp@ 39550

Last change on this file since 39550 was 39550, checked in by vboxsync, 13 years ago

Request thread pool hacking. Some RTReq refactoring as always...

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 18.8 KB
/* $Id: reqpool.cpp 39550 2011-12-07 20:28:23Z vboxsync $ */
/** @file
 * IPRT - Request Pool.
 */

/*
 * Copyright (C) 2006-2011 Oracle Corporation
 *
 * This file is part of VirtualBox Open Source Edition (OSE), as
 * available from http://www.virtualbox.org. This file is free software;
 * you can redistribute it and/or modify it under the terms of the GNU
 * General Public License (GPL) as published by the Free Software
 * Foundation, in version 2 as it comes in the "COPYING" file of the
 * VirtualBox OSE distribution. VirtualBox OSE is distributed in the
 * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
 *
 * The contents of this file may alternatively be used under the terms
 * of the Common Development and Distribution License Version 1.0
 * (CDDL) only, as it comes in the "COPYING.CDDL" file of the
 * VirtualBox OSE distribution, in which case the provisions of the
 * CDDL are applicable instead of those of the GPL.
 *
 * You may elect to license modified versions of this file under the
 * terms and conditions of either the GPL or the CDDL or both.
 */


/*******************************************************************************
*   Header Files                                                               *
*******************************************************************************/
#include <iprt/req.h>
#include "internal/iprt.h"

#include <iprt/assert.h>
#include <iprt/asm.h>
#include <iprt/critsect.h>
#include <iprt/list.h>
#include <iprt/log.h>
#include <iprt/mem.h>
#include <iprt/string.h>
#include <iprt/time.h>
#include <iprt/semaphore.h>
#include <iprt/thread.h>

#include "internal/req.h"
#include "internal/magics.h"


/*******************************************************************************
*   Structures and Typedefs                                                    *
*******************************************************************************/
typedef struct RTREQPOOLTHREAD
{
    /** Node in the RTREQPOOLINT::IdleThreads list. */
    RTLISTNODE          IdleNode;
    /** Node in the RTREQPOOLINT::WorkerThreads list. */
    RTLISTNODE          ListNode;

    /** The submit timestamp of the pending request. */
    uint64_t            uPendingNanoTs;
    /** The timestamp at which processing of the current request started. */
    uint64_t            uProcessingNanoTs;
    /** When this thread went idle the last time. */
    uint64_t            uIdleNanoTs;
    /** The number of requests processed by this thread. */
    uint64_t            cReqProcessed;
    /** Total time the requests processed by this thread took to process. */
    uint64_t            cNsTotalReqProcessing;
    /** Total time the requests processed by this thread had to wait in
     * the queue before being scheduled. */
    uint64_t            cNsTotalReqQueued;
    /** The CPU this thread was scheduled on the last time we checked. */
    RTCPUID             idLastCpu;

    /** The submitter will put an incoming request here when scheduling an idle
     * thread. */
    PRTREQINT volatile  pTodoReq;
    /** The request the thread is currently processing. */
    PRTREQINT volatile  pPendingReq;

    /** The thread handle. */
    RTTHREAD            hThread;
    /** Nanosecond timestamp representing the birth time of the thread. */
    uint64_t            uBirthNanoTs;
    /** Pointer to the request thread pool instance the thread is associated
     * with. */
    struct RTREQPOOLINT *pPool;
} RTREQPOOLTHREAD;
/** Pointer to a worker thread. */
typedef RTREQPOOLTHREAD *PRTREQPOOLTHREAD;

/**
 * Request thread pool instance data.
 */
typedef struct RTREQPOOLINT
{
    /** Magic value (RTREQPOOL_MAGIC). */
    uint32_t            u32Magic;

    /** @name Config
     * @{ */
    /** The worker thread type. */
    RTTHREADTYPE        enmThreadType;
    /** The maximum number of worker threads. */
    uint32_t            cMaxThreads;
    /** The number of threads which should be spawned before throttling kicks
     * in. */
    uint32_t            cThreadsThreshold;
    /** The minimum number of worker threads. */
    uint32_t            cMinThreads;
    /** The number of milliseconds a thread needs to be idle before it is
     * considered for retirement. */
    uint32_t            cMsMinIdle;
    /** The maximum number of milliseconds to push back a submitter before
     * creating a new worker thread once the threshold has been reached. */
    uint32_t            cMsMaxPushBack;
    /** The minimum number of milliseconds to push back a submitter before
     * creating a new worker thread once the threshold has been reached. */
    uint32_t            cMsMinPushBack;
    /** The maximum number of free requests in the recycle LIFO. */
    uint32_t            cMaxFreeRequests;
    /** @} */

    /** Signaled by terminating worker threads. */
    RTSEMEVENT          hThreadTermEvt;

    /** Destruction indicator. The worker threads check it in their loop. */
    bool volatile       fDestructing;

    /** The current submitter push back in milliseconds.
     * This is recalculated when worker threads come and go. */
    uint32_t            cMsCurPushBack;
    /** The current number of worker threads. */
    uint32_t            cCurThreads;
    /** Statistics: The total number of threads created. */
    uint32_t            cThreadsCreated;
    /** Statistics: The timestamp when the last thread was created. */
    uint64_t            uLastThreadCreateNanoTs;
    /** Linked list of worker threads. */
    RTLISTANCHOR        WorkerThreads;

    /** Reference counter. */
    uint32_t volatile   cRefs;
    /** The number of idle threads, including threads in the process of becoming
     * idle.  This is increased before the to-be-idle thread tries to enter
     * the critical section and add itself to the list. */
    uint32_t volatile   cIdleThreads;
    /** Linked list of idle threads. */
    RTLISTANCHOR        IdleThreads;

    /** Head of the request FIFO. */
    PRTREQINT           pPendingRequests;
    /** Where to insert the next request. */
    PRTREQINT          *ppPendingRequests;

    /** Head of the request recycling LIFO. */
    PRTREQINT           pFreeRequests;
    /** The number of requests in the recycling LIFO.  This is read without
     * entering the critical section, thus volatile. */
    uint32_t volatile   cCurFreeRequests;

    /** Critical section serializing access to members of this structure. */
    RTCRITSECT          CritSect;

} RTREQPOOLINT;

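/* Note: pPendingRequests and ppPendingRequests implement a singly linked FIFO
 * with a tail pointer: ppPendingRequests always points at the pNext member of
 * the last queued request, or at pPendingRequests itself while the queue is
 * empty, so insertion is O(1).  The insertion idiom, exactly as used by
 * rtReqPoolSubmit below:
 *
 * @code
 *      pReq->pNext = NULL;
 *      *pPool->ppPendingRequests = pReq;
 *      pPool->ppPendingRequests = (PRTREQINT *)&pReq->pNext;
 * @endcode
 */
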
static void rtReqPoolRecalcPushBack(PRTREQPOOLINT pPool)
{
    uint32_t const cMsRange = pPool->cMsMaxPushBack - pPool->cMsMinPushBack;
    uint32_t const cSteps   = pPool->cMaxThreads - pPool->cThreadsThreshold;
    uint32_t const iStep    = pPool->cCurThreads - pPool->cThreadsThreshold;

    uint32_t cMsCurPushBack;
    if ((cMsRange >> 2) >= cSteps)
        cMsCurPushBack = cMsRange / cSteps * iStep;
    else
        cMsCurPushBack = (uint32_t)( (uint64_t)cMsRange * RT_NS_1MS / cSteps * iStep / RT_NS_1MS );
    cMsCurPushBack += pPool->cMsMinPushBack;

    pPool->cMsCurPushBack = cMsCurPushBack;
}

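/* Note: a worked example of the interpolation above, using hypothetical
 * configuration values (not defaults taken from this file):
 * cMsMinPushBack = 5, cMsMaxPushBack = 250, cThreadsThreshold = 8 and
 * cMaxThreads = 16 give
 *
 * @code
 *      cMsRange = 250 - 5 = 245
 *      cSteps   = 16 - 8  = 8
 * @endcode
 *
 * With cCurThreads = 12, iStep = 4; since (245 >> 2) = 61 >= 8 the plain
 * integer branch is precise enough and cMsCurPushBack = 245 / 8 * 4 + 5
 * = 125 ms.  The push back thus grows linearly from cMsMinPushBack toward
 * cMsMaxPushBack as the thread count climbs from the threshold to the
 * maximum; the second branch merely rescales by RT_NS_1MS to preserve
 * precision when the range is small relative to the step count.
 */
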
static void rtReqPoolThreadProcessRequest(PRTREQPOOLTHREAD pThread, PRTREQINT pReq)
{
    /*
     * Update thread state.
     */
    pThread->uProcessingNanoTs = RTTimeNanoTS();
    pThread->uPendingNanoTs    = pReq->uSubmitNanoTs;
    pThread->pPendingReq       = pReq;
    Assert(pReq->u32Magic == RTREQ_MAGIC);

    /*
     * Do the actual processing.
     */
    /** @todo */

    /*
     * Update thread statistics and state.
     */
    uint64_t const uNsTsEnd = RTTimeNanoTS();
    pThread->cNsTotalReqProcessing += uNsTsEnd - pThread->uProcessingNanoTs;
    pThread->cNsTotalReqQueued     += uNsTsEnd - pThread->uPendingNanoTs;
    pThread->cReqProcessed++;
}

static DECLCALLBACK(int) rtReqPoolThreadProc(RTTHREAD hThreadSelf, void *pvArg)
{
    PRTREQPOOLTHREAD pThread = (PRTREQPOOLTHREAD)pvArg;
    PRTREQPOOLINT    pPool   = pThread->pPool;

/** @todo rework this... */

    /*
     * The work loop.
     */
    uint64_t cPrevReqProcessed = 0;
    while (!pPool->fDestructing)
    {
        /*
         * Pending work?
         */
        PRTREQINT pReq = ASMAtomicXchgPtrT(&pThread->pTodoReq, NULL, PRTREQINT);
        if (pReq)
            rtReqPoolThreadProcessRequest(pThread, pReq);
        else
        {
            ASMAtomicIncU32(&pPool->cIdleThreads);
            RTCritSectEnter(&pPool->CritSect);

            /* Recheck the todo request pointer after entering the critsect. */
            pReq = ASMAtomicXchgPtrT(&pThread->pTodoReq, NULL, PRTREQINT);
            if (!pReq)
            {
                /* Any pending requests in the queue? */
                pReq = pPool->pPendingRequests;
                if (pReq)
                {
                    pPool->pPendingRequests = pReq->pNext;
                    if (pReq->pNext == NULL)
                        pPool->ppPendingRequests = &pPool->pPendingRequests;
                }
            }

            if (pReq)
            {
                /*
                 * Un-idle ourselves and process the request.
                 */
                if (!RTListIsEmpty(&pThread->IdleNode))
                {
                    RTListNodeRemove(&pThread->IdleNode);
                    RTListInit(&pThread->IdleNode);
                }
                ASMAtomicDecU32(&pPool->cIdleThreads);
                RTCritSectLeave(&pPool->CritSect);

                rtReqPoolThreadProcessRequest(pThread, pReq);
            }
            else
            {
                /*
                 * Nothing to do, go idle.
                 */
                if (cPrevReqProcessed != pThread->cReqProcessed)
                {
                    cPrevReqProcessed    = pThread->cReqProcessed;
                    pThread->uIdleNanoTs = RTTimeNanoTS();
                }

                if (RTListIsEmpty(&pThread->IdleNode))
                    RTListPrepend(&pPool->IdleThreads, &pThread->IdleNode);
                RTThreadUserReset(hThreadSelf);

                RTCritSectLeave(&pPool->CritSect);

                RTThreadUserWait(hThreadSelf, 0);
            }
        }
    }

    /*
     * Clean up on the way out.
     */
    RTCritSectEnter(&pPool->CritSect);

    /** @todo .... */

    rtReqPoolRecalcPushBack(pPool);

    RTCritSectLeave(&pPool->CritSect);

    return VINF_SUCCESS;
}

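/* Note: the submitter/worker wake-up above uses the IPRT per-thread user
 * event semaphore (RTThreadUserReset/RTThreadUserWait on the worker side,
 * RTThreadUserSignal on the submitter side).  A stripped-down, hypothetical
 * sketch of the same pattern, shown with an indefinite wait rather than the
 * 0 ms timeout used above:
 *
 * @code
 *      // worker: arm the event while owning the lock, then block outside it
 *      RTThreadUserReset(hThreadSelf);
 *      RTCritSectLeave(&pPool->CritSect);
 *      RTThreadUserWait(hThreadSelf, RT_INDEFINITE_WAIT);
 *
 *      // submitter: publish the request, then wake the chosen thread
 *      ASMAtomicWritePtr(&pThread->pTodoReq, pReq);
 *      RTThreadUserSignal(pThread->hThread);
 * @endcode
 */
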
/**
 * Create a new worker thread.
 *
 * @param   pPool       The pool needing a new worker thread.
 * @remarks Caller owns the critical section.
 */
static void rtReqPoolCreateNewWorker(RTREQPOOL pPool)
{
    PRTREQPOOLTHREAD pThread = (PRTREQPOOLTHREAD)RTMemAllocZ(sizeof(RTREQPOOLTHREAD));
    if (!pThread)
        return;

    pThread->uBirthNanoTs = RTTimeNanoTS();
    pThread->pPool        = pPool;
    pThread->idLastCpu    = NIL_RTCPUID;
    pThread->hThread      = NIL_RTTHREAD;
    RTListInit(&pThread->IdleNode);
    RTListAppend(&pPool->WorkerThreads, &pThread->ListNode);
    pPool->cCurThreads++;
    pPool->cThreadsCreated++;

    static uint32_t s_idThread = 0;
    int rc = RTThreadCreateF(&pThread->hThread, rtReqPoolThreadProc, pThread, 0 /*default stack size*/,
                             pPool->enmThreadType, 0 /*fFlags*/, "REQPT%02u", ++s_idThread);
    if (RT_SUCCESS(rc))
        pPool->uLastThreadCreateNanoTs = pThread->uBirthNanoTs;
    else
    {
        pPool->cCurThreads--;
        RTListNodeRemove(&pThread->ListNode);
        RTMemFree(pThread);
    }
}

/**
 * Pushes back the submitter, giving the worker threads a chance to process the
 * incoming request.
 *
 * @returns Success if a worker picked up the request, failure if not.  The
 *          critical section has been left on success, while we'll be inside it
 *          on failure.
 * @param   pPool       The pool.
 * @param   pReq        The incoming request.
 */
static int rtReqPoolPushBack(PRTREQPOOLINT pPool, PRTREQINT pReq)
{
    /*
     * Lazily create the push back semaphore that we'll be blocking on.
     */
    int rc;
    RTSEMEVENTMULTI hEvt = pReq->hPushBackEvt;
    if (hEvt == NIL_RTSEMEVENTMULTI)
    {
        rc = RTSemEventMultiCreate(&hEvt);
        if (RT_FAILURE(rc))
            return rc;
        pReq->hPushBackEvt = hEvt;
    }

    /*
     * Prepare the request and semaphore.
     */
    uint32_t const cMsTimeout = pPool->cMsCurPushBack;
    pReq->fSignalPushBack = true;
    RTReqRetain(pReq);
    RTSemEventMultiReset(hEvt);

    RTCritSectLeave(&pPool->CritSect);

    /*
     * Block.
     */
    rc = RTSemEventMultiWait(hEvt, cMsTimeout);
    if (RT_FAILURE(rc))
    {
        AssertMsg(rc == VERR_TIMEOUT, ("%Rrc\n", rc));
        RTCritSectEnter(&pPool->CritSect);
    }
    RTReqRelease(pReq);
    return rc;
}

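/* Note: the worker-side counterpart, signalling pReq->hPushBackEvt once the
 * request has been picked up, is not visible in this revision; the request
 * processing body is still a @todo above.  A hypothetical sketch of what
 * that release could look like:
 *
 * @code
 *      if (pReq->fSignalPushBack)
 *          RTSemEventMultiSignal(pReq->hPushBackEvt);
 * @endcode
 */
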
DECLHIDDEN(void) rtReqPoolSubmit(PRTREQPOOLINT pPool, PRTREQINT pReq)
{
    RTCritSectEnter(&pPool->CritSect);

    /*
     * Try to schedule the request on a thread that's currently idle.
     */
    PRTREQPOOLTHREAD pThread = RTListGetFirst(&pPool->IdleThreads, RTREQPOOLTHREAD, IdleNode);
    if (pThread)
    {
        /** @todo CPU affinity... */
        ASMAtomicWritePtr(&pThread->pTodoReq, pReq);

        RTListNodeRemove(&pThread->IdleNode);
        RTListInit(&pThread->IdleNode);
        ASMAtomicDecU32(&pPool->cIdleThreads);

        RTThreadUserSignal(pThread->hThread);

        RTCritSectLeave(&pPool->CritSect);
        return;
    }
    Assert(RTListIsEmpty(&pPool->IdleThreads));

    /*
     * Put the request in the pending queue.
     */
    pReq->pNext = NULL;
    *pPool->ppPendingRequests = pReq;
    pPool->ppPendingRequests = (PRTREQINT *)&pReq->pNext;

    /*
     * If a worker thread is already on its way to idle (it will recheck the
     * queue), or we've reached the maximum number of worker threads, we're done.
     */
    if (   pPool->cIdleThreads > 0
        || pPool->cCurThreads >= pPool->cMaxThreads)
    {
        RTCritSectLeave(&pPool->CritSect);
        return;
    }

    /*
     * Push back before creating a new worker thread.
     */
    if (   pPool->cCurThreads > pPool->cThreadsThreshold
        && (RTTimeNanoTS() - pReq->uSubmitNanoTs) / RT_NS_1MS >= pPool->cMsCurPushBack)
    {
        int rc = rtReqPoolPushBack(pPool, pReq);
        if (RT_SUCCESS(rc))
            return;
    }

    /*
     * Create a new thread for processing the request.
     * For simplicity, we don't bother leaving the critical section while doing so.
     */
    rtReqPoolCreateNewWorker(pPool);

    RTCritSectLeave(&pPool->CritSect);
    return;
}

/**
 * Recycles a request.
 *
 * @returns true if recycled, false if not.
 * @param   pPool       The request thread pool.
 * @param   pReq        The request.
 */
DECLHIDDEN(bool) rtReqPoolRecycle(PRTREQPOOLINT pPool, PRTREQINT pReq)
{
    if (   pPool
        && ASMAtomicReadU32(&pPool->cCurFreeRequests) < pPool->cMaxFreeRequests)
    {
        RTCritSectEnter(&pPool->CritSect);
        if (pPool->cCurFreeRequests < pPool->cMaxFreeRequests)
        {
            pReq->pNext = pPool->pFreeRequests;
            pPool->pFreeRequests = pReq;
            ASMAtomicIncU32(&pPool->cCurFreeRequests);

            RTCritSectLeave(&pPool->CritSect);
            return true;
        }

        RTCritSectLeave(&pPool->CritSect);
    }
    return false;
}

RTDECL(uint32_t) RTReqPoolRetain(RTREQPOOL hPool)
{
    PRTREQPOOLINT pPool = hPool;
    AssertPtrReturn(pPool, UINT32_MAX);
    AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT32_MAX);

    return ASMAtomicIncU32(&pPool->cRefs);
}
RT_EXPORT_SYMBOL(RTReqPoolRetain);

RTDECL(uint32_t) RTReqPoolRelease(RTREQPOOL hPool)
{
    /*
     * Ignore NULL and validate the handle.
     */
    if (!hPool)
        return 0;
    PRTREQPOOLINT pPool = hPool;
    AssertPtrReturn(pPool, UINT32_MAX);
    AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT32_MAX);

    /*
     * Drop a reference, free it when it reaches zero.
     */
    uint32_t cRefs = ASMAtomicDecU32(&pPool->cRefs);
    if (cRefs == 0)
    {
        AssertReturn(ASMAtomicCmpXchgU32(&pPool->u32Magic, RTREQPOOL_MAGIC_DEAD, RTREQPOOL_MAGIC), UINT32_MAX);

        RTCritSectEnter(&pPool->CritSect);

        /* Indicate to the worker threads that we're shutting down. */
        ASMAtomicWriteBool(&pPool->fDestructing, true);
        PRTREQPOOLTHREAD pThread;
        RTListForEach(&pPool->WorkerThreads, pThread, RTREQPOOLTHREAD, ListNode)
        {
            RTThreadUserSignal(pThread->hThread);
        }

        /* Cancel pending requests. */
        Assert(!pPool->pPendingRequests);
        while (pPool->pPendingRequests)
        {
            PRTREQINT pReq = pPool->pPendingRequests;
            pPool->pPendingRequests = pReq->pNext;

            pReq->enmState = RTREQSTATE_COMPLETED;
            ASMAtomicWriteS32(&pReq->iStatusX, VERR_CANCELLED);
            if (pReq->hPushBackEvt != NIL_RTSEMEVENTMULTI)
                RTSemEventMultiSignal(pReq->hPushBackEvt);
            RTSemEventSignal(pReq->EventSem);

            pReq->uOwner.hPool = NULL;
            RTReqRelease(pReq);
        }
        pPool->ppPendingRequests = NULL;

        /* Wait for the workers to shut down. */
        while (!RTListIsEmpty(&pPool->WorkerThreads))
        {
            RTCritSectLeave(&pPool->CritSect);
            RTSemEventWait(pPool->hThreadTermEvt, RT_MS_1MIN);
            RTCritSectEnter(&pPool->CritSect);
            /** @todo should we wait forever here? */
        }

        /* Free recycled requests. */
        for (;;)
        {
            PRTREQINT pReq = pPool->pFreeRequests;
            if (!pReq)
                break;
            pPool->pFreeRequests = pReq->pNext;
            pPool->cCurFreeRequests--;
            rtReqFreeIt(pReq);
        }

        /* Finally, free the handle. */
        RTMemFree(pPool);
    }

    return cRefs;
}
RT_EXPORT_SYMBOL(RTReqPoolRelease);

RTDECL(int) RTReqPoolAlloc(RTREQPOOL hPool, RTREQTYPE enmType, PRTREQ *phReq)
{
    PRTREQPOOLINT pPool = hPool;
    AssertPtrReturn(pPool, VERR_INVALID_HANDLE);
    AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, VERR_INVALID_HANDLE);

    /*
     * Try to recycle an old request.
     */
    if (ASMAtomicReadU32(&pPool->cCurFreeRequests) > 0)
    {
        RTCritSectEnter(&pPool->CritSect);
        PRTREQINT pReq = pPool->pFreeRequests;
        if (pReq)
        {
            ASMAtomicDecU32(&pPool->cCurFreeRequests);
            pPool->pFreeRequests = pReq->pNext;

            RTCritSectLeave(&pPool->CritSect);

            Assert(pReq->fPoolOrQueue);
            Assert(pReq->uOwner.hPool == pPool);

            int rc = rtReqReInit(pReq, enmType);
            if (RT_SUCCESS(rc))
            {
                *phReq = pReq;
                LogFlow(("RTReqPoolAlloc: returns VINF_SUCCESS *phReq=%p recycled\n", pReq));
                return rc;
            }
        }
        else
            RTCritSectLeave(&pPool->CritSect);
    }

    /*
     * Allocate a new request.
     */
    int rc = rtReqAlloc(enmType, true /*fPoolOrQueue*/, pPool, phReq);
    LogFlow(("RTReqPoolAlloc: returns %Rrc *phReq=%p\n", rc, *phReq));
    return rc;
}
RT_EXPORT_SYMBOL(RTReqPoolAlloc);
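
/* Note: a minimal, hypothetical usage sketch for the allocation path above.
 * It assumes a pool handle obtained elsewhere (this revision of the file
 * contains no pool constructor) and uses RTREQTYPE_INTERNAL as declared in
 * iprt/req.h:
 *
 * @code
 *      PRTREQ hReq;
 *      int rc = RTReqPoolAlloc(hPool, RTREQTYPE_INTERNAL, &hReq);
 *      if (RT_SUCCESS(rc))
 *      {
 *          // ... set up the request; submission goes through the internal
 *          // rtReqPoolSubmit above ...
 *          RTReqRelease(hReq);
 *      }
 *      RTReqPoolRelease(hPool);
 * @endcode
 */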