VirtualBox

source: vbox/trunk/src/VBox/Runtime/common/misc/reqqueue.cpp@ 60963

Last change on this file since 60963 was 60124, checked in by vboxsync, 9 years ago

RTReqQueueProcess: Addressed todo regarding lost requests and document behavior more accuratly.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 13.2 KB
Line 
1/* $Id: reqqueue.cpp 60124 2016-03-21 14:41:00Z vboxsync $ */
2/** @file
3 * IPRT - Request Queue.
4 */
5
6/*
7 * Copyright (C) 2006-2015 Oracle Corporation
8 *
9 * This file is part of VirtualBox Open Source Edition (OSE), as
10 * available from http://www.virtualbox.org. This file is free software;
11 * you can redistribute it and/or modify it under the terms of the GNU
12 * General Public License (GPL) as published by the Free Software
13 * Foundation, in version 2 as it comes in the "COPYING" file of the
14 * VirtualBox OSE distribution. VirtualBox OSE is distributed in the
15 * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
16 *
17 * The contents of this file may alternatively be used under the terms
18 * of the Common Development and Distribution License Version 1.0
19 * (CDDL) only, as it comes in the "COPYING.CDDL" file of the
20 * VirtualBox OSE distribution, in which case the provisions of the
21 * CDDL are applicable instead of those of the GPL.
22 *
23 * You may elect to license modified versions of this file under the
24 * terms and conditions of either the GPL or the CDDL or both.
25 */
26
27
28/*********************************************************************************************************************************
29* Header Files *
30*********************************************************************************************************************************/
31#include <iprt/req.h>
32#include "internal/iprt.h"
33
34#include <iprt/assert.h>
35#include <iprt/asm.h>
36#include <iprt/string.h>
37#include <iprt/time.h>
38#include <iprt/semaphore.h>
39#include <iprt/thread.h>
40#include <iprt/log.h>
41#include <iprt/mem.h>
42
43#include "internal/req.h"
44#include "internal/magics.h"
45
46
47
48RTDECL(int) RTReqQueueCreate(RTREQQUEUE *phQueue)
49{
50 PRTREQQUEUEINT pQueue = (PRTREQQUEUEINT)RTMemAllocZ(sizeof(RTREQQUEUEINT));
51 if (!pQueue)
52 return VERR_NO_MEMORY;
53 int rc = RTSemEventCreate(&pQueue->EventSem);
54 if (RT_SUCCESS(rc))
55 {
56 pQueue->u32Magic = RTREQQUEUE_MAGIC;
57
58 *phQueue = pQueue;
59 return VINF_SUCCESS;
60 }
61
62 RTMemFree(pQueue);
63 return rc;
64}
65RT_EXPORT_SYMBOL(RTReqQueueCreate);
66
67
68RTDECL(int) RTReqQueueDestroy(RTREQQUEUE hQueue)
69{
70 /*
71 * Check input.
72 */
73 if (hQueue == NIL_RTREQQUEUE)
74 return VINF_SUCCESS;
75 PRTREQQUEUEINT pQueue = hQueue;
76 AssertPtrReturn(pQueue, VERR_INVALID_HANDLE);
77 AssertReturn(ASMAtomicCmpXchgU32(&pQueue->u32Magic, RTREQQUEUE_MAGIC_DEAD, RTREQQUEUE_MAGIC), VERR_INVALID_HANDLE);
78
79 RTSemEventDestroy(pQueue->EventSem);
80 pQueue->EventSem = NIL_RTSEMEVENT;
81
82 for (unsigned i = 0; i < RT_ELEMENTS(pQueue->apReqFree); i++)
83 {
84 PRTREQ pReq = (PRTREQ)ASMAtomicXchgPtr((void **)&pQueue->apReqFree[i], NULL);
85 while (pReq)
86 {
87 PRTREQ pNext = pReq->pNext;
88 rtReqFreeIt(pReq);
89 pReq = pNext;
90 }
91 }
92
93 RTMemFree(pQueue);
94 return VINF_SUCCESS;
95}
96RT_EXPORT_SYMBOL(RTReqQueueDestroy);
97
98
99RTDECL(int) RTReqQueueProcess(RTREQQUEUE hQueue, RTMSINTERVAL cMillies)
100{
101 LogFlow(("RTReqQueueProcess %x\n", hQueue));
102
103 /*
104 * Check input.
105 */
106 PRTREQQUEUEINT pQueue = hQueue;
107 AssertPtrReturn(pQueue, VERR_INVALID_HANDLE);
108 AssertReturn(pQueue->u32Magic == RTREQQUEUE_MAGIC, VERR_INVALID_HANDLE);
109
110 /*
111 * Process loop. Stop (break) after the first non-VINF_SUCCESS status code.
112 */
113 int rc = VINF_SUCCESS;
114 for (;;)
115 {
116 /*
117 * Get pending requests.
118 */
119 PRTREQ pReqs = ASMAtomicXchgPtrT(&pQueue->pAlreadyPendingReqs, NULL, PRTREQ);
120 if (RT_LIKELY(!pReqs))
121 {
122 pReqs = ASMAtomicXchgPtrT(&pQueue->pReqs, NULL, PRTREQ);
123 if (!pReqs)
124 {
125 /* We do not adjust cMillies (documented behavior). */
126 ASMAtomicWriteBool(&pQueue->fBusy, false); /* this aint 100% perfect, but it's good enough for now... */
127 rc = RTSemEventWait(pQueue->EventSem, cMillies);
128 if (rc != VINF_SUCCESS)
129 break;
130 continue;
131 }
132
133 ASMAtomicWriteBool(&pQueue->fBusy, true);
134
135 /*
136 * Reverse the list to process it in FIFO order.
137 */
138 PRTREQ pReq = pReqs;
139 if (pReq->pNext)
140 Log2(("RTReqQueueProcess: 2+ requests: %p %p %p\n", pReq, pReq->pNext, pReq->pNext->pNext));
141 pReqs = NULL;
142 while (pReq)
143 {
144 Assert(pReq->enmState == RTREQSTATE_QUEUED);
145 Assert(pReq->uOwner.hQueue == pQueue);
146 PRTREQ pCur = pReq;
147 pReq = pReq->pNext;
148 pCur->pNext = pReqs;
149 pReqs = pCur;
150 }
151
152 }
153 else
154 ASMAtomicWriteBool(&pQueue->fBusy, true);
155
156 /*
157 * Process the requests.
158 */
159 while (pReqs)
160 {
161 /* Unchain the first request and advance the list. */
162 PRTREQ pReq = pReqs;
163 pReqs = pReqs->pNext;
164 pReq->pNext = NULL;
165
166 /* Process the request. */
167 rc = rtReqProcessOne(pReq);
168 AssertRC(rc);
169 if (rc != VINF_SUCCESS)
170 {
171 /* Propagate the return code to caller. If more requests pending, queue them for later. */
172 if (pReqs)
173 {
174 pReqs = ASMAtomicXchgPtrT(&pQueue->pAlreadyPendingReqs, pReqs, PRTREQ);
175 Assert(!pReqs);
176 }
177 break;
178 }
179 }
180 if (rc != VINF_SUCCESS)
181 break;
182 }
183
184 LogFlow(("RTReqQueueProcess: returns %Rrc\n", rc));
185 return rc;
186}
187RT_EXPORT_SYMBOL(RTReqQueueProcess);
188
189
190RTDECL(int) RTReqQueueCall(RTREQQUEUE hQueue, PRTREQ *ppReq, RTMSINTERVAL cMillies, PFNRT pfnFunction, unsigned cArgs, ...)
191{
192 va_list va;
193 va_start(va, cArgs);
194 int rc = RTReqQueueCallV(hQueue, ppReq, cMillies, RTREQFLAGS_IPRT_STATUS, pfnFunction, cArgs, va);
195 va_end(va);
196 return rc;
197}
198RT_EXPORT_SYMBOL(RTReqQueueCall);
199
200
201RTDECL(int) RTReqQueueCallVoid(RTREQQUEUE hQueue, PRTREQ *ppReq, RTMSINTERVAL cMillies, PFNRT pfnFunction, unsigned cArgs, ...)
202{
203 va_list va;
204 va_start(va, cArgs);
205 int rc = RTReqQueueCallV(hQueue, ppReq, cMillies, RTREQFLAGS_VOID, pfnFunction, cArgs, va);
206 va_end(va);
207 return rc;
208}
209RT_EXPORT_SYMBOL(RTReqQueueCallVoid);
210
211
212RTDECL(int) RTReqQueueCallEx(RTREQQUEUE hQueue, PRTREQ *ppReq, RTMSINTERVAL cMillies, unsigned fFlags, PFNRT pfnFunction, unsigned cArgs, ...)
213{
214 va_list va;
215 va_start(va, cArgs);
216 int rc = RTReqQueueCallV(hQueue, ppReq, cMillies, fFlags, pfnFunction, cArgs, va);
217 va_end(va);
218 return rc;
219}
220RT_EXPORT_SYMBOL(RTReqQueueCallEx);
221
222
223RTDECL(int) RTReqQueueCallV(RTREQQUEUE hQueue, PRTREQ *ppReq, RTMSINTERVAL cMillies, unsigned fFlags, PFNRT pfnFunction, unsigned cArgs, va_list Args)
224{
225 LogFlow(("RTReqQueueCallV: cMillies=%d fFlags=%#x pfnFunction=%p cArgs=%d\n", cMillies, fFlags, pfnFunction, cArgs));
226
227 /*
228 * Check input.
229 */
230 PRTREQQUEUEINT pQueue = hQueue;
231 AssertPtrReturn(pQueue, VERR_INVALID_HANDLE);
232 AssertReturn(pQueue->u32Magic == RTREQQUEUE_MAGIC, VERR_INVALID_HANDLE);
233 AssertPtrReturn(pfnFunction, VERR_INVALID_POINTER);
234 AssertReturn(!(fFlags & ~(RTREQFLAGS_RETURN_MASK | RTREQFLAGS_NO_WAIT)), VERR_INVALID_PARAMETER);
235
236 if (!(fFlags & RTREQFLAGS_NO_WAIT) || ppReq)
237 {
238 AssertPtrReturn(ppReq, VERR_INVALID_POINTER);
239 *ppReq = NULL;
240 }
241
242 PRTREQ pReq = NULL;
243 AssertMsgReturn(cArgs * sizeof(uintptr_t) <= sizeof(pReq->u.Internal.aArgs), ("cArgs=%u\n", cArgs), VERR_TOO_MUCH_DATA);
244
245 /*
246 * Allocate request
247 */
248 int rc = RTReqQueueAlloc(pQueue, RTREQTYPE_INTERNAL, &pReq);
249 if (rc != VINF_SUCCESS)
250 return rc;
251
252 /*
253 * Initialize the request data.
254 */
255 pReq->fFlags = fFlags;
256 pReq->u.Internal.pfn = pfnFunction;
257 pReq->u.Internal.cArgs = cArgs;
258 for (unsigned iArg = 0; iArg < cArgs; iArg++)
259 pReq->u.Internal.aArgs[iArg] = va_arg(Args, uintptr_t);
260
261 /*
262 * Queue the request and return.
263 */
264 rc = RTReqSubmit(pReq, cMillies);
265 if ( rc != VINF_SUCCESS
266 && rc != VERR_TIMEOUT)
267 {
268 RTReqRelease(pReq);
269 pReq = NULL;
270 }
271 if (!(fFlags & RTREQFLAGS_NO_WAIT))
272 {
273 *ppReq = pReq;
274 LogFlow(("RTReqQueueCallV: returns %Rrc *ppReq=%p\n", rc, pReq));
275 }
276 else
277 LogFlow(("RTReqQueueCallV: returns %Rrc\n", rc));
278 Assert(rc != VERR_INTERRUPTED);
279 return rc;
280}
281RT_EXPORT_SYMBOL(RTReqQueueCallV);
282
283
284RTDECL(bool) RTReqQueueIsBusy(RTREQQUEUE hQueue)
285{
286 PRTREQQUEUEINT pQueue = hQueue;
287 AssertPtrReturn(pQueue, false);
288
289 if (ASMAtomicReadBool(&pQueue->fBusy))
290 return true;
291 if (ASMAtomicReadPtrT(&pQueue->pReqs, PRTREQ) != NULL)
292 return true;
293 if (ASMAtomicReadBool(&pQueue->fBusy))
294 return true;
295 return false;
296}
297RT_EXPORT_SYMBOL(RTReqQueueIsBusy);
298
299
300/**
301 * Joins the list pList with whatever is linked up at *pHead.
302 */
303static void vmr3ReqJoinFreeSub(volatile PRTREQ *ppHead, PRTREQ pList)
304{
305 for (unsigned cIterations = 0;; cIterations++)
306 {
307 PRTREQ pHead = ASMAtomicXchgPtrT(ppHead, pList, PRTREQ);
308 if (!pHead)
309 return;
310 PRTREQ pTail = pHead;
311 while (pTail->pNext)
312 pTail = pTail->pNext;
313 pTail->pNext = pList;
314 if (ASMAtomicCmpXchgPtr(ppHead, pHead, pList))
315 return;
316 pTail->pNext = NULL;
317 if (ASMAtomicCmpXchgPtr(ppHead, pHead, NULL))
318 return;
319 pList = pHead;
320 Assert(cIterations != 32);
321 Assert(cIterations != 64);
322 }
323}
324
325
326/**
327 * Joins the list pList with whatever is linked up at *pHead.
328 */
329static void vmr3ReqJoinFree(PRTREQQUEUEINT pQueue, PRTREQ pList)
330{
331 /*
332 * Split the list if it's too long.
333 */
334 unsigned cReqs = 1;
335 PRTREQ pTail = pList;
336 while (pTail->pNext)
337 {
338 if (cReqs++ > 25)
339 {
340 const uint32_t i = pQueue->iReqFree;
341 vmr3ReqJoinFreeSub(&pQueue->apReqFree[(i + 2) % RT_ELEMENTS(pQueue->apReqFree)], pTail->pNext);
342
343 pTail->pNext = NULL;
344 vmr3ReqJoinFreeSub(&pQueue->apReqFree[(i + 2 + (i == pQueue->iReqFree)) % RT_ELEMENTS(pQueue->apReqFree)], pTail->pNext);
345 return;
346 }
347 pTail = pTail->pNext;
348 }
349 vmr3ReqJoinFreeSub(&pQueue->apReqFree[(pQueue->iReqFree + 2) % RT_ELEMENTS(pQueue->apReqFree)], pList);
350}
351
352
353RTDECL(int) RTReqQueueAlloc(RTREQQUEUE hQueue, RTREQTYPE enmType, PRTREQ *phReq)
354{
355 /*
356 * Validate input.
357 */
358 PRTREQQUEUEINT pQueue = hQueue;
359 AssertPtrReturn(pQueue, VERR_INVALID_HANDLE);
360 AssertReturn(pQueue->u32Magic == RTREQQUEUE_MAGIC, VERR_INVALID_HANDLE);
361 AssertMsgReturn(enmType > RTREQTYPE_INVALID && enmType < RTREQTYPE_MAX, ("%d\n", enmType), VERR_RT_REQUEST_INVALID_TYPE);
362
363 /*
364 * Try get a recycled packet.
365 *
366 * While this could all be solved with a single list with a lock, it's a sport
367 * of mine to avoid locks.
368 */
369 int cTries = RT_ELEMENTS(pQueue->apReqFree) * 2;
370 while (--cTries >= 0)
371 {
372 PRTREQ volatile *ppHead = &pQueue->apReqFree[ASMAtomicIncU32(&pQueue->iReqFree) % RT_ELEMENTS(pQueue->apReqFree)];
373 PRTREQ pReq = ASMAtomicXchgPtrT(ppHead, NULL, PRTREQ);
374 if (pReq)
375 {
376 PRTREQ pNext = pReq->pNext;
377 if ( pNext
378 && !ASMAtomicCmpXchgPtr(ppHead, pNext, NULL))
379 vmr3ReqJoinFree(pQueue, pReq->pNext);
380 ASMAtomicDecU32(&pQueue->cReqFree);
381
382 Assert(pReq->uOwner.hQueue == pQueue);
383 Assert(!pReq->fPoolOrQueue);
384
385 int rc = rtReqReInit(pReq, enmType);
386 if (RT_SUCCESS(rc))
387 {
388 *phReq = pReq;
389 LogFlow(("RTReqQueueAlloc: returns VINF_SUCCESS *phReq=%p recycled\n", pReq));
390 return VINF_SUCCESS;
391 }
392 }
393 }
394
395 /*
396 * Ok, allocate a new one.
397 */
398 int rc = rtReqAlloc(enmType, false /*fPoolOrQueue*/, pQueue, phReq);
399 LogFlow(("RTReqQueueAlloc: returns %Rrc *phReq=%p\n", rc, *phReq));
400 return rc;
401}
402RT_EXPORT_SYMBOL(RTReqQueueAlloc);
403
404
405/**
406 * Recycles a requst.
407 *
408 * @returns true if recycled, false if it should be freed.
409 * @param pQueue The queue.
410 * @param pReq The request.
411 */
412DECLHIDDEN(bool) rtReqQueueRecycle(PRTREQQUEUEINT pQueue, PRTREQINT pReq)
413{
414 if ( !pQueue
415 || pQueue->cReqFree >= 128)
416 return false;
417
418 ASMAtomicIncU32(&pQueue->cReqFree);
419 PRTREQ volatile *ppHead = &pQueue->apReqFree[ASMAtomicIncU32(&pQueue->iReqFree) % RT_ELEMENTS(pQueue->apReqFree)];
420 PRTREQ pNext;
421 do
422 {
423 pNext = *ppHead;
424 ASMAtomicWritePtr(&pReq->pNext, pNext);
425 } while (!ASMAtomicCmpXchgPtr(ppHead, pReq, pNext));
426
427 return true;
428}
429
430
431/**
432 * Submits a request to the queue.
433 *
434 * @param pQueue The queue.
435 * @param pReq The request.
436 */
437DECLHIDDEN(void) rtReqQueueSubmit(PRTREQQUEUEINT pQueue, PRTREQINT pReq)
438{
439 PRTREQ pNext;
440 do
441 {
442 pNext = pQueue->pReqs;
443 pReq->pNext = pNext;
444 ASMAtomicWriteBool(&pQueue->fBusy, true);
445 } while (!ASMAtomicCmpXchgPtr(&pQueue->pReqs, pReq, pNext));
446
447 /*
448 * Notify queue thread.
449 */
450 RTSemEventSignal(pQueue->EventSem);
451}
452
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette