Changeset 33536 in vbox
- Timestamp:
- Oct 28, 2010 8:30:48 AM (14 years ago)
- svn:sync-xref-src-repo-rev:
- 67136
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/src/VBox/Main/ApplianceImplIO.cpp
r33289 r33536 29 29 #include <iprt/path.h> 30 30 #include <iprt/asm.h> 31 #include <iprt/semaphore.h>32 31 #include <iprt/stream.h> 33 32 #include <iprt/circbuf.h> … … 58 57 /** Completion callback. */ 59 58 PFNVDCOMPLETED pfnCompleted; 59 /** Storage handle for the next callback in chain. */ 60 void *pvStorage; 61 /** Current file open mode. */ 62 uint32_t fOpenMode; 60 63 /** Our own storage handle. */ 61 64 PSHA1STORAGE pSha1Storage; 62 /** Storage handle for the next callback in chain. */ 63 void *pvStorage; 64 /** Memory buffer used for caching and SHA1 calculation. */ 65 char *pcBuf; 66 /** Size of the memory buffer. */ 67 size_t cbBuf; 68 /** Memory buffer for writing zeros. */ 69 void *pvZeroBuf; 70 /** Size of the zero memory buffer. */ 71 size_t cbZeroBuf; 72 /** Current position in the caching memory buffer. */ 73 size_t cbCurBuf; 74 /** Current absolute position. */ 65 /** Circular buffer used for transferring data from/to the worker thread. */ 66 PRTCIRCBUF pCircBuf; 67 /** Current absolute position (regardless of the real read/written data). */ 75 68 uint64_t cbCurAll; 76 69 /** Current real position in the file. */ 77 70 uint64_t cbCurFile; 78 /** Handle of the SHA1worker thread. */79 RTTHREAD p MfThread;71 /** Handle of the worker thread. */ 72 RTTHREAD pWorkerThread; 80 73 /** Status of the worker thread. */ 81 74 volatile uint32_t u32Status; 82 75 /** Event for signaling a new status. */ 83 76 RTSEMEVENT newStatusEvent; 84 /** Event for signaling a finished SHA1 calculation. */85 RTSEMEVENT calcFinishedEvent;77 /** Event for signaling a finished task of the worker thread. */ 78 RTSEMEVENT workFinishedEvent; 86 79 /** SHA1 calculation context. */ 87 80 RTSHA1CONTEXT ctx; 88 /* Circular buffer in read mode. */ 89 PRTCIRCBUF pCircBuf; 90 /* Are we reached end of file. */ 81 /** Write mode only: Memory buffer for writing zeros. */ 82 void *pvZeroBuf; 83 /** Write mode only: Size of the zero memory buffer. */ 84 size_t cbZeroBuf; 85 /** Read mode only: Indicate if we reached end of file. */ 91 86 volatile bool fEOF; 92 87 // uint64_t calls; … … 98 93 ******************************************************************************/ 99 94 100 #define STATUS_WAIT UINT32_C(0)101 #define STATUS_ CALCUINT32_C(1)102 #define STATUS_ END UINT32_C(3)103 #define STATUS_ READ UINT32_C(4)95 #define STATUS_WAIT UINT32_C(0) 96 #define STATUS_WRITE UINT32_C(1) 97 #define STATUS_READ UINT32_C(2) 98 #define STATUS_END UINT32_C(3) 104 99 105 100 /* Enable for getting some flow history. */ … … 471 466 PSHA1STORAGEINTERNAL pInt = (PSHA1STORAGEINTERNAL)pvUser; 472 467 468 PVDINTERFACE pIO = VDInterfaceGet(pInt->pSha1Storage->pVDImageIfaces, VDINTERFACETYPE_IO); 469 AssertPtrReturn(pIO, VERR_INVALID_PARAMETER); 470 PVDINTERFACEIO pCallbacks = VDGetInterfaceIO(pIO); 471 AssertPtrReturn(pCallbacks, VERR_INVALID_PARAMETER); 472 473 473 int rc = VINF_SUCCESS; 474 474 bool fLoop = true; … … 489 489 break; 490 490 } 491 case STATUS_ CALC:491 case STATUS_WRITE: 492 492 { 493 /* Update the SHA1 context with the next data block. */ 494 RTSha1Update(&pInt->ctx, pInt->pcBuf, pInt->cbCurBuf); 493 size_t cbAvail = RTCircBufUsed(pInt->pCircBuf); 494 size_t cbMemAllRead = 0; 495 bool fStop = false; 496 bool fEOF = false; 497 /* First loop over all the free memory in the circular 498 * memory buffer (could be turn around at the end). */ 499 for(;;) 500 { 501 if ( cbMemAllRead == cbAvail 502 || fStop == true) 503 break; 504 char *pcBuf; 505 size_t cbMemToRead = cbAvail - cbMemAllRead; 506 size_t cbMemRead = 0; 507 /* Try to acquire all the used space of the circular buffer. */ 508 RTCircBufAcquireReadBlock(pInt->pCircBuf, cbMemToRead, (void**)&pcBuf, &cbMemRead); 509 size_t cbAllWritten = 0; 510 /* Second, write as long as used memory is there. The write 511 * method could also split the writes up into to smaller 512 * parts. */ 513 for(;;) 514 { 515 if (cbAllWritten == cbMemRead) 516 break; 517 size_t cbToWrite = cbMemRead - cbAllWritten; 518 size_t cbWritten = 0; 519 rc = pCallbacks->pfnWriteSync(pIO->pvUser, pInt->pvStorage, pInt->cbCurFile, &pcBuf[cbAllWritten], cbToWrite, &cbWritten); 520 // RTPrintf ("%lu %lu %lu %Rrc\n", pInt->cbCurFile, cbToRead, cbRead, rc); 521 if (RT_FAILURE(rc)) 522 { 523 fStop = true; 524 fLoop = false; 525 break; 526 } 527 if (cbWritten == 0) 528 { 529 fStop = true; 530 fLoop = false; 531 fEOF = true; 532 // RTPrintf("EOF\n"); 533 break; 534 } 535 cbAllWritten += cbWritten; 536 pInt->cbCurFile += cbWritten; 537 } 538 /* Update the SHA1 context with the next data block. */ 539 if (pInt->pSha1Storage->fCreateDigest) 540 RTSha1Update(&pInt->ctx, pcBuf, cbAllWritten); 541 /* Mark the block as empty. */ 542 RTCircBufReleaseReadBlock(pInt->pCircBuf, cbAllWritten); 543 cbMemAllRead += cbAllWritten; 544 } 495 545 /* Reset the thread status and signal the main thread that we 496 are finished. */ 497 ASMAtomicCmpXchgU32(&pInt->u32Status, STATUS_WAIT, STATUS_CALC); 498 rc = RTSemEventSignal(pInt->calcFinishedEvent); 499 break; 500 } 501 case STATUS_END: 502 { 503 /* End signaled */ 504 fLoop = false; 546 * are finished. Use CmpXchg, so we not overwrite other states 547 * which could be signaled in the meantime. */ 548 ASMAtomicCmpXchgU32(&pInt->u32Status, STATUS_WAIT, STATUS_WRITE); 549 rc = RTSemEventSignal(pInt->workFinishedEvent); 505 550 break; 506 551 } 507 552 case STATUS_READ: 508 553 { 509 PVDINTERFACE pIO = VDInterfaceGet(pInt->pSha1Storage->pVDImageIfaces, VDINTERFACETYPE_IO);510 AssertPtrReturn(pIO, VERR_INVALID_PARAMETER);511 PVDINTERFACEIO pCallbacks = VDGetInterfaceIO(pIO);512 AssertPtrReturn(pCallbacks, VERR_INVALID_PARAMETER);513 514 554 size_t cbAvail = RTCircBufFree(pInt->pCircBuf); 515 // RTPrintf("############################################################################### th: avail %ld\n", cbAvail);516 517 /* ************ CHECK for 0 */518 /* First loop over all the available memory in the circular519 * memory buffer (could be turn around at the end). */520 555 size_t cbMemAllWrite = 0; 521 556 bool fStop = false; 522 557 bool fEOF = false; 558 /* First loop over all the available memory in the circular 559 * memory buffer (could be turn around at the end). */ 523 560 for(;;) 524 561 { … … 545 582 if (RT_FAILURE(rc)) 546 583 { 584 fStop = true; 547 585 fLoop = false; 548 fStop = true;549 586 break; 550 587 } … … 561 598 } 562 599 /* Update the SHA1 context with the next data block. */ 563 RTSha1Update(&pInt->ctx, pcBuf, cbAllRead); 600 if (pInt->pSha1Storage->fCreateDigest) 601 RTSha1Update(&pInt->ctx, pcBuf, cbAllRead); 564 602 /* Mark the block as full. */ 565 603 RTCircBufReleaseWriteBlock(pInt->pCircBuf, cbAllRead); … … 569 607 ASMAtomicWriteBool(&pInt->fEOF, true); 570 608 /* Reset the thread status and signal the main thread that we 571 are finished. */ 609 * are finished. Use CmpXchg, so we not overwrite other states 610 * which could be signaled in the meantime. */ 572 611 ASMAtomicCmpXchgU32(&pInt->u32Status, STATUS_WAIT, STATUS_READ); 573 rc = RTSemEventSignal(pInt->calcFinishedEvent); 612 rc = RTSemEventSignal(pInt->workFinishedEvent); 613 break; 614 } 615 case STATUS_END: 616 { 617 /* End signaled */ 618 fLoop = false; 574 619 break; 575 620 } … … 592 637 { 593 638 // RTPrintf(" wait\n"); 594 if (!( ASMAtomicReadU32(&pInt->u32Status) == STATUS_ CALC639 if (!( ASMAtomicReadU32(&pInt->u32Status) == STATUS_WRITE 595 640 || ASMAtomicReadU32(&pInt->u32Status) == STATUS_READ)) 596 641 break; 597 rc = RTSemEventWait(pInt-> calcFinishedEvent, 100);642 rc = RTSemEventWait(pInt->workFinishedEvent, 100); 598 643 } 599 644 if (rc == VERR_TIMEOUT) … … 602 647 } 603 648 604 DECLINLINE(int) sha1FlushCurBuf(P VDINTERFACE pIO, PVDINTERFACEIO pCallbacks, PSHA1STORAGEINTERNAL pInt, bool fCreateDigest)649 DECLINLINE(int) sha1FlushCurBuf(PSHA1STORAGEINTERNAL pInt) 605 650 { 606 651 int rc = VINF_SUCCESS; 607 if ( fCreateDigest)608 { 609 /* Let the sha1worker thread start immediately. */610 rc = sha1SignalManifestThread(pInt, STATUS_ CALC);652 if (pInt->fOpenMode & RTFILE_O_WRITE) 653 { 654 /* Let the write worker thread start immediately. */ 655 rc = sha1SignalManifestThread(pInt, STATUS_WRITE); 611 656 if (RT_FAILURE(rc)) 612 657 return rc; 613 } 614 /* Write the buffer content to disk. */ 615 size_t cbAllWritten = 0; 616 for(;;) 617 { 618 /* Finished? */ 619 if (cbAllWritten == pInt->cbCurBuf) 620 break; 621 size_t cbToWrite = pInt->cbCurBuf - cbAllWritten; 622 size_t cbWritten = 0; 623 rc = pCallbacks->pfnWriteSync(pIO->pvUser, pInt->pvStorage, pInt->cbCurFile, &pInt->pcBuf[cbAllWritten], cbToWrite, &cbWritten); 624 if (RT_FAILURE(rc)) 625 return rc; 626 pInt->cbCurFile += cbWritten; 627 cbAllWritten += cbWritten; 628 } 629 if (fCreateDigest) 630 { 631 /* Wait until the sha1 worker thread has finished. */ 658 659 /* Wait until the write worker thread has finished. */ 632 660 rc = sha1WaitForManifestThreadFinished(pInt); 633 661 } 634 if (RT_SUCCESS(rc))635 pInt->cbCurBuf = 0;636 662 637 663 return rc; … … 666 692 pInt->pSha1Storage = pSha1Storage; 667 693 pInt->fEOF = false; 668 669 670 if (fOpen & RTFILE_O_READ) 694 pInt->fOpenMode = fOpen; 695 696 /* Circular buffer in the read case. */ 697 rc = RTCircBufCreate(&pInt->pCircBuf, _1M * 2); 698 if (RT_FAILURE(rc)) 699 break; 700 701 if (fOpen & RTFILE_O_WRITE) 671 702 { 672 /* Circular buffer in the read case. */673 rc = RTCircBufCreate(&pInt->pCircBuf, _1M * 2);674 if (RT_FAILURE(rc))675 break;676 }677 else if (fOpen & RTFILE_O_WRITE)678 {679 /* For caching reasons and to be able to calculate the sha1 sum of the680 data we need a memory buffer. */681 pInt->cbBuf = _1M;682 pInt->pcBuf = (char*)RTMemAlloc(pInt->cbBuf);683 if (!pInt->pcBuf)684 {685 rc = VERR_NO_MEMORY;686 break;687 }688 703 /* The zero buffer is used for appending empty parts at the end of the 689 704 * file (or our buffer) in setSize or when uOffset in writeSync is … … 698 713 } 699 714 700 if ( fOpen & RTFILE_O_READ 701 || pSha1Storage->fCreateDigest) 702 { 703 /* Create an event semaphore to indicate a state change for the sha1 704 worker thread. */ 705 rc = RTSemEventCreate(&pInt->newStatusEvent); 706 if (RT_FAILURE(rc)) 707 break; 708 /* Create an event semaphore to indicate a finished calculation of the 709 sha1 worker thread. */ 710 rc = RTSemEventCreate(&pInt->calcFinishedEvent); 711 if (RT_FAILURE(rc)) 712 break; 713 /* Create the sha1 worker thread. */ 714 rc = RTThreadCreate(&pInt->pMfThread, sha1CalcWorkerThread, pInt, 0, RTTHREADTYPE_MAIN_HEAVY_WORKER, RTTHREADFLAGS_WAITABLE, "SHA1-Worker"); 715 if (RT_FAILURE(rc)) 716 break; 717 } 715 /* Create an event semaphore to indicate a state change for the worker 716 * thread. */ 717 rc = RTSemEventCreate(&pInt->newStatusEvent); 718 if (RT_FAILURE(rc)) 719 break; 720 /* Create an event semaphore to indicate a finished calculation of the 721 worker thread. */ 722 rc = RTSemEventCreate(&pInt->workFinishedEvent); 723 if (RT_FAILURE(rc)) 724 break; 725 /* Create the worker thread. */ 726 rc = RTThreadCreate(&pInt->pWorkerThread, sha1CalcWorkerThread, pInt, 0, RTTHREADTYPE_MAIN_HEAVY_WORKER, RTTHREADFLAGS_WAITABLE, "SHA1-Worker"); 727 if (RT_FAILURE(rc)) 728 break; 718 729 719 730 if (pSha1Storage->fCreateDigest) 720 /* Create a sha1 context the sha1worker thread will work with. */731 /* Create a sha1 context the worker thread will work with. */ 721 732 RTSha1Init(&pInt->ctx); 722 733 … … 725 736 fOpen, pInt->pfnCompleted, 726 737 &pInt->pvStorage); 738 if (RT_FAILURE(rc)) 739 break; 740 741 if (fOpen & RTFILE_O_READ) 742 { 743 /* Immediately let the worker thread start the reading. */ 744 rc = sha1SignalManifestThread(pInt, STATUS_READ); 745 } 727 746 } 728 747 while(0); … … 730 749 if (RT_FAILURE(rc)) 731 750 { 732 if (pInt->p MfThread)751 if (pInt->pWorkerThread) 733 752 { 734 753 sha1SignalManifestThread(pInt, STATUS_END); 735 RTThreadWait(pInt->p MfThread, RT_INDEFINITE_WAIT, 0);754 RTThreadWait(pInt->pWorkerThread, RT_INDEFINITE_WAIT, 0); 736 755 } 737 if (pInt-> calcFinishedEvent)738 RTSemEventDestroy(pInt-> calcFinishedEvent);756 if (pInt->workFinishedEvent) 757 RTSemEventDestroy(pInt->workFinishedEvent); 739 758 if (pInt->newStatusEvent) 740 759 RTSemEventDestroy(pInt->newStatusEvent); … … 743 762 if (pInt->pvZeroBuf) 744 763 RTMemFree(pInt->pvZeroBuf); 745 if (pInt->pcBuf)746 RTMemFree(pInt->pcBuf);747 764 RTMemFree(pInt); 748 765 } … … 772 789 773 790 /* Make sure all pending writes are flushed */ 774 if (pInt->cbCurBuf > 0)775 rc = sha1FlushCurBuf(pIO, pCallbacks, pInt, pSha1Storage->fCreateDigest); 776 777 if (pInt->pMfThread)791 rc = sha1FlushCurBuf(pInt); 792 793 if (pInt->pWorkerThread) 794 { 778 795 /* Signal the worker thread to end himself */ 779 796 rc = sha1SignalManifestThread(pInt, STATUS_END); 780 781 if (pSha1Storage->fCreateDigest) 797 /* Worker thread stopped? */ 798 rc = RTThreadWait(pInt->pWorkerThread, RT_INDEFINITE_WAIT, 0); 799 } 800 801 if ( RT_SUCCESS(rc) 802 && pSha1Storage->fCreateDigest) 782 803 { 783 804 /* Finally calculate & format the SHA1 sum */ … … 795 816 } 796 817 797 if (pInt->pMfThread)798 /* Worker thread stopped? */799 rc = RTThreadWait(pInt->pMfThread, RT_INDEFINITE_WAIT, 0);800 801 818 /* Close the file */ 802 819 rc = pCallbacks->pfnClose(pIO->pvUser, pInt->pvStorage); … … 805 822 806 823 /* Cleanup */ 807 if (pInt-> calcFinishedEvent)808 RTSemEventDestroy(pInt-> calcFinishedEvent);824 if (pInt->workFinishedEvent) 825 RTSemEventDestroy(pInt->workFinishedEvent); 809 826 if (pInt->newStatusEvent) 810 827 RTSemEventDestroy(pInt->newStatusEvent); … … 813 830 if (pInt->pvZeroBuf) 814 831 RTMemFree(pInt->pvZeroBuf); 815 if (pInt->pcBuf)816 RTMemFree(pInt->pcBuf);817 832 RTMemFree(pInt); 818 833 … … 982 997 if (cbAllWritten == cbWrite) 983 998 break; 984 size_t cb ToWrite = RT_MIN(pInt->cbBuf - pInt->cbCurBuf, cbWrite - cbAllWritten);985 memcpy(&pInt->pcBuf[pInt->cbCurBuf], &((char*)pvBuf)[cbAllWritten], cbToWrite);986 pInt->cbCurBuf += cbToWrite;987 pInt->cbCurAll += cbToWrite;988 cbAllWritten += cbToWrite;989 /* Need to start a real write?*/990 if ( pInt->cbCurBuf == pInt->cbBuf)999 size_t cbAvail = RTCircBufFree(pInt->pCircBuf); 1000 if ( cbAvail == 0 1001 && pInt->fEOF) 1002 return VERR_EOF; 1003 /* If there isn't enough free space make sure the worker thread is 1004 * writing some data. */ 1005 if ((cbWrite - cbAllWritten) > cbAvail) 991 1006 { 992 rc = sha1 FlushCurBuf(pIO, pCallbacks, pInt, pSha1Storage->fCreateDigest);993 if 1007 rc = sha1SignalManifestThread(pInt, STATUS_WRITE); 1008 if(RT_FAILURE(rc)) 994 1009 break; 1010 /* If there is _no_ free space available, we have to wait until it is. */ 1011 if (cbAvail == 0) 1012 { 1013 rc = sha1WaitForManifestThreadFinished(pInt); 1014 if (RT_FAILURE(rc)) 1015 break; 1016 cbAvail = RTCircBufFree(pInt->pCircBuf); 1017 // RTPrintf("############## wait %lu %lu %lu \n", cbRead, cbAllRead, cbAvail); 1018 // pInt->waits++; 1019 } 995 1020 } 1021 size_t cbToWrite = RT_MIN(cbWrite - cbAllWritten, cbAvail); 1022 char *pcBuf; 1023 size_t cbMemWritten = 0; 1024 /* Acquire a block for writing from our circular buffer. */ 1025 RTCircBufAcquireWriteBlock(pInt->pCircBuf, cbToWrite, (void**)&pcBuf, &cbMemWritten); 1026 memcpy(pcBuf, &((char*)pvBuf)[cbAllWritten], cbMemWritten); 1027 /* Mark the block full. */ 1028 RTCircBufReleaseWriteBlock(pInt->pCircBuf, cbMemWritten); 1029 cbAllWritten += cbMemWritten; 1030 pInt->cbCurAll += cbMemWritten; 996 1031 } 1032 997 1033 if (pcbWritten) 998 1034 *pcbWritten = cbAllWritten; 1035 1036 /* Signal the thread to write more data in the mean time. */ 1037 if ( RT_SUCCESS(rc) 1038 && RTCircBufUsed(pInt->pCircBuf) >= (RTCircBufSize(pInt->pCircBuf) / 2)) 1039 rc = sha1SignalManifestThread(pInt, STATUS_WRITE); 999 1040 1000 1041 return rc; … … 1103 1144 PSHA1STORAGEINTERNAL pInt = (PSHA1STORAGEINTERNAL)pvStorage; 1104 1145 1105 int rc = VINF_SUCCESS;1106 1107 1146 /* Check if there is still something in the buffer. If yes, flush it. */ 1108 if (pInt->cbCurBuf > 0) 1109 { 1110 rc = sha1FlushCurBuf(pIO, pCallbacks, pInt, pSha1Storage->fCreateDigest); 1111 if (RT_FAILURE(rc)) 1112 return rc; 1113 } 1147 int rc = sha1FlushCurBuf(pInt); 1148 if (RT_FAILURE(rc)) 1149 return rc; 1114 1150 1115 1151 return pCallbacks->pfnFlushSync(pIO->pvUser, pInt->pvStorage);
Note:
See TracChangeset
for help on using the changeset viewer.