VirtualBox

Changeset 33536 in vbox


Ignore:
Timestamp:
Oct 28, 2010 8:30:48 AM (14 years ago)
Author:
vboxsync
svn:sync-xref-src-repo-rev:
67136
Message:

Main-OVF: Use the circular buffer on export and move the write operation to the
worker thread as well.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/VBox/Main/ApplianceImplIO.cpp

    r33289 r33536  
    2929#include <iprt/path.h>
    3030#include <iprt/asm.h>
    31 #include <iprt/semaphore.h>
    3231#include <iprt/stream.h>
    3332#include <iprt/circbuf.h>
     
    5857    /** Completion callback. */
    5958    PFNVDCOMPLETED pfnCompleted;
     59    /** Storage handle for the next callback in chain. */
     60    void *pvStorage;
     61    /** Current file open mode. */
     62    uint32_t fOpenMode;
    6063    /** Our own storage handle. */
    6164    PSHA1STORAGE pSha1Storage;
    62     /** Storage handle for the next callback in chain. */
    63     void *pvStorage;
    64     /** Memory buffer used for caching and SHA1 calculation. */
    65     char *pcBuf;
    66     /** Size of the memory buffer. */
    67     size_t cbBuf;
    68     /** Memory buffer for writing zeros. */
    69     void *pvZeroBuf;
    70     /** Size of the zero memory buffer. */
    71     size_t cbZeroBuf;
    72     /** Current position in the caching memory buffer. */
    73     size_t cbCurBuf;
    74     /** Current absolute position. */
     65    /** Circular buffer used for transferring data from/to the worker thread. */
     66    PRTCIRCBUF pCircBuf;
     67    /** Current absolute position (regardless of the real read/written data). */
    7568    uint64_t cbCurAll;
    7669    /** Current real position in the file. */
    7770    uint64_t cbCurFile;
    78     /** Handle of the SHA1 worker thread. */
    79     RTTHREAD pMfThread;
     71    /** Handle of the worker thread. */
     72    RTTHREAD pWorkerThread;
    8073    /** Status of the worker thread. */
    8174    volatile uint32_t u32Status;
    8275    /** Event for signaling a new status. */
    8376    RTSEMEVENT newStatusEvent;
    84     /** Event for signaling a finished SHA1 calculation. */
    85     RTSEMEVENT calcFinishedEvent;
     77    /** Event for signaling a finished task of the worker thread. */
     78    RTSEMEVENT workFinishedEvent;
    8679    /** SHA1 calculation context. */
    8780    RTSHA1CONTEXT ctx;
    88     /* Circular buffer in read mode. */
    89     PRTCIRCBUF pCircBuf;
    90     /* Are we reached end of file. */
     81    /** Write mode only: Memory buffer for writing zeros. */
     82    void *pvZeroBuf;
     83    /** Write mode only: Size of the zero memory buffer. */
     84    size_t cbZeroBuf;
     85    /** Read mode only: Indicate if we reached end of file. */
    9186    volatile bool fEOF;
    9287//    uint64_t calls;
     
    9893 ******************************************************************************/
    9994
    100 #define STATUS_WAIT UINT32_C(0)
    101 #define STATUS_CALC UINT32_C(1)
    102 #define STATUS_END  UINT32_C(3)
    103 #define STATUS_READ UINT32_C(4)
     95#define STATUS_WAIT  UINT32_C(0)
     96#define STATUS_WRITE UINT32_C(1)
     97#define STATUS_READ  UINT32_C(2)
     98#define STATUS_END   UINT32_C(3)
    10499
    105100/* Enable for getting some flow history. */
     
    471466    PSHA1STORAGEINTERNAL pInt = (PSHA1STORAGEINTERNAL)pvUser;
    472467
     468    PVDINTERFACE pIO = VDInterfaceGet(pInt->pSha1Storage->pVDImageIfaces, VDINTERFACETYPE_IO);
     469    AssertPtrReturn(pIO, VERR_INVALID_PARAMETER);
     470    PVDINTERFACEIO pCallbacks = VDGetInterfaceIO(pIO);
     471    AssertPtrReturn(pCallbacks, VERR_INVALID_PARAMETER);
     472
    473473    int rc = VINF_SUCCESS;
    474474    bool fLoop = true;
     
    489489                break;
    490490            }
    491             case STATUS_CALC:
     491            case STATUS_WRITE:
    492492            {
    493                 /* Update the SHA1 context with the next data block. */
    494                 RTSha1Update(&pInt->ctx, pInt->pcBuf, pInt->cbCurBuf);
     493                size_t cbAvail = RTCircBufUsed(pInt->pCircBuf);
     494                size_t cbMemAllRead = 0;
     495                bool fStop = false;
     496                bool fEOF = false;
     497                /* First loop over all the free memory in the circular
     498                 * memory buffer (could be turn around at the end). */
     499                for(;;)
     500                {
     501                    if (   cbMemAllRead == cbAvail
     502                        || fStop == true)
     503                        break;
     504                    char *pcBuf;
     505                    size_t cbMemToRead = cbAvail - cbMemAllRead;
     506                    size_t cbMemRead = 0;
     507                    /* Try to acquire all the used space of the circular buffer. */
     508                    RTCircBufAcquireReadBlock(pInt->pCircBuf, cbMemToRead, (void**)&pcBuf, &cbMemRead);
     509                    size_t cbAllWritten = 0;
     510                    /* Second, write as long as used memory is there. The write
     511                     * method could also split the writes up into to smaller
     512                     * parts. */
     513                    for(;;)
     514                    {
     515                        if (cbAllWritten == cbMemRead)
     516                            break;
     517                        size_t cbToWrite = cbMemRead - cbAllWritten;
     518                        size_t cbWritten = 0;
     519                        rc = pCallbacks->pfnWriteSync(pIO->pvUser, pInt->pvStorage, pInt->cbCurFile, &pcBuf[cbAllWritten], cbToWrite, &cbWritten);
     520//                        RTPrintf ("%lu %lu %lu %Rrc\n", pInt->cbCurFile, cbToRead, cbRead, rc);
     521                        if (RT_FAILURE(rc))
     522                        {
     523                            fStop = true;
     524                            fLoop = false;
     525                            break;
     526                        }
     527                        if (cbWritten == 0)
     528                        {
     529                            fStop = true;
     530                            fLoop = false;
     531                            fEOF = true;
     532//                            RTPrintf("EOF\n");
     533                            break;
     534                        }
     535                        cbAllWritten += cbWritten;
     536                        pInt->cbCurFile += cbWritten;
     537                    }
     538                    /* Update the SHA1 context with the next data block. */
     539                    if (pInt->pSha1Storage->fCreateDigest)
     540                        RTSha1Update(&pInt->ctx, pcBuf, cbAllWritten);
     541                    /* Mark the block as empty. */
     542                    RTCircBufReleaseReadBlock(pInt->pCircBuf, cbAllWritten);
     543                    cbMemAllRead += cbAllWritten;
     544                }
    495545                /* Reset the thread status and signal the main thread that we
    496                    are finished. */
    497                 ASMAtomicCmpXchgU32(&pInt->u32Status, STATUS_WAIT, STATUS_CALC);
    498                 rc = RTSemEventSignal(pInt->calcFinishedEvent);
    499                 break;
    500             }
    501             case STATUS_END:
    502             {
    503                 /* End signaled */
    504                 fLoop = false;
     546                 * are finished. Use CmpXchg, so we not overwrite other states
     547                 * which could be signaled in the meantime. */
     548                ASMAtomicCmpXchgU32(&pInt->u32Status, STATUS_WAIT, STATUS_WRITE);
     549                rc = RTSemEventSignal(pInt->workFinishedEvent);
    505550                break;
    506551            }
    507552            case STATUS_READ:
    508553            {
    509                 PVDINTERFACE pIO = VDInterfaceGet(pInt->pSha1Storage->pVDImageIfaces, VDINTERFACETYPE_IO);
    510                 AssertPtrReturn(pIO, VERR_INVALID_PARAMETER);
    511                 PVDINTERFACEIO pCallbacks = VDGetInterfaceIO(pIO);
    512                 AssertPtrReturn(pCallbacks, VERR_INVALID_PARAMETER);
    513 
    514554                size_t cbAvail = RTCircBufFree(pInt->pCircBuf);
    515 //                RTPrintf("############################################################################### th: avail %ld\n", cbAvail);
    516 
    517                 /* ************ CHECK for 0 */
    518                 /* First loop over all the available memory in the circular
    519                  * memory buffer (could be turn around at the end). */
    520555                size_t cbMemAllWrite = 0;
    521556                bool fStop = false;
    522557                bool fEOF = false;
     558                /* First loop over all the available memory in the circular
     559                 * memory buffer (could be turn around at the end). */
    523560                for(;;)
    524561                {
     
    545582                        if (RT_FAILURE(rc))
    546583                        {
     584                            fStop = true;
    547585                            fLoop = false;
    548                             fStop = true;
    549586                            break;
    550587                        }
     
    561598                    }
    562599                    /* Update the SHA1 context with the next data block. */
    563                     RTSha1Update(&pInt->ctx, pcBuf, cbAllRead);
     600                    if (pInt->pSha1Storage->fCreateDigest)
     601                        RTSha1Update(&pInt->ctx, pcBuf, cbAllRead);
    564602                    /* Mark the block as full. */
    565603                    RTCircBufReleaseWriteBlock(pInt->pCircBuf, cbAllRead);
     
    569607                    ASMAtomicWriteBool(&pInt->fEOF, true);
    570608                /* Reset the thread status and signal the main thread that we
    571                    are finished. */
     609                 * are finished. Use CmpXchg, so we not overwrite other states
     610                 * which could be signaled in the meantime. */
    572611                ASMAtomicCmpXchgU32(&pInt->u32Status, STATUS_WAIT, STATUS_READ);
    573                 rc = RTSemEventSignal(pInt->calcFinishedEvent);
     612                rc = RTSemEventSignal(pInt->workFinishedEvent);
     613                break;
     614            }
     615            case STATUS_END:
     616            {
     617                /* End signaled */
     618                fLoop = false;
    574619                break;
    575620            }
     
    592637    {
    593638//        RTPrintf(" wait\n");
    594         if (!(   ASMAtomicReadU32(&pInt->u32Status) == STATUS_CALC
     639        if (!(   ASMAtomicReadU32(&pInt->u32Status) == STATUS_WRITE
    595640              || ASMAtomicReadU32(&pInt->u32Status) == STATUS_READ))
    596641            break;
    597         rc = RTSemEventWait(pInt->calcFinishedEvent, 100);
     642        rc = RTSemEventWait(pInt->workFinishedEvent, 100);
    598643    }
    599644    if (rc == VERR_TIMEOUT)
     
    602647}
    603648
    604 DECLINLINE(int) sha1FlushCurBuf(PVDINTERFACE pIO, PVDINTERFACEIO pCallbacks, PSHA1STORAGEINTERNAL pInt, bool fCreateDigest)
     649DECLINLINE(int) sha1FlushCurBuf(PSHA1STORAGEINTERNAL pInt)
    605650{
    606651    int rc = VINF_SUCCESS;
    607     if (fCreateDigest)
    608     {
    609         /* Let the sha1 worker thread start immediately. */
    610         rc = sha1SignalManifestThread(pInt, STATUS_CALC);
     652    if (pInt->fOpenMode & RTFILE_O_WRITE)
     653    {
     654        /* Let the write worker thread start immediately. */
     655        rc = sha1SignalManifestThread(pInt, STATUS_WRITE);
    611656        if (RT_FAILURE(rc))
    612657            return rc;
    613     }
    614     /* Write the buffer content to disk. */
    615     size_t cbAllWritten = 0;
    616     for(;;)
    617     {
    618         /* Finished? */
    619         if (cbAllWritten == pInt->cbCurBuf)
    620             break;
    621         size_t cbToWrite = pInt->cbCurBuf - cbAllWritten;
    622         size_t cbWritten = 0;
    623         rc = pCallbacks->pfnWriteSync(pIO->pvUser, pInt->pvStorage, pInt->cbCurFile, &pInt->pcBuf[cbAllWritten], cbToWrite, &cbWritten);
    624         if (RT_FAILURE(rc))
    625             return rc;
    626         pInt->cbCurFile += cbWritten;
    627         cbAllWritten += cbWritten;
    628     }
    629     if (fCreateDigest)
    630     {
    631         /* Wait until the sha1 worker thread has finished. */
     658
     659        /* Wait until the write worker thread has finished. */
    632660        rc = sha1WaitForManifestThreadFinished(pInt);
    633661    }
    634     if (RT_SUCCESS(rc))
    635         pInt->cbCurBuf = 0;
    636662
    637663    return rc;
     
    666692        pInt->pSha1Storage = pSha1Storage;
    667693        pInt->fEOF         = false;
    668 
    669 
    670         if (fOpen & RTFILE_O_READ)
     694        pInt->fOpenMode    = fOpen;
     695
     696        /* Circular buffer in the read case. */
     697        rc = RTCircBufCreate(&pInt->pCircBuf, _1M * 2);
     698        if (RT_FAILURE(rc))
     699            break;
     700
     701        if (fOpen & RTFILE_O_WRITE)
    671702        {
    672             /* Circular buffer in the read case. */
    673             rc = RTCircBufCreate(&pInt->pCircBuf, _1M * 2);
    674             if (RT_FAILURE(rc))
    675                 break;
    676         }
    677         else if (fOpen & RTFILE_O_WRITE)
    678         {
    679             /* For caching reasons and to be able to calculate the sha1 sum of the
    680                data we need a memory buffer. */
    681             pInt->cbBuf = _1M;
    682             pInt->pcBuf = (char*)RTMemAlloc(pInt->cbBuf);
    683             if (!pInt->pcBuf)
    684             {
    685                 rc = VERR_NO_MEMORY;
    686                 break;
    687             }
    688703            /* The zero buffer is used for appending empty parts at the end of the
    689704             * file (or our buffer) in setSize or when uOffset in writeSync is
     
    698713        }
    699714
    700         if (   fOpen & RTFILE_O_READ
    701             || pSha1Storage->fCreateDigest)
    702         {
    703             /* Create an event semaphore to indicate a state change for the sha1
    704                worker thread. */
    705             rc = RTSemEventCreate(&pInt->newStatusEvent);
    706             if (RT_FAILURE(rc))
    707                 break;
    708             /* Create an event semaphore to indicate a finished calculation of the
    709                sha1 worker thread. */
    710             rc = RTSemEventCreate(&pInt->calcFinishedEvent);
    711             if (RT_FAILURE(rc))
    712                 break;
    713             /* Create the sha1 worker thread. */
    714             rc = RTThreadCreate(&pInt->pMfThread, sha1CalcWorkerThread, pInt, 0, RTTHREADTYPE_MAIN_HEAVY_WORKER, RTTHREADFLAGS_WAITABLE, "SHA1-Worker");
    715             if (RT_FAILURE(rc))
    716                 break;
    717         }
     715        /* Create an event semaphore to indicate a state change for the worker
     716         * thread. */
     717        rc = RTSemEventCreate(&pInt->newStatusEvent);
     718        if (RT_FAILURE(rc))
     719            break;
     720        /* Create an event semaphore to indicate a finished calculation of the
     721           worker thread. */
     722        rc = RTSemEventCreate(&pInt->workFinishedEvent);
     723        if (RT_FAILURE(rc))
     724            break;
     725        /* Create the worker thread. */
     726        rc = RTThreadCreate(&pInt->pWorkerThread, sha1CalcWorkerThread, pInt, 0, RTTHREADTYPE_MAIN_HEAVY_WORKER, RTTHREADFLAGS_WAITABLE, "SHA1-Worker");
     727        if (RT_FAILURE(rc))
     728            break;
    718729
    719730        if (pSha1Storage->fCreateDigest)
    720             /* Create a sha1 context the sha1 worker thread will work with. */
     731            /* Create a sha1 context the worker thread will work with. */
    721732            RTSha1Init(&pInt->ctx);
    722733
     
    725736                                 fOpen, pInt->pfnCompleted,
    726737                                 &pInt->pvStorage);
     738        if (RT_FAILURE(rc))
     739            break;
     740
     741        if (fOpen & RTFILE_O_READ)
     742        {
     743            /* Immediately let the worker thread start the reading. */
     744            rc = sha1SignalManifestThread(pInt, STATUS_READ);
     745        }
    727746    }
    728747    while(0);
     
    730749    if (RT_FAILURE(rc))
    731750    {
    732         if (pInt->pMfThread)
     751        if (pInt->pWorkerThread)
    733752        {
    734753            sha1SignalManifestThread(pInt, STATUS_END);
    735             RTThreadWait(pInt->pMfThread, RT_INDEFINITE_WAIT, 0);
     754            RTThreadWait(pInt->pWorkerThread, RT_INDEFINITE_WAIT, 0);
    736755        }
    737         if (pInt->calcFinishedEvent)
    738             RTSemEventDestroy(pInt->calcFinishedEvent);
     756        if (pInt->workFinishedEvent)
     757            RTSemEventDestroy(pInt->workFinishedEvent);
    739758        if (pInt->newStatusEvent)
    740759            RTSemEventDestroy(pInt->newStatusEvent);
     
    743762        if (pInt->pvZeroBuf)
    744763            RTMemFree(pInt->pvZeroBuf);
    745         if (pInt->pcBuf)
    746             RTMemFree(pInt->pcBuf);
    747764        RTMemFree(pInt);
    748765    }
     
    772789
    773790    /* Make sure all pending writes are flushed */
    774     if (pInt->cbCurBuf > 0)
    775         rc = sha1FlushCurBuf(pIO, pCallbacks, pInt, pSha1Storage->fCreateDigest);
    776 
    777     if (pInt->pMfThread)
     791    rc = sha1FlushCurBuf(pInt);
     792
     793    if (pInt->pWorkerThread)
     794    {
    778795        /* Signal the worker thread to end himself */
    779796        rc = sha1SignalManifestThread(pInt, STATUS_END);
    780 
    781     if (pSha1Storage->fCreateDigest)
     797        /* Worker thread stopped? */
     798        rc = RTThreadWait(pInt->pWorkerThread, RT_INDEFINITE_WAIT, 0);
     799    }
     800
     801    if (   RT_SUCCESS(rc)
     802        && pSha1Storage->fCreateDigest)
    782803    {
    783804        /* Finally calculate & format the SHA1 sum */
     
    795816    }
    796817
    797     if (pInt->pMfThread)
    798         /* Worker thread stopped? */
    799         rc = RTThreadWait(pInt->pMfThread, RT_INDEFINITE_WAIT, 0);
    800 
    801818    /* Close the file */
    802819    rc = pCallbacks->pfnClose(pIO->pvUser, pInt->pvStorage);
     
    805822
    806823    /* Cleanup */
    807     if (pInt->calcFinishedEvent)
    808         RTSemEventDestroy(pInt->calcFinishedEvent);
     824    if (pInt->workFinishedEvent)
     825        RTSemEventDestroy(pInt->workFinishedEvent);
    809826    if (pInt->newStatusEvent)
    810827        RTSemEventDestroy(pInt->newStatusEvent);
     
    813830    if (pInt->pvZeroBuf)
    814831        RTMemFree(pInt->pvZeroBuf);
    815     if (pInt->pcBuf)
    816         RTMemFree(pInt->pcBuf);
    817832    RTMemFree(pInt);
    818833
     
    982997        if (cbAllWritten == cbWrite)
    983998            break;
    984         size_t cbToWrite = RT_MIN(pInt->cbBuf - pInt->cbCurBuf, cbWrite - cbAllWritten);
    985         memcpy(&pInt->pcBuf[pInt->cbCurBuf], &((char*)pvBuf)[cbAllWritten], cbToWrite);
    986         pInt->cbCurBuf += cbToWrite;
    987         pInt->cbCurAll += cbToWrite;
    988         cbAllWritten += cbToWrite;
    989         /* Need to start a real write? */
    990         if (pInt->cbCurBuf == pInt->cbBuf)
     999        size_t cbAvail = RTCircBufFree(pInt->pCircBuf);
     1000        if (   cbAvail == 0
     1001            && pInt->fEOF)
     1002            return VERR_EOF;
     1003        /* If there isn't enough free space make sure the worker thread is
     1004         * writing some data. */
     1005        if ((cbWrite - cbAllWritten) > cbAvail)
    9911006        {
    992             rc = sha1FlushCurBuf(pIO, pCallbacks, pInt, pSha1Storage->fCreateDigest);
    993             if (RT_FAILURE(rc))
     1007            rc = sha1SignalManifestThread(pInt, STATUS_WRITE);
     1008            if(RT_FAILURE(rc))
    9941009                break;
     1010            /* If there is _no_ free space available, we have to wait until it is. */
     1011            if (cbAvail == 0)
     1012            {
     1013                rc = sha1WaitForManifestThreadFinished(pInt);
     1014                if (RT_FAILURE(rc))
     1015                    break;
     1016                cbAvail = RTCircBufFree(pInt->pCircBuf);
     1017//                RTPrintf("############## wait %lu %lu %lu \n", cbRead, cbAllRead, cbAvail);
     1018//                pInt->waits++;
     1019            }
    9951020        }
     1021        size_t cbToWrite = RT_MIN(cbWrite - cbAllWritten, cbAvail);
     1022        char *pcBuf;
     1023        size_t cbMemWritten = 0;
     1024        /* Acquire a block for writing from our circular buffer. */
     1025        RTCircBufAcquireWriteBlock(pInt->pCircBuf, cbToWrite, (void**)&pcBuf, &cbMemWritten);
     1026        memcpy(pcBuf, &((char*)pvBuf)[cbAllWritten], cbMemWritten);
     1027        /* Mark the block full. */
     1028        RTCircBufReleaseWriteBlock(pInt->pCircBuf, cbMemWritten);
     1029        cbAllWritten += cbMemWritten;
     1030        pInt->cbCurAll += cbMemWritten;
    9961031    }
     1032
    9971033    if (pcbWritten)
    9981034        *pcbWritten = cbAllWritten;
     1035
     1036    /* Signal the thread to write more data in the mean time. */
     1037    if (   RT_SUCCESS(rc)
     1038           && RTCircBufUsed(pInt->pCircBuf) >= (RTCircBufSize(pInt->pCircBuf) / 2))
     1039        rc = sha1SignalManifestThread(pInt, STATUS_WRITE);
    9991040
    10001041    return rc;
     
    11031144    PSHA1STORAGEINTERNAL pInt = (PSHA1STORAGEINTERNAL)pvStorage;
    11041145
    1105     int rc = VINF_SUCCESS;
    1106 
    11071146    /* Check if there is still something in the buffer. If yes, flush it. */
    1108     if (pInt->cbCurBuf > 0)
    1109     {
    1110         rc = sha1FlushCurBuf(pIO, pCallbacks, pInt, pSha1Storage->fCreateDigest);
    1111         if (RT_FAILURE(rc))
    1112             return rc;
    1113     }
     1147    int rc = sha1FlushCurBuf(pInt);
     1148    if (RT_FAILURE(rc))
     1149        return rc;
    11141150
    11151151    return pCallbacks->pfnFlushSync(pIO->pvUser, pInt->pvStorage);
Note: See TracChangeset for help on using the changeset viewer.

© 2025 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette