VirtualBox

Ignore:
Timestamp:
Dec 4, 2019 1:19:31 PM (5 years ago)
Author:
vboxsync
svn:sync-xref-src-repo-rev:
135217
Message:

Runtime/ioqueue: Finish the fallback file provider

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/VBox/Runtime/common/ioqueue/ioqueue-stdfile-provider.cpp

    r79983 r82383  
    5252/** The waiting thread was interrupted by the external wakeup call. */
    5353#define RTIOQUEUE_STDFILE_PROV_STATE_F_EVTWAIT_INTR             RT_BIT(1)
     54#define RTIOQUEUE_STDFILE_PROV_STATE_F_EVTWAIT_INTR_BIT         1
    5455/** The I/O queue worker thread needs to be woken up to process new requests. */
    5556#define RTIOQUEUE_STDFILE_PROV_STATE_F_WORKER_NEED_WAKEUP       RT_BIT(2)
     
    109110    /** Submission queue producer index. */
    110111    volatile uint32_t           idxSqProd;
     112    /** Submission queue producer value for any uncommitted requests. */
     113    uint32_t                    idxSqProdUncommit;
    111114    /** Submission queue consumer index. */
    112115    volatile uint32_t           idxSqCons;
     
    178181
    179182    /* Write the result back into the completion queue. */
    180     pCqEntry->rcReq  = rcReq;
    181     pCqEntry->pvUser = pSqEntry->pvUser;
     183    pCqEntry->rcReq    = rcReq;
     184    pCqEntry->pvUser   = pSqEntry->pvUser;
     185    pCqEntry->cbXfered = RT_SUCCESS(rcReq) ? pSqEntry->cbReq : 0;
    182186}
    183187
     
    215219
    216220        /* Process all requests. */
     221        uint32_t cCqFree = 0;
     222        if (idxCqCons > pThis->idxCqProd)
     223            cCqFree = pThis->cCqEntries - (pThis->cCqEntries - idxCqCons) - pThis->idxCqProd;
     224        else
     225            cCqFree = pThis->cCqEntries - pThis->idxCqProd - idxCqCons;
    217226        do
    218227        {
    219228            while (   idxSqCons != idxSqProd
    220                    && idxCqCons != pThis->idxCqProd)
     229                   && cCqFree)
    221230            {
    222231                PCRTIOQUEUESSQENTRY pSqEntry = &pThis->paSqEntryBase[idxSqCons];
     
    227236
    228237                idxSqCons = (idxSqCons + 1) % pThis->cSqEntries;
     238                cCqFree--;
    229239                pThis->idxCqProd = (pThis->idxCqProd + 1) % pThis->cCqEntries;
     240                ASMAtomicWriteU32(&pThis->idxSqCons, idxSqCons);
    230241                ASMWriteFence();
    231242                if (ASMAtomicReadU32(&pThis->fState) & RTIOQUEUE_STDFILE_PROV_STATE_F_EVTWAIT_NEED_WAKEUP)
     
    236247            }
    237248
    238             ASMWriteFence();
    239             ASMAtomicWriteU32(&pThis->idxSqCons, idxSqCons);
    240249            idxSqProd = ASMAtomicReadU32(&pThis->idxSqProd);
    241             idxSqCons = ASMAtomicReadU32(&pThis->idxSqCons);
    242         } while (idxSqCons != idxSqProd);
     250        } while (   idxSqCons != idxSqProd
     251                 && cCqFree);
    243252    }
    244253
     
    264273    int rc = VINF_SUCCESS;
    265274
    266     pThis->cSqEntries = cSqEntries;
    267     pThis->cCqEntries = cCqEntries;
    268     pThis->idxSqProd  = 0;
    269     pThis->idxSqCons  = 0;
    270     pThis->idxCqProd  = 0;
    271     pThis->idxCqCons  = 0;
    272     pThis->fShutdown  = false;
    273     pThis->fState     = 0;
     275    cSqEntries++;
     276    cCqEntries++;
     277
     278    pThis->cSqEntries        = cSqEntries;
     279    pThis->cCqEntries        = cCqEntries;
     280    pThis->idxSqProd         = 0;
     281    pThis->idxSqProdUncommit = 0;
     282    pThis->idxSqCons         = 0;
     283    pThis->idxCqProd         = 0;
     284    pThis->idxCqCons         = 0;
     285    pThis->fShutdown         = false;
     286    pThis->fState            = 0;
    274287
    275288    pThis->paSqEntryBase = (PRTIOQUEUESSQENTRY)RTMemAllocZ(cSqEntries * sizeof(RTIOQUEUESSQENTRY));
     
    361374{
    362375    PRTIOQUEUEPROVINT pThis = hIoQueueProv;
    363     uint32_t idxSqProd = ASMAtomicReadU32(&pThis->idxSqProd);
    364     PRTIOQUEUESSQENTRY pSqEntry = &pThis->paSqEntryBase[idxSqProd];
     376    PRTIOQUEUESSQENTRY pSqEntry = &pThis->paSqEntryBase[pThis->idxSqProdUncommit];
    365377
    366378    pSqEntry->hFile     = pHandle->u.hFile;
     
    372384    pSqEntry->fSg       = false;
    373385    pSqEntry->u.pvBuf   = pvBuf;
    374     ASMWriteFence();
    375 
    376     idxSqProd = (idxSqProd + 1) % pThis->cSqEntries;
    377     ASMAtomicWriteU32(&pThis->idxSqProd, idxSqProd);
     386
     387    pThis->idxSqProdUncommit = (pThis->idxSqProdUncommit + 1) % pThis->cSqEntries;
    378388    return VINF_SUCCESS;
    379389}
     
    386396{
    387397    PRTIOQUEUEPROVINT pThis = hIoQueueProv;
    388     uint32_t idxSqProd = ASMAtomicReadU32(&pThis->idxSqProd);
    389     PRTIOQUEUESSQENTRY pSqEntry = &pThis->paSqEntryBase[idxSqProd];
     398    PRTIOQUEUESSQENTRY pSqEntry = &pThis->paSqEntryBase[pThis->idxSqProdUncommit];
    390399
    391400    pSqEntry->hFile     = pHandle->u.hFile;
     
    397406    pSqEntry->fSg       = true;
    398407    pSqEntry->u.pSgBuf  = pSgBuf;
    399     ASMWriteFence();
    400 
    401     idxSqProd = (idxSqProd + 1) % pThis->cSqEntries;
    402     ASMAtomicWriteU32(&pThis->idxSqProd, idxSqProd);
     408
     409    pThis->idxSqProdUncommit = (pThis->idxSqProdUncommit + 1) % pThis->cSqEntries;
    403410    return VINF_SUCCESS;
    404411}
     
    409416{
    410417    PRTIOQUEUEPROVINT pThis = hIoQueueProv;
    411     RT_NOREF(pcReqsCommitted);
    412 
     418
     419    if (pThis->idxSqProd > pThis->idxSqProdUncommit)
     420        *pcReqsCommitted = pThis->cSqEntries - pThis->idxSqProd + pThis->idxSqProdUncommit;
     421    else
     422        *pcReqsCommitted = pThis->idxSqProdUncommit - pThis->idxSqProd;
     423
     424    ASMWriteFence();
     425    ASMAtomicWriteU32(&pThis->idxSqProd, pThis->idxSqProdUncommit);
    413426    return RTSemEventSignal(pThis->hSemEvtWorker);
    414427}
     
    419432                                                      uint32_t cMinWait, uint32_t *pcCEvt, uint32_t fFlags)
    420433{
    421     PRTIOQUEUEPROVINT pThis = hIoQueueProv;
    422     RT_NOREF(pThis, paCEvt, cCEvt, cMinWait, pcCEvt, fFlags);
    423 
    424     return VERR_NOT_IMPLEMENTED;
     434    RT_NOREF(fFlags);
     435
     436    PRTIOQUEUEPROVINT pThis = hIoQueueProv;
     437    int rc = VINF_SUCCESS;
     438    uint32_t idxCEvt = 0;
     439
     440    while (   RT_SUCCESS(rc)
     441           && cMinWait
     442           && cCEvt)
     443    {
     444        ASMAtomicOrU32(&pThis->fState, RTIOQUEUE_STDFILE_PROV_STATE_F_EVTWAIT_NEED_WAKEUP);
     445        uint32_t idxCqProd = ASMAtomicReadU32(&pThis->idxCqProd);
     446        uint32_t idxCqCons = ASMAtomicReadU32(&pThis->idxCqCons);
     447
     448        if (idxCqCons == idxCqProd)
     449        {
     450            rc = RTSemEventWait(pThis->hSemEvtWaitEvts, RT_INDEFINITE_WAIT);
     451            AssertRC(rc);
     452            if (ASMAtomicBitTestAndClear(&pThis->fState, RTIOQUEUE_STDFILE_PROV_STATE_F_EVTWAIT_INTR_BIT))
     453            {
     454                rc = VERR_INTERRUPTED;
     455                ASMAtomicBitTestAndClear(&pThis->fState, RTIOQUEUE_STDFILE_PROV_STATE_F_WORKER_NEED_WAKEUP_BIT);
     456                break;
     457            }
     458
     459            idxCqProd = ASMAtomicReadU32(&pThis->idxCqProd);
     460            idxCqCons = ASMAtomicReadU32(&pThis->idxCqCons);
     461        }
     462
     463        ASMAtomicBitTestAndClear(&pThis->fState, RTIOQUEUE_STDFILE_PROV_STATE_F_WORKER_NEED_WAKEUP_BIT);
     464
     465        /* Process all requests. */
     466        while (   idxCqCons != idxCqProd
     467               && cCEvt)
     468        {
     469            PRTIOQUEUECEVT pCqEntry = &pThis->paCqEntryBase[idxCqCons];
     470
     471            paCEvt[idxCEvt].rcReq    = pCqEntry->rcReq;
     472            paCEvt[idxCEvt].pvUser   = pCqEntry->pvUser;
     473            paCEvt[idxCEvt].cbXfered = pCqEntry->cbXfered;
     474            ASMReadFence();
     475
     476            idxCEvt++;
     477            cCEvt--;
     478            cMinWait--;
     479
     480            idxCqCons = (idxCqCons + 1) % pThis->cCqEntries;
     481            pThis->idxCqCons = (pThis->idxCqCons + 1) % pThis->cCqEntries;
     482            ASMWriteFence();
     483        }
     484    }
     485
     486    *pcCEvt = idxCEvt;
     487    return rc;
    425488}
    426489
Note: See TracChangeset for help on using the changeset viewer.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette