Changeset 46233 in vbox for trunk/src/VBox/Runtime
- Timestamp:
- May 23, 2013 12:56:17 PM (12 years ago)
- svn:sync-xref-src-repo-rev:
- 85962
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/src/VBox/Runtime/common/misc/aiomgr.cpp
r46185 r46233 34 34 #include <iprt/mem.h> 35 35 #include <iprt/file.h> 36 #include <iprt/list.h> 36 37 #include <iprt/thread.h> 37 38 #include <iprt/assert.h> 39 #include <iprt/string.h> 38 40 #include <iprt/critsect.h> 41 #include <iprt/memcache.h> 39 42 #include <iprt/semaphore.h> 40 43 #include <iprt/queueatomic.h> … … 56 59 /** Invalid tye */ 57 60 RTAIOMGREVENT_INVALID = 0, 58 /** An endpoint is added to the manager. */ 59 RTAIOMGREVENT_ADD_FILE, 60 /** An endpoint is about to be closed. */ 61 RTAIOMGREVENT_CLOSE_FILE, 61 /** No event pending. */ 62 RTAIOMGREVENT_NO_EVENT, 63 /** A file is added to the manager. */ 64 RTAIOMGREVENT_FILE_ADD, 65 /** A file is about to be closed. */ 66 RTAIOMGREVENT_FILE_CLOSE, 67 /** The async I/O manager is shut down. */ 68 RTAIOMGREVENT_SHUTDOWN, 62 69 /** 32bit hack */ 63 70 RTAIOMGREVENT_32BIT_HACK = 0x7fffffff … … 73 80 /** Reference count. */ 74 81 volatile uint32_t cRefs; 75 /** Running flag. */76 volatile bool fRunning;77 82 /** Async I/O context handle. */ 78 83 RTFILEAIOCTX hAioCtx; … … 80 85 RTTHREAD hThread; 81 86 /** List of files assigned to this manager. */ 82 PRTAIOMGRFILEINT pFilesHead;87 RTLISTANCHOR ListFiles; 83 88 /** Number of requests active currently. */ 84 89 unsigned cReqsActive; 85 90 /** Number of maximum requests active. */ 86 91 uint32_t cReqsActiveMax; 87 /** Pointer to an array of free async I/O request handles. */ 88 RTFILEAIOREQ *pahReqsFree; 89 /** Index of the next free entry in the cache. */ 90 uint32_t iFreeEntry; 91 /** Size of the array. */ 92 unsigned cReqEntries; 92 /** Memory cache for requests. */ 93 RTMEMCACHE hMemCacheReqs; 93 94 /** Critical section protecting the blocking event handling. */ 94 95 RTCRITSECT CritSectBlockingEvent; … … 96 97 * The caller waits on it until the async I/O manager 97 98 * finished processing the event. */ 98 RTSEMEVENT EventSemBlock; 99 /** Flag whether a blocking event is pending and needs 100 * processing by the I/O manager. */ 101 volatile bool fBlockingEventPending; 99 RTSEMEVENT hEventSemBlock; 102 100 /** Blocking event type */ 103 101 volatile RTAIOMGREVENT enmBlockingEvent; … … 123 121 /** Reference count. */ 124 122 volatile uint32_t cRefs; 123 /** Flags. */ 124 uint32_t fFlags; 125 125 /** File handle. */ 126 126 RTFILE hFile; … … 129 129 /** Work queue for new requests. */ 130 130 RTQUEUEATOMIC QueueReqs; 131 /** Completion callback for this file. */ 132 PFNRTAIOMGRREQCOMPLETE pfnReqCompleted; 131 133 /** Data for exclusive use by the assigned async I/O manager. */ 132 134 struct 133 135 { 134 /** Pointer to the next file assigned to the manager. */ 135 PRTAIOMGRFILEINT pNext; 136 #if 0 137 /** List of pending requests (not submitted due to usage restrictions 138 * or a pending flush request) */ 139 R3PTRTYPE(PPDMACTASKFILE) pReqsPendingHead; 140 /** Tail of pending requests. */ 141 R3PTRTYPE(PPDMACTASKFILE) pReqsPendingTail; 142 /** Tree of currently locked ranges. 143 * If a write task is enqueued the range gets locked and any other 144 * task writing to that range has to wait until the task completes. 145 */ 146 PAVLRFOFFTREE pTreeRangesLocked; 147 /** Number of requests with a range lock active. */ 148 unsigned cLockedReqsActive; 136 /** List node of assigned files for a async I/O manager. */ 137 RTLISTNODE NodeAioMgrFiles; 138 /** List of requests waiting for submission. */ 139 RTLISTANCHOR ListWaitingReqs; 149 140 /** Number of requests currently being processed for this endpoint 150 141 * (excluded flush requests). */ 151 unsigned cRequestsActive; 152 #endif 142 unsigned cReqsActive; 153 143 } AioMgr; 154 144 } RTAIOMGRFILEINT; 145 146 /** Flag whether the file is closed. */ 147 #define RTAIOMGRFILE_FLAGS_CLOSING RT_BIT_32(1) 148 149 /** 150 * Request type. 151 */ 152 typedef enum RTAIOMGRREQTYPE 153 { 154 /** Invalid request type. */ 155 RTAIOMGRREQTYPE_INVALID = 0, 156 /** Read reques type. */ 157 RTAIOMGRREQTYPE_READ, 158 /** Write request. */ 159 RTAIOMGRREQTYPE_WRITE, 160 /** Flush request. */ 161 RTAIOMGRREQTYPE_FLUSH, 162 /** Prefetech request. */ 163 RTAIOMGRREQTYPE_PREFETCH, 164 /** 32bit hack. */ 165 RTAIOMGRREQTYPE_32BIT_HACK = 0x7fffffff 166 } RTAIOMGRREQTYPE; 167 /** Pointer to a reques type. */ 168 typedef RTAIOMGRREQTYPE *PRTAIOMGRREQTYPE; 169 170 /** 171 * Async I/O manager request. 172 */ 173 typedef struct RTAIOMGRREQ 174 { 175 /** Atomic queue work item. */ 176 RTQUEUEATOMICITEM WorkItem; 177 /** Node for a waiting list. */ 178 RTLISTNODE NodeWaitingList; 179 /** Request flags. */ 180 uint32_t fFlags; 181 /** Transfer type. */ 182 RTAIOMGRREQTYPE enmType; 183 /** Assigned file request. */ 184 RTFILEAIOREQ hReqIo; 185 /** File the request belongs to. */ 186 PRTAIOMGRFILEINT pFile; 187 /** Opaque user data. */ 188 void *pvUser; 189 /** Start offset */ 190 RTFOFF off; 191 /** Data segment. */ 192 RTSGSEG DataSeg; 193 /** When non-zero the segment uses a bounce buffer because the provided buffer 194 * doesn't meet host requirements. */ 195 size_t cbBounceBuffer; 196 /** Pointer to the used bounce buffer if any. */ 197 void *pvBounceBuffer; 198 /** Start offset in the bounce buffer to copy from. */ 199 uint32_t offBounceBuffer; 200 } RTAIOMGRREQ; 201 /** Pointer to a I/O manager request. */ 202 typedef RTAIOMGRREQ *PRTAIOMGRREQ; 203 204 /** Flag whether the request was prepared already. */ 205 #define RTAIOMGRREQ_FLAGS_PREPARED RT_BIT_32(0) 155 206 156 207 /******************************************************************************* … … 179 230 *******************************************************************************/ 180 231 232 static int rtAioMgrReqsEnqueue(PRTAIOMGRINT pThis, PRTAIOMGRFILEINT pFile, 233 PRTFILEAIOREQ pahReqs, unsigned cReqs); 234 235 /** 236 * Removes an endpoint from the currently assigned manager. 237 * 238 * @returns TRUE if there are still requests pending on the current manager for this endpoint. 239 * FALSE otherwise. 240 * @param pEndpointRemove The endpoint to remove. 241 */ 242 static bool rtAioMgrFileRemove(PRTAIOMGRFILEINT pFile) 243 { 244 /* Make sure that there is no request pending on this manager for the endpoint. */ 245 if (!pFile->AioMgr.cReqsActive) 246 { 247 RTListNodeRemove(&pFile->AioMgr.NodeAioMgrFiles); 248 return false; 249 } 250 251 return true; 252 } 253 254 /** 255 * Allocate a new I/O request. 256 * 257 * @returns Pointer to the allocated request or NULL if out of memory. 258 * @param pThis The async I/O manager instance. 259 */ 260 static PRTAIOMGRREQ rtAioMgrReqAlloc(PRTAIOMGRINT pThis) 261 { 262 return (PRTAIOMGRREQ)RTMemCacheAlloc(pThis->hMemCacheReqs); 263 } 264 265 /** 266 * Frees an I/O request. 267 * 268 * @returns nothing. 269 * @param pThis The async I/O manager instance. 270 * @param pReq The request to free. 271 */ 272 static void rtAioMgrReqFree(PRTAIOMGRINT pThis, PRTAIOMGRREQ pReq) 273 { 274 if (pReq->cbBounceBuffer) 275 { 276 AssertPtr(pReq->pvBounceBuffer); 277 RTMemPageFree(pReq->pvBounceBuffer, pReq->cbBounceBuffer); 278 pReq->pvBounceBuffer = NULL; 279 pReq->cbBounceBuffer = 0; 280 } 281 pReq->fFlags = 0; 282 RTAioMgrFileRelease(pReq->pFile); 283 RTMemCacheFree(pThis->hMemCacheReqs, pReq); 284 } 285 286 static void rtAioMgrReqCompleteRc(PRTAIOMGRINT pThis, PRTAIOMGRREQ pReq, 287 int rcReq, size_t cbTransfered) 288 { 289 int rc = VINF_SUCCESS; 290 PRTAIOMGRFILEINT pFile; 291 292 pFile = pReq->pFile; 293 pThis->cReqsActive--; 294 pFile->AioMgr.cReqsActive--; 295 296 /* 297 * It is possible that the request failed on Linux with kernels < 2.6.23 298 * if the passed buffer was allocated with remap_pfn_range or if the file 299 * is on an NFS endpoint which does not support async and direct I/O at the same time. 300 * The endpoint will be migrated to a failsafe manager in case a request fails. 301 */ 302 if (RT_FAILURE(rcReq)) 303 { 304 pFile->pfnReqCompleted(pFile, rcReq, pReq->pvUser); 305 rtAioMgrReqFree(pThis, pReq); 306 } 307 else 308 { 309 /* 310 * Restart an incomplete transfer. 311 * This usually means that the request will return an error now 312 * but to get the cause of the error (disk full, file too big, I/O error, ...) 313 * the transfer needs to be continued. 314 */ 315 if (RT_UNLIKELY( cbTransfered < pReq->DataSeg.cbSeg 316 || ( pReq->cbBounceBuffer 317 && cbTransfered < pReq->cbBounceBuffer))) 318 { 319 RTFOFF offStart; 320 size_t cbToTransfer; 321 uint8_t *pbBuf = NULL; 322 323 Assert(cbTransfered % 512 == 0); 324 325 if (pReq->cbBounceBuffer) 326 { 327 AssertPtr(pReq->pvBounceBuffer); 328 offStart = (pReq->off & ~((RTFOFF)512-1)) + cbTransfered; 329 cbToTransfer = pReq->cbBounceBuffer - cbTransfered; 330 pbBuf = (uint8_t *)pReq->pvBounceBuffer + cbTransfered; 331 } 332 else 333 { 334 Assert(!pReq->pvBounceBuffer); 335 offStart = pReq->off + cbTransfered; 336 cbToTransfer = pReq->DataSeg.cbSeg - cbTransfered; 337 pbBuf = (uint8_t *)pReq->DataSeg.pvSeg + cbTransfered; 338 } 339 340 if ( pReq->enmType == RTAIOMGRREQTYPE_PREFETCH 341 || pReq->enmType == RTAIOMGRREQTYPE_READ) 342 { 343 rc = RTFileAioReqPrepareRead(pReq->hReqIo, pFile->hFile, offStart, 344 pbBuf, cbToTransfer, pReq); 345 } 346 else 347 { 348 AssertMsg(pReq->enmType == RTAIOMGRREQTYPE_WRITE, 349 ("Invalid transfer type\n")); 350 rc = RTFileAioReqPrepareWrite(pReq->hReqIo, pFile->hFile, offStart, 351 pbBuf, cbToTransfer, pReq); 352 } 353 AssertRC(rc); 354 355 rc = rtAioMgrReqsEnqueue(pThis, pFile, &pReq->hReqIo, 1); 356 AssertMsg(RT_SUCCESS(rc) || (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES), 357 ("Unexpected return code rc=%Rrc\n", rc)); 358 } 359 else if (pReq->enmType == RTAIOMGRREQTYPE_PREFETCH) 360 { 361 Assert(pReq->cbBounceBuffer); 362 pReq->enmType = RTAIOMGRREQTYPE_WRITE; 363 364 memcpy(((uint8_t *)pReq->pvBounceBuffer) + pReq->offBounceBuffer, 365 pReq->DataSeg.pvSeg, 366 pReq->DataSeg.cbSeg); 367 368 /* Write it now. */ 369 RTFOFF offStart = pReq->off & ~(RTFOFF)(512-1); 370 size_t cbToTransfer = RT_ALIGN_Z(pReq->DataSeg.cbSeg + (pReq->off - offStart), 512); 371 372 rc = RTFileAioReqPrepareWrite(pReq->hReqIo, pFile->hFile, 373 offStart, pReq->pvBounceBuffer, cbToTransfer, pReq); 374 AssertRC(rc); 375 rc = rtAioMgrReqsEnqueue(pThis, pFile, &pReq->hReqIo, 1); 376 AssertMsg(RT_SUCCESS(rc) || (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES), 377 ("Unexpected return code rc=%Rrc\n", rc)); 378 } 379 else 380 { 381 if (RT_SUCCESS(rc) && pReq->cbBounceBuffer) 382 { 383 if (pReq->enmType == RTAIOMGRREQTYPE_READ) 384 memcpy(pReq->DataSeg.pvSeg, 385 ((uint8_t *)pReq->pvBounceBuffer) + pReq->offBounceBuffer, 386 pReq->DataSeg.cbSeg); 387 } 388 389 /* Call completion callback */ 390 pFile->pfnReqCompleted(pFile, rcReq, pReq->pvUser); 391 rtAioMgrReqFree(pThis, pReq); 392 } 393 } /* request completed successfully */ 394 } 395 396 /** 397 * Wrapper around rtAioMgrReqCompleteRc(). 398 */ 399 static void rtAioMgrReqComplete(PRTAIOMGRINT pThis, RTFILEAIOREQ hReq) 400 { 401 size_t cbTransfered = 0; 402 int rcReq = RTFileAioReqGetRC(hReq, &cbTransfered); 403 PRTAIOMGRREQ pReq = (PRTAIOMGRREQ)RTFileAioReqGetUser(hReq); 404 405 rtAioMgrReqCompleteRc(pThis, pReq, rcReq, cbTransfered); 406 } 407 408 /** 409 * Wrapper around RTFIleAioCtxSubmit() which is also doing error handling. 410 */ 411 static int rtAioMgrReqsEnqueue(PRTAIOMGRINT pThis, PRTAIOMGRFILEINT pFile, 412 PRTFILEAIOREQ pahReqs, unsigned cReqs) 413 { 414 pThis->cReqsActive += cReqs; 415 pFile->AioMgr.cReqsActive += cReqs; 416 417 int rc = RTFileAioCtxSubmit(pThis->hAioCtx, pahReqs, cReqs); 418 if (RT_FAILURE(rc)) 419 { 420 if (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES) 421 { 422 /* Append any not submitted task to the waiting list. */ 423 for (size_t i = 0; i < cReqs; i++) 424 { 425 int rcReq = RTFileAioReqGetRC(pahReqs[i], NULL); 426 427 if (rcReq != VERR_FILE_AIO_IN_PROGRESS) 428 { 429 PRTAIOMGRREQ pReq = (PRTAIOMGRREQ)RTFileAioReqGetUser(pahReqs[i]); 430 431 Assert(pReq->hReqIo == pahReqs[i]); 432 RTListAppend(&pFile->AioMgr.ListWaitingReqs, &pReq->NodeWaitingList); 433 pThis->cReqsActive--; 434 pFile->AioMgr.cReqsActive--; 435 } 436 } 437 438 pThis->cReqsActiveMax = pThis->cReqsActive; 439 rc = VINF_SUCCESS; 440 } 441 else /* Another kind of error happened (full disk, ...) */ 442 { 443 /* An error happened. Find out which one caused the error and resubmit all other tasks. */ 444 for (size_t i = 0; i < cReqs; i++) 445 { 446 PRTAIOMGRREQ pReq = (PRTAIOMGRREQ)RTFileAioReqGetUser(pahReqs[i]); 447 int rcReq = RTFileAioReqGetRC(pahReqs[i], NULL); 448 449 if (rcReq == VERR_FILE_AIO_NOT_SUBMITTED) 450 { 451 /* We call ourself again to do any error handling which might come up now. */ 452 rc = rtAioMgrReqsEnqueue(pThis, pFile, &pahReqs[i], 1); 453 AssertRC(rc); 454 } 455 else if (rcReq != VERR_FILE_AIO_IN_PROGRESS) 456 rtAioMgrReqCompleteRc(pThis, pReq, rcReq, 0); 457 } 458 } 459 } 460 461 return VINF_SUCCESS; 462 } 463 464 /** 465 * Adds a list of requests to the waiting list. 466 * 467 * @returns nothing. 468 * @param pFile The file instance to add the requests to. 469 * @param pReqsHead The head of the request list to add. 470 */ 471 static void rtAioMgrFileAddReqsToWaitingList(PRTAIOMGRFILEINT pFile, PRTAIOMGRREQ pReqsHead) 472 { 473 while (pReqsHead) 474 { 475 PRTAIOMGRREQ pReqCur = pReqsHead; 476 477 pReqsHead = (PRTAIOMGRREQ)pReqsHead->WorkItem.pNext; 478 pReqsHead->WorkItem.pNext = NULL; 479 RTListAppend(&pFile->AioMgr.ListWaitingReqs, &pReqCur->NodeWaitingList); 480 } 481 } 482 483 /** 484 * Prepare the native I/o request ensuring that all alignment prerequisites of 485 * the host are met. 486 * 487 * @returns IPRT statuse code. 488 * @param pFile The file instance data. 489 * @param pReq The request to prepare. 490 */ 491 static int rtAioMgrReqPrepareNonBuffered(PRTAIOMGRFILEINT pFile, PRTAIOMGRREQ pReq) 492 { 493 int rc = VINF_SUCCESS; 494 RTFOFF offStart = pReq->off & ~(RTFOFF)(512-1); 495 size_t cbToTransfer = RT_ALIGN_Z(pReq->DataSeg.cbSeg + (pReq->off - offStart), 512); 496 void *pvBuf = pReq->DataSeg.pvSeg; 497 bool fAlignedReq = cbToTransfer == pReq->DataSeg.cbSeg 498 && offStart == pReq->off; 499 500 /* 501 * Check if the alignment requirements are met. 502 * Offset, transfer size and buffer address 503 * need to be on a 512 boundary. 504 */ 505 if ( !fAlignedReq 506 /** @todo: || ((pEpClassFile->uBitmaskAlignment & (RTR3UINTPTR)pvBuf) != (RTR3UINTPTR)pvBuf) */) 507 { 508 /* Create bounce buffer. */ 509 pReq->cbBounceBuffer = cbToTransfer; 510 511 AssertMsg(pReq->off >= offStart, ("Overflow in calculation off=%llu offStart=%llu\n", 512 pReq->off, offStart)); 513 pReq->offBounceBuffer = pReq->off - offStart; 514 515 /** @todo: I think we need something like a RTMemAllocAligned method here. 516 * Current assumption is that the maximum alignment is 4096byte 517 * (GPT disk on Windows) 518 * so we can use RTMemPageAlloc here. 519 */ 520 pReq->pvBounceBuffer = RTMemPageAlloc(cbToTransfer); 521 if (RT_LIKELY(pReq->pvBounceBuffer)) 522 { 523 pvBuf = pReq->pvBounceBuffer; 524 525 if (pReq->enmType == RTAIOMGRREQTYPE_WRITE) 526 { 527 if ( RT_UNLIKELY(cbToTransfer != pReq->DataSeg.cbSeg) 528 || RT_UNLIKELY(offStart != pReq->off)) 529 { 530 /* We have to fill the buffer first before we can update the data. */ 531 pReq->enmType = RTAIOMGRREQTYPE_WRITE; 532 } 533 else 534 memcpy(pvBuf, pReq->DataSeg.pvSeg, pReq->DataSeg.cbSeg); 535 } 536 } 537 else 538 rc = VERR_NO_MEMORY; 539 } 540 else 541 pReq->cbBounceBuffer = 0; 542 543 if (RT_SUCCESS(rc)) 544 { 545 if (pReq->enmType == RTAIOMGRREQTYPE_WRITE) 546 { 547 rc = RTFileAioReqPrepareWrite(pReq->hReqIo, pFile->hFile, 548 offStart, pvBuf, cbToTransfer, pReq); 549 } 550 else /* Read or prefetch request. */ 551 rc = RTFileAioReqPrepareRead(pReq->hReqIo, pFile->hFile, 552 offStart, pvBuf, cbToTransfer, pReq); 553 AssertRC(rc); 554 pReq->fFlags |= RTAIOMGRREQ_FLAGS_PREPARED; 555 } 556 557 return rc; 558 } 559 560 /** 561 * Prepare a new request for enqueuing. 562 * 563 * @returns IPRT status code. 564 * @param pReq The request to prepare. 565 * @param phReqIo Where to store the handle to the native I/O request on success. 566 */ 567 static int rtAioMgrPrepareReq(PRTAIOMGRREQ pReq, PRTFILEAIOREQ phReqIo) 568 { 569 int rc = VINF_SUCCESS; 570 PRTAIOMGRFILEINT pFile = pReq->pFile; 571 572 switch (pReq->enmType) 573 { 574 case RTAIOMGRREQTYPE_FLUSH: 575 { 576 rc = RTFileAioReqPrepareFlush(pReq->hReqIo, pFile->hFile, pReq); 577 break; 578 } 579 case RTAIOMGRREQTYPE_READ: 580 case RTAIOMGRREQTYPE_WRITE: 581 { 582 rc = rtAioMgrReqPrepareNonBuffered(pFile, pReq); 583 break; 584 } 585 default: 586 AssertMsgFailed(("Invalid transfer type %d\n", pReq->enmType)); 587 } /* switch transfer type */ 588 589 if (RT_SUCCESS(rc)) 590 *phReqIo = pReq->hReqIo; 591 592 return rc; 593 } 594 595 /** 596 * Prepare newly submitted requests for processing. 597 * 598 * @returns IPRT status code 599 * @param pThis The async I/O manager instance data. 600 * @param pFile The file instance. 601 * @param pReqsNew The list of new requests to prepare. 602 */ 603 static int rtAioMgrPrepareNewReqs(PRTAIOMGRINT pThis, 604 PRTAIOMGRFILEINT pFile, 605 PRTAIOMGRREQ pReqsNew) 606 { 607 RTFILEAIOREQ apReqs[20]; 608 unsigned cRequests = 0; 609 int rc = VINF_SUCCESS; 610 611 /* Go through the list and queue the requests. */ 612 while ( pReqsNew 613 && (pThis->cReqsActive + cRequests < pThis->cReqsActiveMax) 614 && RT_SUCCESS(rc)) 615 { 616 PRTAIOMGRREQ pCurr = pReqsNew; 617 pReqsNew = (PRTAIOMGRREQ)pReqsNew->WorkItem.pNext; 618 619 pCurr->WorkItem.pNext = NULL; 620 AssertMsg(VALID_PTR(pCurr->pFile) && (pCurr->pFile == pFile), 621 ("Files do not match\n")); 622 AssertMsg(!(pCurr->fFlags & RTAIOMGRREQ_FLAGS_PREPARED), 623 ("Request on the new list is already prepared\n")); 624 625 rc = rtAioMgrPrepareReq(pCurr, &apReqs[cRequests]); 626 if (RT_FAILURE(rc)) 627 rtAioMgrReqCompleteRc(pThis, pCurr, rc, 0); 628 else 629 cRequests++; 630 631 /* Queue the requests if the array is full. */ 632 if (cRequests == RT_ELEMENTS(apReqs)) 633 { 634 rc = rtAioMgrReqsEnqueue(pThis, pFile, apReqs, cRequests); 635 cRequests = 0; 636 AssertMsg(RT_SUCCESS(rc) || (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES), 637 ("Unexpected return code\n")); 638 } 639 } 640 641 if (cRequests) 642 { 643 rc = rtAioMgrReqsEnqueue(pThis, pFile, apReqs, cRequests); 644 AssertMsg(RT_SUCCESS(rc) || (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES), 645 ("Unexpected return code rc=%Rrc\n", rc)); 646 } 647 648 if (pReqsNew) 649 { 650 /* Add the rest of the tasks to the pending list */ 651 rtAioMgrFileAddReqsToWaitingList(pFile, pReqsNew); 652 } 653 654 /* Insufficient resources are not fatal. */ 655 if (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES) 656 rc = VINF_SUCCESS; 657 658 return rc; 659 } 660 661 /** 662 * Queues waiting requests. 663 * 664 * @returns IPRT status code. 665 * @param pThis The async I/O manager instance data. 666 * @param pFile The file to get the requests from. 667 */ 668 static int rtAioMgrQueueWaitingReqs(PRTAIOMGRINT pThis, PRTAIOMGRFILEINT pFile) 669 { 670 RTFILEAIOREQ apReqs[20]; 671 unsigned cRequests = 0; 672 int rc = VINF_SUCCESS; 673 PRTAIOMGRREQ pReqIt; 674 PRTAIOMGRREQ pReqItNext; 675 676 /* Go through the list and queue the requests. */ 677 RTListForEachSafe(&pFile->AioMgr.ListWaitingReqs, pReqIt, pReqItNext, RTAIOMGRREQ, NodeWaitingList) 678 { 679 RTListNodeRemove(&pReqIt->NodeWaitingList); 680 AssertMsg(VALID_PTR(pReqIt->pFile) && (pReqIt->pFile == pFile), 681 ("Files do not match\n")); 682 683 if (!(pReqIt->fFlags & RTAIOMGRREQ_FLAGS_PREPARED)) 684 { 685 rc = rtAioMgrPrepareReq(pReqIt, &apReqs[cRequests]); 686 if (RT_FAILURE(rc)) 687 rtAioMgrReqCompleteRc(pThis, pReqIt, rc, 0); 688 else 689 cRequests++; 690 } 691 else 692 { 693 apReqs[cRequests] = pReqIt->hReqIo; 694 cRequests++; 695 } 696 697 /* Queue the requests if the array is full. */ 698 if (cRequests == RT_ELEMENTS(apReqs)) 699 { 700 rc = rtAioMgrReqsEnqueue(pThis, pFile, apReqs, cRequests); 701 cRequests = 0; 702 AssertMsg(RT_SUCCESS(rc) || (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES), 703 ("Unexpected return code\n")); 704 } 705 } 706 707 if (cRequests) 708 { 709 rc = rtAioMgrReqsEnqueue(pThis, pFile, apReqs, cRequests); 710 AssertMsg(RT_SUCCESS(rc) || (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES), 711 ("Unexpected return code rc=%Rrc\n", rc)); 712 } 713 714 /* Insufficient resources are not fatal. */ 715 if (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES) 716 rc = VINF_SUCCESS; 717 718 return rc; 719 } 720 721 /** 722 * Adds all pending requests for the given file. 723 * 724 * @returns IPRT status code. 725 * @param pThis The async I/O manager instance data. 726 * @param pFile The file to get the requests from. 727 */ 728 static int rtAioMgrQueueReqs(PRTAIOMGRINT pThis, PRTAIOMGRFILEINT pFile) 729 { 730 int rc = VINF_SUCCESS; 731 PRTAIOMGRFILEINT pReqsHead = NULL; 732 733 /* Check the pending list first */ 734 if (!RTListIsEmpty(&pFile->AioMgr.ListWaitingReqs)) 735 rc = rtAioMgrQueueWaitingReqs(pThis, pFile); 736 737 if ( RT_SUCCESS(rc) 738 && RTListIsEmpty(&pFile->AioMgr.ListWaitingReqs)) 739 { 740 PRTAIOMGRREQ pReqsNew = (PRTAIOMGRREQ)RTQueueAtomicRemoveAll(&pFile->QueueReqs); 741 742 if (pReqsNew) 743 { 744 rc = rtAioMgrPrepareNewReqs(pThis, pFile, pReqsNew); 745 AssertRC(rc); 746 } 747 } 748 749 return rc; 750 } 751 752 /** 753 * Checks all files for new requests. 754 * 755 * @returns IPRT status code. 756 * @param pThis The I/O manager instance data. 757 */ 758 static int rtAioMgrCheckFiles(PRTAIOMGRINT pThis) 759 { 760 int rc = VINF_SUCCESS; 761 PRTAIOMGRFILEINT pIt; 762 763 RTListForEach(&pThis->ListFiles, pIt, RTAIOMGRFILEINT, AioMgr.NodeAioMgrFiles) 764 { 765 rc = rtAioMgrQueueReqs(pThis, pIt); 766 if (RT_FAILURE(rc)) 767 return rc; 768 } 769 770 return rc; 771 } 772 773 /** 774 * Process a blocking event from the outside. 775 * 776 * @returns IPRT status code. 777 * @param pThis The async I/O manager instance data. 778 */ 779 static int rtAioMgrProcessBlockingEvent(PRTAIOMGRINT pThis) 780 { 781 int rc = VINF_SUCCESS; 782 bool fNotifyWaiter = false; 783 784 switch (pThis->enmBlockingEvent) 785 { 786 case RTAIOMGREVENT_NO_EVENT: 787 /* Nothing to do. */ 788 break; 789 case RTAIOMGREVENT_FILE_ADD: 790 { 791 PRTAIOMGRFILEINT pFile = ASMAtomicReadPtrT(&pThis->BlockingEventData.pFileAdd, PRTAIOMGRFILEINT); 792 AssertMsg(VALID_PTR(pFile), ("Adding file event without a file to add\n")); 793 794 RTListAppend(&pThis->ListFiles, &pFile->AioMgr.NodeAioMgrFiles); 795 fNotifyWaiter = true; 796 break; 797 } 798 case RTAIOMGREVENT_FILE_CLOSE: 799 { 800 PRTAIOMGRFILEINT pFile = ASMAtomicReadPtrT(&pThis->BlockingEventData.pFileClose, PRTAIOMGRFILEINT); 801 AssertMsg(VALID_PTR(pFile), ("Close file event without a file to close\n")); 802 803 if (!(pFile->fFlags & RTAIOMGRFILE_FLAGS_CLOSING)) 804 { 805 /* Make sure all requests finished. Process the queues a last time first. */ 806 rc = rtAioMgrQueueReqs(pThis, pFile); 807 AssertRC(rc); 808 809 pFile->fFlags |= RTAIOMGRFILE_FLAGS_CLOSING; 810 fNotifyWaiter = !rtAioMgrFileRemove(pFile); 811 } 812 else if (!pFile->AioMgr.cReqsActive) 813 fNotifyWaiter = true; 814 break; 815 } 816 case RTAIOMGREVENT_SHUTDOWN: 817 { 818 if (!pThis->cReqsActive) 819 fNotifyWaiter = true; 820 break; 821 } 822 default: 823 AssertReleaseMsgFailed(("Invalid event type %d\n", pThis->enmBlockingEvent)); 824 } 825 826 if (fNotifyWaiter) 827 { 828 /* Release the waiting thread. */ 829 rc = RTSemEventSignal(pThis->hEventSemBlock); 830 AssertRC(rc); 831 } 832 833 return rc; 834 } 835 181 836 /** 182 837 * async I/O manager worker loop. … … 190 845 PRTAIOMGRINT pThis = (PRTAIOMGRINT)pvUser; 191 846 bool fRunning = true; 847 int rc = VINF_SUCCESS; 192 848 193 849 do … … 195 851 uint32_t cReqsCompleted = 0; 196 852 RTFILEAIOREQ ahReqsCompleted[32]; 197 int rc = RTFileAioCtxWait(pThis->hAioCtx, 1, RT_INDEFINITE_WAIT, &ahReqsCompleted[0], 198 RT_ELEMENTS(ahReqsCompleted), &cReqsCompleted); 199 if (RT_FAILURE(rc) && rc != VERR_INTERRUPTED) 200 { 201 } 202 fRunning = ASMAtomicUoReadBool(&pThis->fRunning); 203 } while (fRunning); 204 205 return VINF_SUCCESS; 853 rc = RTFileAioCtxWait(pThis->hAioCtx, 1, RT_INDEFINITE_WAIT, &ahReqsCompleted[0], 854 RT_ELEMENTS(ahReqsCompleted), &cReqsCompleted); 855 if (rc == VERR_INTERRUPTED) 856 { 857 /* Process external event. */ 858 rtAioMgrProcessBlockingEvent(pThis); 859 rc = rtAioMgrCheckFiles(pThis); 860 } 861 else if (RT_FAILURE(rc)) 862 { 863 /* Something bad happened. */ 864 /** @todo: */ 865 } 866 else 867 { 868 /* Requests completed. */ 869 for (uint32_t i = 0; i < cReqsCompleted; i++) 870 rtAioMgrReqComplete(pThis, ahReqsCompleted[i]); 871 872 /* Check files for new requests and queue waiting requests. */ 873 rc = rtAioMgrCheckFiles(pThis); 874 } 875 } while ( fRunning 876 && RT_SUCCESS(rc)); 877 878 return rc; 879 } 880 881 /** 882 * Wakes up the async I/O manager. 883 * 884 * @returns IPRT status code. 885 * @param pThis The async I/O manager. 886 */ 887 static int rtAioMgrWakeup(PRTAIOMGRINT pThis) 888 { 889 return RTFileAioCtxWakeup(pThis->hAioCtx); 890 } 891 892 /** 893 * Waits until the async I/O manager handled the given event. 894 * 895 * @returns IPRT status code. 896 * @param pThis The async I/O manager. 897 * @param enmEvent The event to pass to the manager. 898 */ 899 static int rtAioMgrWaitForBlockingEvent(PRTAIOMGRINT pThis, RTAIOMGREVENT enmEvent) 900 { 901 Assert(pThis->enmBlockingEvent == RTAIOMGREVENT_NO_EVENT); 902 ASMAtomicWriteU32((volatile uint32_t *)&pThis->enmBlockingEvent, enmEvent); 903 904 /* Wakeup the async I/O manager */ 905 int rc = rtAioMgrWakeup(pThis); 906 if (RT_FAILURE(rc)) 907 return rc; 908 909 /* Wait for completion. */ 910 rc = RTSemEventWait(pThis->hEventSemBlock, RT_INDEFINITE_WAIT); 911 AssertRC(rc); 912 913 ASMAtomicWriteU32((volatile uint32_t *)&pThis->enmBlockingEvent, RTAIOMGREVENT_NO_EVENT); 914 915 return rc; 916 } 917 918 /** 919 * Add a given file to the given I/O manager. 920 * 921 * @returns IPRT status code. 922 * @param pThis The async I/O manager. 923 * @param pFile The file to add. 924 */ 925 static int rtAioMgrAddFile(PRTAIOMGRINT pThis, PRTAIOMGRFILEINT pFile) 926 { 927 /* Update the assigned I/O manager. */ 928 ASMAtomicWritePtr(&pFile->pAioMgr, pThis); 929 930 int rc = RTCritSectEnter(&pThis->CritSectBlockingEvent); 931 AssertRCReturn(rc, rc); 932 933 ASMAtomicWritePtr(&pThis->BlockingEventData.pFileAdd, pFile); 934 rc = rtAioMgrWaitForBlockingEvent(pThis, RTAIOMGREVENT_FILE_ADD); 935 ASMAtomicWriteNullPtr(&pThis->BlockingEventData.pFileAdd); 936 937 RTCritSectLeave(&pThis->CritSectBlockingEvent); 938 return rc; 939 } 940 941 /** 942 * Removes a given file from the given I/O manager. 943 * 944 * @returns IPRT status code. 945 * @param pThis The async I/O manager. 946 * @param pFile The file to remove. 947 */ 948 static int rtAioMgrCloseFile(PRTAIOMGRINT pThis, PRTAIOMGRFILEINT pFile) 949 { 950 int rc = RTCritSectEnter(&pThis->CritSectBlockingEvent); 951 AssertRCReturn(rc, rc); 952 953 ASMAtomicWritePtr(&pThis->BlockingEventData.pFileClose, pFile); 954 rc = rtAioMgrWaitForBlockingEvent(pThis, RTAIOMGREVENT_FILE_CLOSE); 955 ASMAtomicWriteNullPtr(&pThis->BlockingEventData.pFileClose); 956 957 RTCritSectLeave(&pThis->CritSectBlockingEvent); 958 959 return rc; 960 } 961 962 /** 963 * Process a shutdown event. 964 * 965 * @returns IPRT status code. 966 * @param pThis The async I/O manager to shut down. 967 */ 968 static int rtAioMgrShutdown(PRTAIOMGRINT pThis) 969 { 970 int rc = RTCritSectEnter(&pThis->CritSectBlockingEvent); 971 AssertRCReturn(rc, rc); 972 973 rc = rtAioMgrWaitForBlockingEvent(pThis, RTAIOMGREVENT_SHUTDOWN); 974 RTCritSectLeave(&pThis->CritSectBlockingEvent); 975 976 return rc; 206 977 } 207 978 … … 216 987 int rc; 217 988 218 ASMAtomicXchgBool(&pThis->fRunning, false); 219 rc = RTFileAioCtxWakeup(pThis->hAioCtx); 989 rc = rtAioMgrShutdown(pThis); 220 990 AssertRC(rc); 221 991 … … 226 996 AssertRC(rc); 227 997 228 pThis->hThread = NIL_RTTHREAD; 229 pThis->hAioCtx = NIL_RTFILEAIOCTX; 230 pThis->u32Magic = ~RTAIOMGR_MAGIC; 998 rc = RTMemCacheDestroy(pThis->hMemCacheReqs); 999 AssertRC(rc); 1000 1001 pThis->hThread = NIL_RTTHREAD; 1002 pThis->hAioCtx = NIL_RTFILEAIOCTX; 1003 pThis->hMemCacheReqs = NIL_RTMEMCACHE; 1004 pThis->u32Magic = ~RTAIOMGR_MAGIC; 231 1005 RTMemFree(pThis); 1006 } 1007 1008 /** 1009 * Queues a new request for processing. 1010 */ 1011 static void rtAioMgrFileQueueReq(PRTAIOMGRFILEINT pThis, PRTAIOMGRREQ pReq) 1012 { 1013 RTAioMgrFileRetain(pThis); 1014 RTQueueAtomicInsert(&pThis->QueueReqs, &pReq->WorkItem); 1015 rtAioMgrWakeup(pThis->pAioMgr); 232 1016 } 233 1017 … … 241 1025 { 242 1026 pThis->u32Magic = ~RTAIOMGRFILE_MAGIC; 1027 RTAioMgrRelease(pThis->pAioMgr); 243 1028 RTMemFree(pThis); 1029 } 1030 1031 /** 1032 * Queues a new I/O request. 1033 * 1034 * @returns IPRT status code. 1035 * @param hAioMgrFile The I/O manager file handle. 1036 * @param off Start offset of the I/o request. 1037 * @param pSgBuf Data S/G buffer. 1038 * @param cbIo How much to transfer. 1039 * @param pvUser Opaque user data. 1040 * @param enmType I/O direction type (read/write). 1041 */ 1042 static int rtAioMgrFileIoReqCreate(RTAIOMGRFILE hAioMgrFile, RTFOFF off, PRTSGBUF pSgBuf, 1043 size_t cbIo, void *pvUser, RTAIOMGRREQTYPE enmType) 1044 { 1045 int rc; 1046 PRTAIOMGRFILEINT pFile = hAioMgrFile; 1047 PRTAIOMGRINT pAioMgr; 1048 1049 AssertPtrReturn(pFile, VERR_INVALID_HANDLE); 1050 pAioMgr = pFile->pAioMgr; 1051 1052 PRTAIOMGRREQ pReq = rtAioMgrReqAlloc(pAioMgr); 1053 if (RT_LIKELY(pReq)) 1054 { 1055 unsigned cSeg = 1; 1056 size_t cbSeg = RTSgBufSegArrayCreate(pSgBuf, &pReq->DataSeg, &cSeg, cbIo); 1057 1058 if (cbSeg == cbIo) 1059 { 1060 pReq->enmType = enmType; 1061 pReq->pFile = pFile; 1062 pReq->pvUser = pvUser; 1063 pReq->off = off; 1064 rtAioMgrFileQueueReq(pFile, pReq); 1065 rc = VERR_FILE_AIO_IN_PROGRESS; 1066 } 1067 else 1068 { 1069 /** @todo: Real S/G buffer support. */ 1070 rtAioMgrReqFree(pAioMgr, pReq); 1071 rc = VERR_NOT_SUPPORTED; 1072 } 1073 } 1074 else 1075 rc = VERR_NO_MEMORY; 1076 1077 return rc; 1078 } 1079 1080 /** 1081 * Request constructor for the memory cache. 1082 * 1083 * @returns IPRT status code. 1084 * @param hMemCache The cache handle. 1085 * @param pvObj The memory object that should be initialized. 1086 * @param pvUser The user argument. 1087 */ 1088 static DECLCALLBACK(int) rtAioMgrReqCtor(RTMEMCACHE hMemCache, void *pvObj, void *pvUser) 1089 { 1090 PRTAIOMGRREQ pReq = (PRTAIOMGRREQ)pvObj; 1091 1092 memset(pReq, 0, sizeof(RTAIOMGRREQ)); 1093 return RTFileAioReqCreate(&pReq->hReqIo); 1094 } 1095 1096 /** 1097 * Request destructor for the memory cache. 1098 * 1099 * @param hMemCache The cache handle. 1100 * @param pvObj The memory object that should be destroyed. 1101 * @param pvUser The user argument. 1102 */ 1103 static DECLCALLBACK(void) rtAioMgrReqDtor(RTMEMCACHE hMemCache, void *pvObj, void *pvUser) 1104 { 1105 PRTAIOMGRREQ pReq = (PRTAIOMGRREQ)pvObj; 1106 int rc = RTFileAioReqDestroy(pReq->hReqIo); 1107 1108 AssertRC(rc); 244 1109 } 245 1110 … … 257 1122 pThis->u32Magic = RTAIOMGR_MAGIC; 258 1123 pThis->cRefs = 1; 259 pThis->fRunning = true; 260 rc = RTFileAioCtxCreate(&pThis->hAioCtx, cReqsMax == UINT32_MAX 261 ? RTFILEAIO_UNLIMITED_REQS 262 : cReqsMax, 263 RTFILEAIOCTX_FLAGS_WAIT_WITHOUT_PENDING_REQUESTS); 1124 RTListInit(&pThis->ListFiles); 1125 rc = RTMemCacheCreate(&pThis->hMemCacheReqs, sizeof(RTAIOMGRREQ), 1126 0, UINT32_MAX, rtAioMgrReqCtor, rtAioMgrReqDtor, NULL, 0); 264 1127 if (RT_SUCCESS(rc)) 265 1128 { 266 rc = RTThreadCreateF(&pThis->hThread, rtAioMgrWorker, pThis, 0, RTTHREADTYPE_IO, 267 RTTHREADFLAGS_WAITABLE, "AioMgr-%p", pThis); 1129 rc = RTFileAioCtxCreate(&pThis->hAioCtx, cReqsMax == UINT32_MAX 1130 ? RTFILEAIO_UNLIMITED_REQS 1131 : cReqsMax, 1132 RTFILEAIOCTX_FLAGS_WAIT_WITHOUT_PENDING_REQUESTS); 1133 if (RT_SUCCESS(rc)) 1134 { 1135 rc = RTThreadCreateF(&pThis->hThread, rtAioMgrWorker, pThis, 0, RTTHREADTYPE_IO, 1136 RTTHREADFLAGS_WAITABLE, "AioMgr-%p", pThis); 1137 if (RT_FAILURE(rc)) 1138 { 1139 rc = RTFileAioCtxDestroy(pThis->hAioCtx); 1140 AssertRC(rc); 1141 } 1142 } 1143 268 1144 if (RT_FAILURE(rc)) 269 { 270 rc = RTFileAioCtxDestroy(pThis->hAioCtx); 271 AssertRC(rc); 272 } 273 } 1145 RTMemCacheDestroy(pThis->hMemCacheReqs); 1146 } 1147 274 1148 if (RT_FAILURE(rc)) 275 1149 RTMemFree(pThis); … … 329 1203 pThis->hFile = hFile; 330 1204 pThis->pAioMgr = hAioMgr; 1205 pThis->pfnReqCompleted = pfnReqComplete; 1206 RTQueueAtomicInit(&pThis->QueueReqs); 1207 RTAioMgrRetain(hAioMgr); 331 1208 rc = RTFileAioCtxAssociateWithFile(pThis->pAioMgr->hAioCtx, hFile); 332 1209 if (RT_FAILURE(rc)) … … 372 1249 PRTSGBUF pSgBuf, size_t cbRead, void *pvUser) 373 1250 { 374 return VERR_NOT_IMPLEMENTED; 1251 return rtAioMgrFileIoReqCreate(hAioMgrFile, off, pSgBuf, cbRead, pvUser, 1252 RTAIOMGRREQTYPE_READ); 375 1253 } 376 1254 … … 378 1256 PRTSGBUF pSgBuf, size_t cbWrite, void *pvUser) 379 1257 { 380 return VERR_NOT_IMPLEMENTED; 1258 return rtAioMgrFileIoReqCreate(hAioMgrFile, off, pSgBuf, cbWrite, pvUser, 1259 RTAIOMGRREQTYPE_WRITE); 381 1260 } 382 1261 383 1262 RTDECL(int) RTAioMgrFileFlush(RTAIOMGRFILE hAioMgrFile, void *pvUser) 384 1263 { 385 return VERR_NOT_IMPLEMENTED; 386 } 387 1264 PRTAIOMGRFILEINT pFile = hAioMgrFile; 1265 PRTAIOMGRINT pAioMgr; 1266 1267 AssertPtrReturn(pFile, VERR_INVALID_HANDLE); 1268 1269 pAioMgr = pFile->pAioMgr; 1270 1271 PRTAIOMGRREQ pReq = rtAioMgrReqAlloc(pAioMgr); 1272 if (RT_UNLIKELY(!pReq)) 1273 return VERR_NO_MEMORY; 1274 1275 pReq->pFile = pFile; 1276 pReq->enmType = RTAIOMGRREQTYPE_FLUSH; 1277 pReq->pvUser = pvUser; 1278 rtAioMgrFileQueueReq(pFile, pReq); 1279 1280 return VERR_FILE_AIO_IN_PROGRESS; 1281 } 1282
Note:
See TracChangeset
for help on using the changeset viewer.