VirtualBox

source: vbox/trunk/src/VBox/Runtime/common/misc/aiomgr.cpp@ 78052

Last change on this file since 78052 was 77237, checked in by vboxsync, 6 years ago

IPRT/aiomgr.cpp: Use RTListForEachSafe in rtAioMgrCheckFiles() to be on the safe side. Saw crashes here when playing around with the storage tests.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 42.1 KB
Line 
1/* $Id: aiomgr.cpp 77237 2019-02-09 18:31:23Z vboxsync $ */
2/** @file
3 * IPRT - Async I/O manager.
4 */
5
6/*
7 * Copyright (C) 2013-2019 Oracle Corporation
8 *
9 * This file is part of VirtualBox Open Source Edition (OSE), as
10 * available from http://www.virtualbox.org. This file is free software;
11 * you can redistribute it and/or modify it under the terms of the GNU
12 * General Public License (GPL) as published by the Free Software
13 * Foundation, in version 2 as it comes in the "COPYING" file of the
14 * VirtualBox OSE distribution. VirtualBox OSE is distributed in the
15 * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
16 *
17 * The contents of this file may alternatively be used under the terms
18 * of the Common Development and Distribution License Version 1.0
19 * (CDDL) only, as it comes in the "COPYING.CDDL" file of the
20 * VirtualBox OSE distribution, in which case the provisions of the
21 * CDDL are applicable instead of those of the GPL.
22 *
23 * You may elect to license modified versions of this file under the
24 * terms and conditions of either the GPL or the CDDL or both.
25 */
26
27
28/*********************************************************************************************************************************
29* Header Files *
30*********************************************************************************************************************************/
31
32#include <iprt/aiomgr.h>
33#include <iprt/err.h>
34#include <iprt/asm.h>
35#include <iprt/mem.h>
36#include <iprt/file.h>
37#include <iprt/list.h>
38#include <iprt/thread.h>
39#include <iprt/assert.h>
40#include <iprt/string.h>
41#include <iprt/critsect.h>
42#include <iprt/memcache.h>
43#include <iprt/semaphore.h>
44#include <iprt/queueatomic.h>
45
46#include "internal/magics.h"
47
48
49/*********************************************************************************************************************************
50* Structures and Typedefs *
51*********************************************************************************************************************************/
52
53/** Pointer to an internal async I/O file instance. */
54typedef struct RTAIOMGRFILEINT *PRTAIOMGRFILEINT;
55
56/**
57 * Blocking event types.
58 */
59typedef enum RTAIOMGREVENT
60{
61 /** Invalid tye */
62 RTAIOMGREVENT_INVALID = 0,
63 /** No event pending. */
64 RTAIOMGREVENT_NO_EVENT,
65 /** A file is added to the manager. */
66 RTAIOMGREVENT_FILE_ADD,
67 /** A file is about to be closed. */
68 RTAIOMGREVENT_FILE_CLOSE,
69 /** The async I/O manager is shut down. */
70 RTAIOMGREVENT_SHUTDOWN,
71 /** 32bit hack */
72 RTAIOMGREVENT_32BIT_HACK = 0x7fffffff
73} RTAIOMGREVENT;
74
75/**
76 * Async I/O manager instance data.
77 */
78typedef struct RTAIOMGRINT
79{
80 /** Magic value. */
81 uint32_t u32Magic;
82 /** Reference count. */
83 volatile uint32_t cRefs;
84 /** Async I/O context handle. */
85 RTFILEAIOCTX hAioCtx;
86 /** async I/O thread. */
87 RTTHREAD hThread;
88 /** List of files assigned to this manager. */
89 RTLISTANCHOR ListFiles;
90 /** Number of requests active currently. */
91 unsigned cReqsActive;
92 /** Number of maximum requests active. */
93 uint32_t cReqsActiveMax;
94 /** Memory cache for requests. */
95 RTMEMCACHE hMemCacheReqs;
96 /** Critical section protecting the blocking event handling. */
97 RTCRITSECT CritSectBlockingEvent;
98 /** Event semaphore for blocking external events.
99 * The caller waits on it until the async I/O manager
100 * finished processing the event. */
101 RTSEMEVENT hEventSemBlock;
102 /** Blocking event type */
103 volatile RTAIOMGREVENT enmBlockingEvent;
104 /** Event type data */
105 union
106 {
107 /** The file to be added */
108 volatile PRTAIOMGRFILEINT pFileAdd;
109 /** The file to be closed */
110 volatile PRTAIOMGRFILEINT pFileClose;
111 } BlockingEventData;
112} RTAIOMGRINT;
113/** Pointer to an internal async I/O manager instance. */
114typedef RTAIOMGRINT *PRTAIOMGRINT;
115
116/**
117 * Async I/O manager file instance data.
118 */
119typedef struct RTAIOMGRFILEINT
120{
121 /** Magic value. */
122 uint32_t u32Magic;
123 /** Reference count. */
124 volatile uint32_t cRefs;
125 /** Flags. */
126 uint32_t fFlags;
127 /** Opaque user data passed on creation. */
128 void *pvUser;
129 /** File handle. */
130 RTFILE hFile;
131 /** async I/O manager this file belongs to. */
132 PRTAIOMGRINT pAioMgr;
133 /** Work queue for new requests. */
134 RTQUEUEATOMIC QueueReqs;
135 /** Completion callback for this file. */
136 PFNRTAIOMGRREQCOMPLETE pfnReqCompleted;
137 /** Data for exclusive use by the assigned async I/O manager. */
138 struct
139 {
140 /** List node of assigned files for a async I/O manager. */
141 RTLISTNODE NodeAioMgrFiles;
142 /** List of requests waiting for submission. */
143 RTLISTANCHOR ListWaitingReqs;
144 /** Number of requests currently being processed for this endpoint
145 * (excluded flush requests). */
146 unsigned cReqsActive;
147 } AioMgr;
148} RTAIOMGRFILEINT;
149
150/** Flag whether the file is closed. */
151#define RTAIOMGRFILE_FLAGS_CLOSING RT_BIT_32(1)
152
153/**
154 * Request type.
155 */
156typedef enum RTAIOMGRREQTYPE
157{
158 /** Invalid request type. */
159 RTAIOMGRREQTYPE_INVALID = 0,
160 /** Read reques type. */
161 RTAIOMGRREQTYPE_READ,
162 /** Write request. */
163 RTAIOMGRREQTYPE_WRITE,
164 /** Flush request. */
165 RTAIOMGRREQTYPE_FLUSH,
166 /** Prefetech request. */
167 RTAIOMGRREQTYPE_PREFETCH,
168 /** 32bit hack. */
169 RTAIOMGRREQTYPE_32BIT_HACK = 0x7fffffff
170} RTAIOMGRREQTYPE;
171/** Pointer to a reques type. */
172typedef RTAIOMGRREQTYPE *PRTAIOMGRREQTYPE;
173
174/**
175 * Async I/O manager request.
176 */
177typedef struct RTAIOMGRREQ
178{
179 /** Atomic queue work item. */
180 RTQUEUEATOMICITEM WorkItem;
181 /** Node for a waiting list. */
182 RTLISTNODE NodeWaitingList;
183 /** Request flags. */
184 uint32_t fFlags;
185 /** Transfer type. */
186 RTAIOMGRREQTYPE enmType;
187 /** Assigned file request. */
188 RTFILEAIOREQ hReqIo;
189 /** File the request belongs to. */
190 PRTAIOMGRFILEINT pFile;
191 /** Opaque user data. */
192 void *pvUser;
193 /** Start offset */
194 RTFOFF off;
195 /** Data segment. */
196 RTSGSEG DataSeg;
197 /** When non-zero the segment uses a bounce buffer because the provided buffer
198 * doesn't meet host requirements. */
199 size_t cbBounceBuffer;
200 /** Pointer to the used bounce buffer if any. */
201 void *pvBounceBuffer;
202 /** Start offset in the bounce buffer to copy from. */
203 uint32_t offBounceBuffer;
204} RTAIOMGRREQ;
205/** Pointer to a I/O manager request. */
206typedef RTAIOMGRREQ *PRTAIOMGRREQ;
207
208/** Flag whether the request was prepared already. */
209#define RTAIOMGRREQ_FLAGS_PREPARED RT_BIT_32(0)
210
211
212/*********************************************************************************************************************************
213* Defined Constants And Macros *
214*********************************************************************************************************************************/
215
216/** Validates a handle and returns VERR_INVALID_HANDLE if not valid. */
217#define RTAIOMGR_VALID_RETURN_RC(a_hAioMgr, a_rc) \
218 do { \
219 AssertPtrReturn((a_hAioMgr), (a_rc)); \
220 AssertReturn((a_hAioMgr)->u32Magic == RTAIOMGR_MAGIC, (a_rc)); \
221 } while (0)
222
223/** Validates a handle and returns VERR_INVALID_HANDLE if not valid. */
224#define RTAIOMGR_VALID_RETURN(a_hAioMgr) RTAIOMGR_VALID_RETURN_RC((hAioMgr), VERR_INVALID_HANDLE)
225
226/** Validates a handle and returns (void) if not valid. */
227#define RTAIOMGR_VALID_RETURN_VOID(a_hAioMgr) \
228 do { \
229 AssertPtrReturnVoid(a_hAioMgr); \
230 AssertReturnVoid((a_hAioMgr)->u32Magic == RTAIOMGR_MAGIC); \
231 } while (0)
232
233
234/*********************************************************************************************************************************
235* Internal Functions *
236*********************************************************************************************************************************/
237
238static int rtAioMgrReqsEnqueue(PRTAIOMGRINT pThis, PRTAIOMGRFILEINT pFile,
239 PRTFILEAIOREQ pahReqs, unsigned cReqs);
240
241/**
242 * Removes an endpoint from the currently assigned manager.
243 *
244 * @returns TRUE if there are still requests pending on the current manager for this endpoint.
245 * FALSE otherwise.
246 * @param pFile The endpoint to remove.
247 */
248static bool rtAioMgrFileRemove(PRTAIOMGRFILEINT pFile)
249{
250 /* Make sure that there is no request pending on this manager for the endpoint. */
251 if (!pFile->AioMgr.cReqsActive)
252 {
253 RTListNodeRemove(&pFile->AioMgr.NodeAioMgrFiles);
254 return false;
255 }
256
257 return true;
258}
259
260/**
261 * Allocate a new I/O request.
262 *
263 * @returns Pointer to the allocated request or NULL if out of memory.
264 * @param pThis The async I/O manager instance.
265 */
266static PRTAIOMGRREQ rtAioMgrReqAlloc(PRTAIOMGRINT pThis)
267{
268 return (PRTAIOMGRREQ)RTMemCacheAlloc(pThis->hMemCacheReqs);
269}
270
271/**
272 * Frees an I/O request.
273 *
274 * @returns nothing.
275 * @param pThis The async I/O manager instance.
276 * @param pReq The request to free.
277 */
278static void rtAioMgrReqFree(PRTAIOMGRINT pThis, PRTAIOMGRREQ pReq)
279{
280 if (pReq->cbBounceBuffer)
281 {
282 AssertPtr(pReq->pvBounceBuffer);
283 RTMemPageFree(pReq->pvBounceBuffer, pReq->cbBounceBuffer);
284 pReq->pvBounceBuffer = NULL;
285 pReq->cbBounceBuffer = 0;
286 }
287 pReq->fFlags = 0;
288 RTAioMgrFileRelease(pReq->pFile);
289 RTMemCacheFree(pThis->hMemCacheReqs, pReq);
290}
291
292static void rtAioMgrReqCompleteRc(PRTAIOMGRINT pThis, PRTAIOMGRREQ pReq,
293 int rcReq, size_t cbTransfered)
294{
295 int rc = VINF_SUCCESS;
296 PRTAIOMGRFILEINT pFile;
297
298 pFile = pReq->pFile;
299 pThis->cReqsActive--;
300 pFile->AioMgr.cReqsActive--;
301
302 /*
303 * It is possible that the request failed on Linux with kernels < 2.6.23
304 * if the passed buffer was allocated with remap_pfn_range or if the file
305 * is on an NFS endpoint which does not support async and direct I/O at the same time.
306 * The endpoint will be migrated to a failsafe manager in case a request fails.
307 */
308 if (RT_FAILURE(rcReq))
309 {
310 pFile->pfnReqCompleted(pFile, rcReq, pReq->pvUser);
311 rtAioMgrReqFree(pThis, pReq);
312 }
313 else
314 {
315 /*
316 * Restart an incomplete transfer.
317 * This usually means that the request will return an error now
318 * but to get the cause of the error (disk full, file too big, I/O error, ...)
319 * the transfer needs to be continued.
320 */
321 if (RT_UNLIKELY( cbTransfered < pReq->DataSeg.cbSeg
322 || ( pReq->cbBounceBuffer
323 && cbTransfered < pReq->cbBounceBuffer)))
324 {
325 RTFOFF offStart;
326 size_t cbToTransfer;
327 uint8_t *pbBuf = NULL;
328
329 Assert(cbTransfered % 512 == 0);
330
331 if (pReq->cbBounceBuffer)
332 {
333 AssertPtr(pReq->pvBounceBuffer);
334 offStart = (pReq->off & ~((RTFOFF)512-1)) + cbTransfered;
335 cbToTransfer = pReq->cbBounceBuffer - cbTransfered;
336 pbBuf = (uint8_t *)pReq->pvBounceBuffer + cbTransfered;
337 }
338 else
339 {
340 Assert(!pReq->pvBounceBuffer);
341 offStart = pReq->off + cbTransfered;
342 cbToTransfer = pReq->DataSeg.cbSeg - cbTransfered;
343 pbBuf = (uint8_t *)pReq->DataSeg.pvSeg + cbTransfered;
344 }
345
346 if ( pReq->enmType == RTAIOMGRREQTYPE_PREFETCH
347 || pReq->enmType == RTAIOMGRREQTYPE_READ)
348 {
349 rc = RTFileAioReqPrepareRead(pReq->hReqIo, pFile->hFile, offStart,
350 pbBuf, cbToTransfer, pReq);
351 }
352 else
353 {
354 AssertMsg(pReq->enmType == RTAIOMGRREQTYPE_WRITE,
355 ("Invalid transfer type\n"));
356 rc = RTFileAioReqPrepareWrite(pReq->hReqIo, pFile->hFile, offStart,
357 pbBuf, cbToTransfer, pReq);
358 }
359 AssertRC(rc);
360
361 rc = rtAioMgrReqsEnqueue(pThis, pFile, &pReq->hReqIo, 1);
362 AssertMsg(RT_SUCCESS(rc) || (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES),
363 ("Unexpected return code rc=%Rrc\n", rc));
364 }
365 else if (pReq->enmType == RTAIOMGRREQTYPE_PREFETCH)
366 {
367 Assert(pReq->cbBounceBuffer);
368 pReq->enmType = RTAIOMGRREQTYPE_WRITE;
369
370 memcpy(((uint8_t *)pReq->pvBounceBuffer) + pReq->offBounceBuffer,
371 pReq->DataSeg.pvSeg,
372 pReq->DataSeg.cbSeg);
373
374 /* Write it now. */
375 RTFOFF offStart = pReq->off & ~(RTFOFF)(512-1);
376 size_t cbToTransfer = RT_ALIGN_Z(pReq->DataSeg.cbSeg + (pReq->off - offStart), 512);
377
378 rc = RTFileAioReqPrepareWrite(pReq->hReqIo, pFile->hFile,
379 offStart, pReq->pvBounceBuffer, cbToTransfer, pReq);
380 AssertRC(rc);
381 rc = rtAioMgrReqsEnqueue(pThis, pFile, &pReq->hReqIo, 1);
382 AssertMsg(RT_SUCCESS(rc) || (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES),
383 ("Unexpected return code rc=%Rrc\n", rc));
384 }
385 else
386 {
387 if (RT_SUCCESS(rc) && pReq->cbBounceBuffer)
388 {
389 if (pReq->enmType == RTAIOMGRREQTYPE_READ)
390 memcpy(pReq->DataSeg.pvSeg,
391 ((uint8_t *)pReq->pvBounceBuffer) + pReq->offBounceBuffer,
392 pReq->DataSeg.cbSeg);
393 }
394
395 /* Call completion callback */
396 pFile->pfnReqCompleted(pFile, rcReq, pReq->pvUser);
397 rtAioMgrReqFree(pThis, pReq);
398 }
399 } /* request completed successfully */
400}
401
402/**
403 * Wrapper around rtAioMgrReqCompleteRc().
404 */
405static void rtAioMgrReqComplete(PRTAIOMGRINT pThis, RTFILEAIOREQ hReq)
406{
407 size_t cbTransfered = 0;
408 int rcReq = RTFileAioReqGetRC(hReq, &cbTransfered);
409 PRTAIOMGRREQ pReq = (PRTAIOMGRREQ)RTFileAioReqGetUser(hReq);
410
411 rtAioMgrReqCompleteRc(pThis, pReq, rcReq, cbTransfered);
412}
413
414/**
415 * Wrapper around RTFIleAioCtxSubmit() which is also doing error handling.
416 */
417static int rtAioMgrReqsEnqueue(PRTAIOMGRINT pThis, PRTAIOMGRFILEINT pFile,
418 PRTFILEAIOREQ pahReqs, unsigned cReqs)
419{
420 pThis->cReqsActive += cReqs;
421 pFile->AioMgr.cReqsActive += cReqs;
422
423 int rc = RTFileAioCtxSubmit(pThis->hAioCtx, pahReqs, cReqs);
424 if (RT_FAILURE(rc))
425 {
426 if (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES)
427 {
428 /* Append any not submitted task to the waiting list. */
429 for (size_t i = 0; i < cReqs; i++)
430 {
431 int rcReq = RTFileAioReqGetRC(pahReqs[i], NULL);
432
433 if (rcReq != VERR_FILE_AIO_IN_PROGRESS)
434 {
435 PRTAIOMGRREQ pReq = (PRTAIOMGRREQ)RTFileAioReqGetUser(pahReqs[i]);
436
437 Assert(pReq->hReqIo == pahReqs[i]);
438 RTListAppend(&pFile->AioMgr.ListWaitingReqs, &pReq->NodeWaitingList);
439 pThis->cReqsActive--;
440 pFile->AioMgr.cReqsActive--;
441 }
442 }
443
444 pThis->cReqsActiveMax = pThis->cReqsActive;
445 rc = VINF_SUCCESS;
446 }
447 else /* Another kind of error happened (full disk, ...) */
448 {
449 /* An error happened. Find out which one caused the error and resubmit all other tasks. */
450 for (size_t i = 0; i < cReqs; i++)
451 {
452 PRTAIOMGRREQ pReq = (PRTAIOMGRREQ)RTFileAioReqGetUser(pahReqs[i]);
453 int rcReq = RTFileAioReqGetRC(pahReqs[i], NULL);
454
455 if (rcReq == VERR_FILE_AIO_NOT_SUBMITTED)
456 {
457 /* We call ourself again to do any error handling which might come up now. */
458 rc = rtAioMgrReqsEnqueue(pThis, pFile, &pahReqs[i], 1);
459 AssertRC(rc);
460 }
461 else if (rcReq != VERR_FILE_AIO_IN_PROGRESS)
462 rtAioMgrReqCompleteRc(pThis, pReq, rcReq, 0);
463 }
464 }
465 }
466
467 return VINF_SUCCESS;
468}
469
470/**
471 * Adds a list of requests to the waiting list.
472 *
473 * @returns nothing.
474 * @param pFile The file instance to add the requests to.
475 * @param pReqsHead The head of the request list to add.
476 */
477static void rtAioMgrFileAddReqsToWaitingList(PRTAIOMGRFILEINT pFile, PRTAIOMGRREQ pReqsHead)
478{
479 while (pReqsHead)
480 {
481 PRTAIOMGRREQ pReqCur = pReqsHead;
482
483 pReqsHead = (PRTAIOMGRREQ)pReqsHead->WorkItem.pNext;
484 pReqCur->WorkItem.pNext = NULL;
485 RTListAppend(&pFile->AioMgr.ListWaitingReqs, &pReqCur->NodeWaitingList);
486 }
487}
488
489/**
490 * Prepare the native I/o request ensuring that all alignment prerequisites of
491 * the host are met.
492 *
493 * @returns IPRT statuse code.
494 * @param pFile The file instance data.
495 * @param pReq The request to prepare.
496 */
497static int rtAioMgrReqPrepareNonBuffered(PRTAIOMGRFILEINT pFile, PRTAIOMGRREQ pReq)
498{
499 int rc = VINF_SUCCESS;
500 RTFOFF offStart = pReq->off & ~(RTFOFF)(512-1);
501 size_t cbToTransfer = RT_ALIGN_Z(pReq->DataSeg.cbSeg + (pReq->off - offStart), 512);
502 void *pvBuf = pReq->DataSeg.pvSeg;
503 bool fAlignedReq = cbToTransfer == pReq->DataSeg.cbSeg
504 && offStart == pReq->off;
505
506 /*
507 * Check if the alignment requirements are met.
508 * Offset, transfer size and buffer address
509 * need to be on a 512 boundary.
510 */
511 if ( !fAlignedReq
512 /** @todo || ((pEpClassFile->uBitmaskAlignment & (RTR3UINTPTR)pvBuf) != (RTR3UINTPTR)pvBuf) */)
513 {
514 /* Create bounce buffer. */
515 pReq->cbBounceBuffer = cbToTransfer;
516
517 AssertMsg(pReq->off >= offStart, ("Overflow in calculation off=%llu offStart=%llu\n",
518 pReq->off, offStart));
519 pReq->offBounceBuffer = pReq->off - offStart;
520
521 /** @todo I think we need something like a RTMemAllocAligned method here.
522 * Current assumption is that the maximum alignment is 4096byte
523 * (GPT disk on Windows)
524 * so we can use RTMemPageAlloc here.
525 */
526 pReq->pvBounceBuffer = RTMemPageAlloc(cbToTransfer);
527 if (RT_LIKELY(pReq->pvBounceBuffer))
528 {
529 pvBuf = pReq->pvBounceBuffer;
530
531 if (pReq->enmType == RTAIOMGRREQTYPE_WRITE)
532 {
533 if ( RT_UNLIKELY(cbToTransfer != pReq->DataSeg.cbSeg)
534 || RT_UNLIKELY(offStart != pReq->off))
535 {
536 /* We have to fill the buffer first before we can update the data. */
537 pReq->enmType = RTAIOMGRREQTYPE_WRITE;
538 }
539 else
540 memcpy(pvBuf, pReq->DataSeg.pvSeg, pReq->DataSeg.cbSeg);
541 }
542 }
543 else
544 rc = VERR_NO_MEMORY;
545 }
546 else
547 pReq->cbBounceBuffer = 0;
548
549 if (RT_SUCCESS(rc))
550 {
551 if (pReq->enmType == RTAIOMGRREQTYPE_WRITE)
552 {
553 rc = RTFileAioReqPrepareWrite(pReq->hReqIo, pFile->hFile,
554 offStart, pvBuf, cbToTransfer, pReq);
555 }
556 else /* Read or prefetch request. */
557 rc = RTFileAioReqPrepareRead(pReq->hReqIo, pFile->hFile,
558 offStart, pvBuf, cbToTransfer, pReq);
559 AssertRC(rc);
560 pReq->fFlags |= RTAIOMGRREQ_FLAGS_PREPARED;
561 }
562
563 return rc;
564}
565
566/**
567 * Prepare a new request for enqueuing.
568 *
569 * @returns IPRT status code.
570 * @param pReq The request to prepare.
571 * @param phReqIo Where to store the handle to the native I/O request on success.
572 */
573static int rtAioMgrPrepareReq(PRTAIOMGRREQ pReq, PRTFILEAIOREQ phReqIo)
574{
575 int rc = VINF_SUCCESS;
576 PRTAIOMGRFILEINT pFile = pReq->pFile;
577
578 switch (pReq->enmType)
579 {
580 case RTAIOMGRREQTYPE_FLUSH:
581 {
582 rc = RTFileAioReqPrepareFlush(pReq->hReqIo, pFile->hFile, pReq);
583 break;
584 }
585 case RTAIOMGRREQTYPE_READ:
586 case RTAIOMGRREQTYPE_WRITE:
587 {
588 rc = rtAioMgrReqPrepareNonBuffered(pFile, pReq);
589 break;
590 }
591 default:
592 AssertMsgFailed(("Invalid transfer type %d\n", pReq->enmType));
593 } /* switch transfer type */
594
595 if (RT_SUCCESS(rc))
596 *phReqIo = pReq->hReqIo;
597
598 return rc;
599}
600
601/**
602 * Prepare newly submitted requests for processing.
603 *
604 * @returns IPRT status code
605 * @param pThis The async I/O manager instance data.
606 * @param pFile The file instance.
607 * @param pReqsNew The list of new requests to prepare.
608 */
609static int rtAioMgrPrepareNewReqs(PRTAIOMGRINT pThis,
610 PRTAIOMGRFILEINT pFile,
611 PRTAIOMGRREQ pReqsNew)
612{
613 RTFILEAIOREQ apReqs[20];
614 unsigned cRequests = 0;
615 int rc = VINF_SUCCESS;
616
617 /* Go through the list and queue the requests. */
618 while ( pReqsNew
619 && (pThis->cReqsActive + cRequests < pThis->cReqsActiveMax)
620 && RT_SUCCESS(rc))
621 {
622 PRTAIOMGRREQ pCurr = pReqsNew;
623 pReqsNew = (PRTAIOMGRREQ)pReqsNew->WorkItem.pNext;
624
625 pCurr->WorkItem.pNext = NULL;
626 AssertMsg(VALID_PTR(pCurr->pFile) && (pCurr->pFile == pFile),
627 ("Files do not match\n"));
628 AssertMsg(!(pCurr->fFlags & RTAIOMGRREQ_FLAGS_PREPARED),
629 ("Request on the new list is already prepared\n"));
630
631 rc = rtAioMgrPrepareReq(pCurr, &apReqs[cRequests]);
632 if (RT_FAILURE(rc))
633 rtAioMgrReqCompleteRc(pThis, pCurr, rc, 0);
634 else
635 cRequests++;
636
637 /* Queue the requests if the array is full. */
638 if (cRequests == RT_ELEMENTS(apReqs))
639 {
640 rc = rtAioMgrReqsEnqueue(pThis, pFile, apReqs, cRequests);
641 cRequests = 0;
642 AssertMsg(RT_SUCCESS(rc) || (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES),
643 ("Unexpected return code\n"));
644 }
645 }
646
647 if (cRequests)
648 {
649 rc = rtAioMgrReqsEnqueue(pThis, pFile, apReqs, cRequests);
650 AssertMsg(RT_SUCCESS(rc) || (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES),
651 ("Unexpected return code rc=%Rrc\n", rc));
652 }
653
654 if (pReqsNew)
655 {
656 /* Add the rest of the tasks to the pending list */
657 rtAioMgrFileAddReqsToWaitingList(pFile, pReqsNew);
658 }
659
660 /* Insufficient resources are not fatal. */
661 if (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES)
662 rc = VINF_SUCCESS;
663
664 return rc;
665}
666
667/**
668 * Queues waiting requests.
669 *
670 * @returns IPRT status code.
671 * @param pThis The async I/O manager instance data.
672 * @param pFile The file to get the requests from.
673 */
674static int rtAioMgrQueueWaitingReqs(PRTAIOMGRINT pThis, PRTAIOMGRFILEINT pFile)
675{
676 RTFILEAIOREQ apReqs[20];
677 unsigned cRequests = 0;
678 int rc = VINF_SUCCESS;
679
680 /* Go through the list and queue the requests. */
681 PRTAIOMGRREQ pReqIt;
682 PRTAIOMGRREQ pReqItNext;
683 RTListForEachSafe(&pFile->AioMgr.ListWaitingReqs, pReqIt, pReqItNext, RTAIOMGRREQ, NodeWaitingList)
684 {
685 RTListNodeRemove(&pReqIt->NodeWaitingList);
686 AssertMsg(VALID_PTR(pReqIt->pFile) && (pReqIt->pFile == pFile),
687 ("Files do not match\n"));
688
689 if (!(pReqIt->fFlags & RTAIOMGRREQ_FLAGS_PREPARED))
690 {
691 rc = rtAioMgrPrepareReq(pReqIt, &apReqs[cRequests]);
692 if (RT_FAILURE(rc))
693 rtAioMgrReqCompleteRc(pThis, pReqIt, rc, 0);
694 else
695 cRequests++;
696 }
697 else
698 {
699 apReqs[cRequests] = pReqIt->hReqIo;
700 cRequests++;
701 }
702
703 /* Queue the requests if the array is full. */
704 if (cRequests == RT_ELEMENTS(apReqs))
705 {
706 rc = rtAioMgrReqsEnqueue(pThis, pFile, apReqs, cRequests);
707 cRequests = 0;
708 AssertMsg(RT_SUCCESS(rc) || (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES),
709 ("Unexpected return code\n"));
710 }
711 }
712
713 if (cRequests)
714 {
715 rc = rtAioMgrReqsEnqueue(pThis, pFile, apReqs, cRequests);
716 AssertMsg(RT_SUCCESS(rc) || (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES),
717 ("Unexpected return code rc=%Rrc\n", rc));
718 }
719
720 /* Insufficient resources are not fatal. */
721 if (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES)
722 rc = VINF_SUCCESS;
723
724 return rc;
725}
726
727/**
728 * Adds all pending requests for the given file.
729 *
730 * @returns IPRT status code.
731 * @param pThis The async I/O manager instance data.
732 * @param pFile The file to get the requests from.
733 */
734static int rtAioMgrQueueReqs(PRTAIOMGRINT pThis, PRTAIOMGRFILEINT pFile)
735{
736 int rc = VINF_SUCCESS;
737
738 /* Check the pending list first */
739 if (!RTListIsEmpty(&pFile->AioMgr.ListWaitingReqs))
740 rc = rtAioMgrQueueWaitingReqs(pThis, pFile);
741
742 if ( RT_SUCCESS(rc)
743 && RTListIsEmpty(&pFile->AioMgr.ListWaitingReqs))
744 {
745 PRTAIOMGRREQ pReqsNew = (PRTAIOMGRREQ)RTQueueAtomicRemoveAll(&pFile->QueueReqs);
746
747 if (pReqsNew)
748 {
749 rc = rtAioMgrPrepareNewReqs(pThis, pFile, pReqsNew);
750 AssertRC(rc);
751 }
752 }
753
754 return rc;
755}
756
757/**
758 * Checks all files for new requests.
759 *
760 * @returns IPRT status code.
761 * @param pThis The I/O manager instance data.
762 */
763static int rtAioMgrCheckFiles(PRTAIOMGRINT pThis)
764{
765 int rc = VINF_SUCCESS;
766
767 PRTAIOMGRFILEINT pIt, pNext;
768 RTListForEachSafe(&pThis->ListFiles, pIt, pNext, RTAIOMGRFILEINT, AioMgr.NodeAioMgrFiles)
769 {
770 rc = rtAioMgrQueueReqs(pThis, pIt);
771 if (RT_FAILURE(rc))
772 return rc;
773 }
774
775 return rc;
776}
777
778/**
779 * Process a blocking event from the outside.
780 *
781 * @returns IPRT status code.
782 * @param pThis The async I/O manager instance data.
783 */
784static int rtAioMgrProcessBlockingEvent(PRTAIOMGRINT pThis)
785{
786 int rc = VINF_SUCCESS;
787 bool fNotifyWaiter = false;
788
789 switch (pThis->enmBlockingEvent)
790 {
791 case RTAIOMGREVENT_NO_EVENT:
792 /* Nothing to do. */
793 break;
794 case RTAIOMGREVENT_FILE_ADD:
795 {
796 PRTAIOMGRFILEINT pFile = ASMAtomicReadPtrT(&pThis->BlockingEventData.pFileAdd, PRTAIOMGRFILEINT);
797 AssertMsg(VALID_PTR(pFile), ("Adding file event without a file to add\n"));
798
799 RTListAppend(&pThis->ListFiles, &pFile->AioMgr.NodeAioMgrFiles);
800 fNotifyWaiter = true;
801 break;
802 }
803 case RTAIOMGREVENT_FILE_CLOSE:
804 {
805 PRTAIOMGRFILEINT pFile = ASMAtomicReadPtrT(&pThis->BlockingEventData.pFileClose, PRTAIOMGRFILEINT);
806 AssertMsg(VALID_PTR(pFile), ("Close file event without a file to close\n"));
807
808 if (!(pFile->fFlags & RTAIOMGRFILE_FLAGS_CLOSING))
809 {
810 /* Make sure all requests finished. Process the queues a last time first. */
811 rc = rtAioMgrQueueReqs(pThis, pFile);
812 AssertRC(rc);
813
814 pFile->fFlags |= RTAIOMGRFILE_FLAGS_CLOSING;
815 fNotifyWaiter = !rtAioMgrFileRemove(pFile);
816 }
817 else if (!pFile->AioMgr.cReqsActive)
818 fNotifyWaiter = true;
819 break;
820 }
821 case RTAIOMGREVENT_SHUTDOWN:
822 {
823 if (!pThis->cReqsActive)
824 fNotifyWaiter = true;
825 break;
826 }
827 default:
828 AssertReleaseMsgFailed(("Invalid event type %d\n", pThis->enmBlockingEvent));
829 }
830
831 if (fNotifyWaiter)
832 {
833 /* Release the waiting thread. */
834 rc = RTSemEventSignal(pThis->hEventSemBlock);
835 AssertRC(rc);
836 }
837
838 return rc;
839}
840
841/**
842 * async I/O manager worker loop.
843 *
844 * @returns IPRT status code.
845 * @param hThreadSelf The thread handle this worker belongs to.
846 * @param pvUser Opaque user data (Pointer to async I/O manager instance).
847 */
848static DECLCALLBACK(int) rtAioMgrWorker(RTTHREAD hThreadSelf, void *pvUser)
849{
850 PRTAIOMGRINT pThis = (PRTAIOMGRINT)pvUser;
851 /*bool fRunning = true;*/
852 int rc = VINF_SUCCESS;
853
854 do
855 {
856 uint32_t cReqsCompleted = 0;
857 RTFILEAIOREQ ahReqsCompleted[32];
858 rc = RTFileAioCtxWait(pThis->hAioCtx, 1, RT_INDEFINITE_WAIT, &ahReqsCompleted[0],
859 RT_ELEMENTS(ahReqsCompleted), &cReqsCompleted);
860 if (rc == VERR_INTERRUPTED)
861 {
862 /* Process external event. */
863 rtAioMgrProcessBlockingEvent(pThis);
864 rc = rtAioMgrCheckFiles(pThis);
865 }
866 else if (RT_FAILURE(rc))
867 {
868 /* Something bad happened. */
869 /** @todo */
870 }
871 else
872 {
873 /* Requests completed. */
874 for (uint32_t i = 0; i < cReqsCompleted; i++)
875 rtAioMgrReqComplete(pThis, ahReqsCompleted[i]);
876
877 /* Check files for new requests and queue waiting requests. */
878 rc = rtAioMgrCheckFiles(pThis);
879 }
880 } while ( /*fRunning - never modified
881 && */ RT_SUCCESS(rc));
882
883 RT_NOREF_PV(hThreadSelf);
884 return rc;
885}
886
887/**
888 * Wakes up the async I/O manager.
889 *
890 * @returns IPRT status code.
891 * @param pThis The async I/O manager.
892 */
893static int rtAioMgrWakeup(PRTAIOMGRINT pThis)
894{
895 return RTFileAioCtxWakeup(pThis->hAioCtx);
896}
897
898/**
899 * Waits until the async I/O manager handled the given event.
900 *
901 * @returns IPRT status code.
902 * @param pThis The async I/O manager.
903 * @param enmEvent The event to pass to the manager.
904 */
905static int rtAioMgrWaitForBlockingEvent(PRTAIOMGRINT pThis, RTAIOMGREVENT enmEvent)
906{
907 Assert(pThis->enmBlockingEvent == RTAIOMGREVENT_NO_EVENT);
908 ASMAtomicWriteU32((volatile uint32_t *)&pThis->enmBlockingEvent, enmEvent);
909
910 /* Wakeup the async I/O manager */
911 int rc = rtAioMgrWakeup(pThis);
912 if (RT_FAILURE(rc))
913 return rc;
914
915 /* Wait for completion. */
916 rc = RTSemEventWait(pThis->hEventSemBlock, RT_INDEFINITE_WAIT);
917 AssertRC(rc);
918
919 ASMAtomicWriteU32((volatile uint32_t *)&pThis->enmBlockingEvent, RTAIOMGREVENT_NO_EVENT);
920
921 return rc;
922}
923
924/**
925 * Add a given file to the given I/O manager.
926 *
927 * @returns IPRT status code.
928 * @param pThis The async I/O manager.
929 * @param pFile The file to add.
930 */
931static int rtAioMgrAddFile(PRTAIOMGRINT pThis, PRTAIOMGRFILEINT pFile)
932{
933 /* Update the assigned I/O manager. */
934 ASMAtomicWritePtr(&pFile->pAioMgr, pThis);
935
936 int rc = RTCritSectEnter(&pThis->CritSectBlockingEvent);
937 AssertRCReturn(rc, rc);
938
939 ASMAtomicWritePtr(&pThis->BlockingEventData.pFileAdd, pFile);
940 rc = rtAioMgrWaitForBlockingEvent(pThis, RTAIOMGREVENT_FILE_ADD);
941 ASMAtomicWriteNullPtr(&pThis->BlockingEventData.pFileAdd);
942
943 RTCritSectLeave(&pThis->CritSectBlockingEvent);
944 return rc;
945}
946
947/**
948 * Removes a given file from the given I/O manager.
949 *
950 * @returns IPRT status code.
951 * @param pThis The async I/O manager.
952 * @param pFile The file to remove.
953 */
954static int rtAioMgrCloseFile(PRTAIOMGRINT pThis, PRTAIOMGRFILEINT pFile)
955{
956 int rc = RTCritSectEnter(&pThis->CritSectBlockingEvent);
957 AssertRCReturn(rc, rc);
958
959 ASMAtomicWritePtr(&pThis->BlockingEventData.pFileClose, pFile);
960 rc = rtAioMgrWaitForBlockingEvent(pThis, RTAIOMGREVENT_FILE_CLOSE);
961 ASMAtomicWriteNullPtr(&pThis->BlockingEventData.pFileClose);
962
963 RTCritSectLeave(&pThis->CritSectBlockingEvent);
964
965 return rc;
966}
967
968/**
969 * Process a shutdown event.
970 *
971 * @returns IPRT status code.
972 * @param pThis The async I/O manager to shut down.
973 */
974static int rtAioMgrShutdown(PRTAIOMGRINT pThis)
975{
976 int rc = RTCritSectEnter(&pThis->CritSectBlockingEvent);
977 AssertRCReturn(rc, rc);
978
979 rc = rtAioMgrWaitForBlockingEvent(pThis, RTAIOMGREVENT_SHUTDOWN);
980 RTCritSectLeave(&pThis->CritSectBlockingEvent);
981
982 return rc;
983}
984
985/**
986 * Destroys an async I/O manager.
987 *
988 * @returns nothing.
989 * @param pThis The async I/O manager instance to destroy.
990 */
991static void rtAioMgrDestroy(PRTAIOMGRINT pThis)
992{
993 int rc;
994
995 rc = rtAioMgrShutdown(pThis);
996 AssertRC(rc);
997
998 rc = RTThreadWait(pThis->hThread, RT_INDEFINITE_WAIT, NULL);
999 AssertRC(rc);
1000
1001 rc = RTFileAioCtxDestroy(pThis->hAioCtx);
1002 AssertRC(rc);
1003
1004 rc = RTMemCacheDestroy(pThis->hMemCacheReqs);
1005 AssertRC(rc);
1006
1007 pThis->hThread = NIL_RTTHREAD;
1008 pThis->hAioCtx = NIL_RTFILEAIOCTX;
1009 pThis->hMemCacheReqs = NIL_RTMEMCACHE;
1010 pThis->u32Magic = ~RTAIOMGR_MAGIC;
1011 RTCritSectDelete(&pThis->CritSectBlockingEvent);
1012 RTSemEventDestroy(pThis->hEventSemBlock);
1013 RTMemFree(pThis);
1014}
1015
1016/**
1017 * Queues a new request for processing.
1018 */
1019static void rtAioMgrFileQueueReq(PRTAIOMGRFILEINT pThis, PRTAIOMGRREQ pReq)
1020{
1021 RTAioMgrFileRetain(pThis);
1022 RTQueueAtomicInsert(&pThis->QueueReqs, &pReq->WorkItem);
1023 rtAioMgrWakeup(pThis->pAioMgr);
1024}
1025
1026/**
1027 * Destroys an async I/O manager file.
1028 *
1029 * @returns nothing.
1030 * @param pThis The async I/O manager file.
1031 */
1032static void rtAioMgrFileDestroy(PRTAIOMGRFILEINT pThis)
1033{
1034 pThis->u32Magic = ~RTAIOMGRFILE_MAGIC;
1035 rtAioMgrCloseFile(pThis->pAioMgr, pThis);
1036 RTAioMgrRelease(pThis->pAioMgr);
1037 RTMemFree(pThis);
1038}
1039
1040/**
1041 * Queues a new I/O request.
1042 *
1043 * @returns IPRT status code.
1044 * @param hAioMgrFile The I/O manager file handle.
1045 * @param off Start offset of the I/o request.
1046 * @param pSgBuf Data S/G buffer.
1047 * @param cbIo How much to transfer.
1048 * @param pvUser Opaque user data.
1049 * @param enmType I/O direction type (read/write).
1050 */
1051static int rtAioMgrFileIoReqCreate(RTAIOMGRFILE hAioMgrFile, RTFOFF off, PRTSGBUF pSgBuf,
1052 size_t cbIo, void *pvUser, RTAIOMGRREQTYPE enmType)
1053{
1054 int rc;
1055 PRTAIOMGRFILEINT pFile = hAioMgrFile;
1056 PRTAIOMGRINT pAioMgr;
1057
1058 AssertPtrReturn(pFile, VERR_INVALID_HANDLE);
1059 pAioMgr = pFile->pAioMgr;
1060
1061 PRTAIOMGRREQ pReq = rtAioMgrReqAlloc(pAioMgr);
1062 if (RT_LIKELY(pReq))
1063 {
1064 unsigned cSeg = 1;
1065 size_t cbSeg = RTSgBufSegArrayCreate(pSgBuf, &pReq->DataSeg, &cSeg, cbIo);
1066
1067 if (cbSeg == cbIo)
1068 {
1069 pReq->enmType = enmType;
1070 pReq->pFile = pFile;
1071 pReq->pvUser = pvUser;
1072 pReq->off = off;
1073 rtAioMgrFileQueueReq(pFile, pReq);
1074 rc = VERR_FILE_AIO_IN_PROGRESS;
1075 }
1076 else
1077 {
1078 /** @todo Real S/G buffer support. */
1079 rtAioMgrReqFree(pAioMgr, pReq);
1080 rc = VERR_NOT_SUPPORTED;
1081 }
1082 }
1083 else
1084 rc = VERR_NO_MEMORY;
1085
1086 return rc;
1087}
1088
1089/**
1090 * Request constructor for the memory cache.
1091 *
1092 * @returns IPRT status code.
1093 * @param hMemCache The cache handle.
1094 * @param pvObj The memory object that should be initialized.
1095 * @param pvUser The user argument.
1096 */
1097static DECLCALLBACK(int) rtAioMgrReqCtor(RTMEMCACHE hMemCache, void *pvObj, void *pvUser)
1098{
1099 PRTAIOMGRREQ pReq = (PRTAIOMGRREQ)pvObj;
1100 RT_NOREF_PV(hMemCache); RT_NOREF_PV(pvUser);
1101
1102 memset(pReq, 0, sizeof(RTAIOMGRREQ));
1103 return RTFileAioReqCreate(&pReq->hReqIo);
1104}
1105
1106/**
1107 * Request destructor for the memory cache.
1108 *
1109 * @param hMemCache The cache handle.
1110 * @param pvObj The memory object that should be destroyed.
1111 * @param pvUser The user argument.
1112 */
1113static DECLCALLBACK(void) rtAioMgrReqDtor(RTMEMCACHE hMemCache, void *pvObj, void *pvUser)
1114{
1115 PRTAIOMGRREQ pReq = (PRTAIOMGRREQ)pvObj;
1116 int rc = RTFileAioReqDestroy(pReq->hReqIo);
1117 AssertRC(rc);
1118
1119 RT_NOREF_PV(hMemCache); RT_NOREF_PV(pvUser);
1120}
1121
1122RTDECL(int) RTAioMgrCreate(PRTAIOMGR phAioMgr, uint32_t cReqsMax)
1123{
1124 int rc = VINF_SUCCESS;
1125 PRTAIOMGRINT pThis;
1126
1127 AssertPtrReturn(phAioMgr, VERR_INVALID_POINTER);
1128 AssertReturn(cReqsMax > 0, VERR_INVALID_PARAMETER);
1129
1130 pThis = (PRTAIOMGRINT)RTMemAllocZ(sizeof(RTAIOMGRINT));
1131 if (pThis)
1132 {
1133 pThis->u32Magic = RTAIOMGR_MAGIC;
1134 pThis->cRefs = 1;
1135 pThis->enmBlockingEvent = RTAIOMGREVENT_NO_EVENT;
1136 RTListInit(&pThis->ListFiles);
1137 rc = RTCritSectInit(&pThis->CritSectBlockingEvent);
1138 if (RT_SUCCESS(rc))
1139 {
1140 rc = RTSemEventCreate(&pThis->hEventSemBlock);
1141 if (RT_SUCCESS(rc))
1142 {
1143 rc = RTMemCacheCreate(&pThis->hMemCacheReqs, sizeof(RTAIOMGRREQ),
1144 0, UINT32_MAX, rtAioMgrReqCtor, rtAioMgrReqDtor, NULL, 0);
1145 if (RT_SUCCESS(rc))
1146 {
1147 rc = RTFileAioCtxCreate(&pThis->hAioCtx, cReqsMax == UINT32_MAX
1148 ? RTFILEAIO_UNLIMITED_REQS
1149 : cReqsMax,
1150 RTFILEAIOCTX_FLAGS_WAIT_WITHOUT_PENDING_REQUESTS);
1151 if (RT_SUCCESS(rc))
1152 {
1153 rc = RTThreadCreateF(&pThis->hThread, rtAioMgrWorker, pThis, 0, RTTHREADTYPE_IO,
1154 RTTHREADFLAGS_WAITABLE, "AioMgr-%u", cReqsMax);
1155 if (RT_FAILURE(rc))
1156 {
1157 rc = RTFileAioCtxDestroy(pThis->hAioCtx);
1158 AssertRC(rc);
1159 }
1160 }
1161
1162 if (RT_FAILURE(rc))
1163 RTMemCacheDestroy(pThis->hMemCacheReqs);
1164 }
1165
1166 if (RT_FAILURE(rc))
1167 RTSemEventDestroy(pThis->hEventSemBlock);
1168 }
1169
1170 if (RT_FAILURE(rc))
1171 RTCritSectDelete(&pThis->CritSectBlockingEvent);
1172 }
1173
1174 if (RT_FAILURE(rc))
1175 RTMemFree(pThis);
1176 }
1177 else
1178 rc = VERR_NO_MEMORY;
1179
1180 if (RT_SUCCESS(rc))
1181 *phAioMgr = pThis;
1182
1183 return rc;
1184}
1185
1186RTDECL(uint32_t) RTAioMgrRetain(RTAIOMGR hAioMgr)
1187{
1188 PRTAIOMGRINT pThis = hAioMgr;
1189 AssertReturn(hAioMgr != NIL_RTAIOMGR, UINT32_MAX);
1190 AssertPtrReturn(pThis, UINT32_MAX);
1191 AssertReturn(pThis->u32Magic == RTAIOMGR_MAGIC, UINT32_MAX);
1192
1193 uint32_t cRefs = ASMAtomicIncU32(&pThis->cRefs);
1194 AssertMsg(cRefs > 1 && cRefs < _1G, ("%#x %p\n", cRefs, pThis));
1195 return cRefs;
1196}
1197
1198RTDECL(uint32_t) RTAioMgrRelease(RTAIOMGR hAioMgr)
1199{
1200 PRTAIOMGRINT pThis = hAioMgr;
1201 if (pThis == NIL_RTAIOMGR)
1202 return 0;
1203 AssertPtrReturn(pThis, UINT32_MAX);
1204 AssertReturn(pThis->u32Magic == RTAIOMGR_MAGIC, UINT32_MAX);
1205
1206 uint32_t cRefs = ASMAtomicDecU32(&pThis->cRefs);
1207 AssertMsg(cRefs < _1G, ("%#x %p\n", cRefs, pThis));
1208 if (cRefs == 0)
1209 rtAioMgrDestroy(pThis);
1210 return cRefs;
1211}
1212
1213RTDECL(int) RTAioMgrFileCreate(RTAIOMGR hAioMgr, RTFILE hFile, PFNRTAIOMGRREQCOMPLETE pfnReqComplete,
1214 void *pvUser, PRTAIOMGRFILE phAioMgrFile)
1215{
1216 int rc = VINF_SUCCESS;
1217 PRTAIOMGRFILEINT pThis;
1218
1219 AssertReturn(hAioMgr != NIL_RTAIOMGR, VERR_INVALID_HANDLE);
1220 AssertReturn(hFile != NIL_RTFILE, VERR_INVALID_HANDLE);
1221 AssertPtrReturn(pfnReqComplete, VERR_INVALID_POINTER);
1222 AssertPtrReturn(phAioMgrFile, VERR_INVALID_POINTER);
1223
1224 pThis = (PRTAIOMGRFILEINT)RTMemAllocZ(sizeof(RTAIOMGRFILEINT));
1225 if (pThis)
1226 {
1227 pThis->u32Magic = RTAIOMGRFILE_MAGIC;
1228 pThis->cRefs = 1;
1229 pThis->hFile = hFile;
1230 pThis->pAioMgr = hAioMgr;
1231 pThis->pvUser = pvUser;
1232 pThis->pfnReqCompleted = pfnReqComplete;
1233 RTQueueAtomicInit(&pThis->QueueReqs);
1234 RTListInit(&pThis->AioMgr.ListWaitingReqs);
1235 RTAioMgrRetain(hAioMgr);
1236 rc = RTFileAioCtxAssociateWithFile(pThis->pAioMgr->hAioCtx, hFile);
1237 if (RT_FAILURE(rc))
1238 rtAioMgrFileDestroy(pThis);
1239 else
1240 rtAioMgrAddFile(pThis->pAioMgr, pThis);
1241 }
1242 else
1243 rc = VERR_NO_MEMORY;
1244
1245 if (RT_SUCCESS(rc))
1246 *phAioMgrFile = pThis;
1247
1248 return rc;
1249}
1250
1251RTDECL(uint32_t) RTAioMgrFileRetain(RTAIOMGRFILE hAioMgrFile)
1252{
1253 PRTAIOMGRFILEINT pThis = hAioMgrFile;
1254 AssertReturn(hAioMgrFile != NIL_RTAIOMGRFILE, UINT32_MAX);
1255 AssertPtrReturn(pThis, UINT32_MAX);
1256 AssertReturn(pThis->u32Magic == RTAIOMGRFILE_MAGIC, UINT32_MAX);
1257
1258 uint32_t cRefs = ASMAtomicIncU32(&pThis->cRefs);
1259 AssertMsg(cRefs > 1 && cRefs < _1G, ("%#x %p\n", cRefs, pThis));
1260 return cRefs;
1261}
1262
1263RTDECL(uint32_t) RTAioMgrFileRelease(RTAIOMGRFILE hAioMgrFile)
1264{
1265 PRTAIOMGRFILEINT pThis = hAioMgrFile;
1266 if (pThis == NIL_RTAIOMGRFILE)
1267 return 0;
1268 AssertPtrReturn(pThis, UINT32_MAX);
1269 AssertReturn(pThis->u32Magic == RTAIOMGRFILE_MAGIC, UINT32_MAX);
1270
1271 uint32_t cRefs = ASMAtomicDecU32(&pThis->cRefs);
1272 AssertMsg(cRefs < _1G, ("%#x %p\n", cRefs, pThis));
1273 if (cRefs == 0)
1274 rtAioMgrFileDestroy(pThis);
1275 return cRefs;
1276}
1277
1278RTDECL(void *) RTAioMgrFileGetUser(RTAIOMGRFILE hAioMgrFile)
1279{
1280 PRTAIOMGRFILEINT pThis = hAioMgrFile;
1281
1282 AssertPtrReturn(pThis, NULL);
1283 return pThis->pvUser;
1284}
1285
1286RTDECL(int) RTAioMgrFileRead(RTAIOMGRFILE hAioMgrFile, RTFOFF off,
1287 PRTSGBUF pSgBuf, size_t cbRead, void *pvUser)
1288{
1289 return rtAioMgrFileIoReqCreate(hAioMgrFile, off, pSgBuf, cbRead, pvUser,
1290 RTAIOMGRREQTYPE_READ);
1291}
1292
1293RTDECL(int) RTAioMgrFileWrite(RTAIOMGRFILE hAioMgrFile, RTFOFF off,
1294 PRTSGBUF pSgBuf, size_t cbWrite, void *pvUser)
1295{
1296 return rtAioMgrFileIoReqCreate(hAioMgrFile, off, pSgBuf, cbWrite, pvUser,
1297 RTAIOMGRREQTYPE_WRITE);
1298}
1299
1300RTDECL(int) RTAioMgrFileFlush(RTAIOMGRFILE hAioMgrFile, void *pvUser)
1301{
1302 PRTAIOMGRFILEINT pFile = hAioMgrFile;
1303 PRTAIOMGRINT pAioMgr;
1304
1305 AssertPtrReturn(pFile, VERR_INVALID_HANDLE);
1306
1307 pAioMgr = pFile->pAioMgr;
1308
1309 PRTAIOMGRREQ pReq = rtAioMgrReqAlloc(pAioMgr);
1310 if (RT_UNLIKELY(!pReq))
1311 return VERR_NO_MEMORY;
1312
1313 pReq->pFile = pFile;
1314 pReq->enmType = RTAIOMGRREQTYPE_FLUSH;
1315 pReq->pvUser = pvUser;
1316 rtAioMgrFileQueueReq(pFile, pReq);
1317
1318 return VERR_FILE_AIO_IN_PROGRESS;
1319}
1320
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette