VirtualBox

Changeset 27313 in vbox


Ignore:
Timestamp:
Mar 12, 2010 2:25:27 AM (15 years ago)
Author:
vboxsync
Message:

pipe-win.cpp: Pipe implementation for windows; fails two test wrt to writes supposed to filling up the pipe buffer.

Location:
trunk/src/VBox/Runtime
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/VBox/Runtime/r3/win/pipe-win.cpp

    r26824 r27313  
    3838#include "internal/iprt.h"
    3939
     40#include <iprt/asm.h>
    4041#include <iprt/assert.h>
     42#include <iprt/critsect.h>
    4143#include <iprt/err.h>
     44#include <iprt/mem.h>
     45#include <iprt/string.h>
     46#include <iprt/process.h>
     47#include <iprt/thread.h>
     48#include <iprt/time.h>
     49#include "internal/magics.h"
     50
     51
     52/*******************************************************************************
     53*   Structures and Typedefs                                                    *
     54*******************************************************************************/
     55typedef struct RTPIPEINTERNAL
     56{
     57    /** Magic value (RTPIPE_MAGIC). */
     58    uint32_t            u32Magic;
     59    /** The pipe handle. */
     60    HANDLE              hPipe;
     61    /** Set if this is the read end, clear if it's the write end. */
     62    bool                fRead;
     63    /** Set if there is already pending I/O. */
     64    bool                fIOPending;
     65    /** The number of users of the current mode. */
     66    uint32_t            cModeUsers;
     67    /** The overlapped I/O structure we use. */
     68    OVERLAPPED          Overlapped;
     69    /** Bounce buffer for writes. */
     70    uint8_t            *pbBounceBuf;
     71    /** Amount of used buffer space. */
     72    size_t              cbBounceBufUsed;
     73    /** Amount of allocated buffer space. */
     74    size_t              cbBounceBufAlloc;
     75    /** Critical section protecting the above members.
     76     * (Taking the lazy/simple approach.) */
     77    RTCRITSECT          CritSect;
     78} RTPIPEINTERNAL;
    4279
    4380
    4481RTDECL(int)  RTPipeCreate(PRTPIPE phPipeRead, PRTPIPE phPipeWrite, uint32_t fFlags)
    4582{
     83    AssertPtrReturn(phPipeRead, VERR_INVALID_POINTER);
     84    AssertPtrReturn(phPipeWrite, VERR_INVALID_POINTER);
     85    AssertReturn(!(fFlags & ~RTPIPE_C_VALID_MASK), VERR_INVALID_PARAMETER);
     86
     87    /*
     88     * Create the read end of the pipe.
     89     */
     90    DWORD   dwErr;
     91    HANDLE  hPipeR;
     92    HANDLE  hPipeW;
     93    int     rc;
     94    for (;;)
     95    {
     96        static volatile uint32_t    g_iNextPipe = 0;
     97        char                        szName[128];
     98        RTStrPrintf(szName, sizeof(szName), "\\\\.\\pipe\\iprt-pipe-%u-%u", RTProcSelf(), ASMAtomicIncU32(&g_iNextPipe));
     99
     100        SECURITY_ATTRIBUTES  SecurityAttributes;
     101        PSECURITY_ATTRIBUTES pSecurityAttributes = NULL;
     102        if (fFlags & RTPIPE_C_INHERIT_READ)
     103        {
     104            SecurityAttributes.nLength              = sizeof(SecurityAttributes);
     105            SecurityAttributes.lpSecurityDescriptor = NULL;
     106            SecurityAttributes.bInheritHandle       = TRUE;
     107            pSecurityAttributes = &SecurityAttributes;
     108        }
     109
     110        DWORD dwOpenMode = PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED;
     111#ifdef FILE_FLAG_FIRST_PIPE_INSTANCE
     112        dwOpenMode |= FILE_FLAG_FIRST_PIPE_INSTANCE;
     113#endif
     114
     115        DWORD dwPipeMode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT;
     116#ifdef PIPE_REJECT_REMOTE_CLIENTS
     117        dwPipeMode |= PIPE_REJECT_REMOTE_CLIENTS;
     118#endif
     119
     120        hPipeR = CreateNamedPipeA(szName, dwOpenMode, dwPipeMode, 1 /*nMaxInstances*/, _64K,  _64K,
     121                                  NMPWAIT_USE_DEFAULT_WAIT, pSecurityAttributes);
     122#ifdef PIPE_REJECT_REMOTE_CLIENTS
     123        if (hPipeR == INVALID_HANDLE_VALUE && GetLastError() == ERROR_INVALID_PARAMETER)
     124        {
     125            dwPipeMode &= ~PIPE_REJECT_REMOTE_CLIENTS;
     126            hPipeR = CreateNamedPipeA(szName, dwOpenMode, dwPipeMode, 1 /*nMaxInstances*/, _64K,  _64K,
     127                                      NMPWAIT_USE_DEFAULT_WAIT, pSecurityAttributes);
     128        }
     129#endif
     130#ifdef FILE_FLAG_FIRST_PIPE_INSTANCE
     131        if (hPipeR == INVALID_HANDLE_VALUE && GetLastError() == ERROR_INVALID_PARAMETER)
     132        {
     133            dwOpenMode &= ~FILE_FLAG_FIRST_PIPE_INSTANCE;
     134            hPipeR = CreateNamedPipeA(szName, dwOpenMode, dwPipeMode, 1 /*nMaxInstances*/, _64K,  _64K,
     135                                      NMPWAIT_USE_DEFAULT_WAIT, pSecurityAttributes);
     136        }
     137#endif
     138        if (hPipeR != INVALID_HANDLE_VALUE)
     139        {
     140            /*
     141             * Connect to the pipe (the write end).
     142             * We add FILE_READ_ATTRIBUTES here to make sure we can query the
     143             * pipe state later on.
     144             */
     145            pSecurityAttributes = NULL;
     146            if (fFlags & RTPIPE_C_INHERIT_WRITE)
     147            {
     148                SecurityAttributes.nLength              = sizeof(SecurityAttributes);
     149                SecurityAttributes.lpSecurityDescriptor = NULL;
     150                SecurityAttributes.bInheritHandle       = TRUE;
     151                pSecurityAttributes = &SecurityAttributes;
     152            }
     153
     154            hPipeW = CreateFileA(szName,
     155                                 GENERIC_WRITE | FILE_READ_ATTRIBUTES /*dwDesiredAccess*/,
     156                                 0 /*dwShareMode*/,
     157                                 pSecurityAttributes,
     158                                 OPEN_EXISTING /* dwCreationDisposition */,
     159                                 FILE_FLAG_OVERLAPPED /*dwFlagsAndAttributes*/,
     160                                 NULL /*hTemplateFile*/);
     161            if (hPipeW != INVALID_HANDLE_VALUE)
     162                break;
     163            dwErr = GetLastError();
     164            CloseHandle(hPipeR);
     165        }
     166        else
     167            dwErr = GetLastError();
     168        if (   dwErr != ERROR_PIPE_BUSY     /* already exist - compatible */
     169            && dwErr != ERROR_ACCESS_DENIED /* already exist - incompatible */)
     170            return RTErrConvertFromWin32(dwErr);
     171        /* else: try again with a new name */
     172    }
     173
     174    /*
     175     * Create the two handles.
     176     */
     177    RTPIPEINTERNAL *pThisR = (RTPIPEINTERNAL *)RTMemAllocZ(sizeof(RTPIPEINTERNAL));
     178    if (pThisR)
     179    {
     180        RTPIPEINTERNAL *pThisW = (RTPIPEINTERNAL *)RTMemAllocZ(sizeof(RTPIPEINTERNAL));
     181        if (pThisW)
     182        {
     183            rc = RTCritSectInit(&pThisR->CritSect);
     184            if (RT_SUCCESS(rc))
     185            {
     186                rc = RTCritSectInit(&pThisW->CritSect);
     187                if (RT_SUCCESS(rc))
     188                {
     189                    pThisR->Overlapped.hEvent = CreateEvent(NULL, TRUE /*fManualReset*/,
     190                                                            TRUE /*fInitialState*/, NULL /*pName*/);
     191                    if (pThisR->Overlapped.hEvent != NULL)
     192                    {
     193                        pThisW->Overlapped.hEvent = CreateEvent(NULL, TRUE /*fManualReset*/,
     194                                                                TRUE /*fInitialState*/, NULL /*pName*/);
     195                        if (pThisW->Overlapped.hEvent != NULL)
     196                        {
     197                            pThisR->u32Magic        = RTPIPE_MAGIC;
     198                            pThisW->u32Magic        = RTPIPE_MAGIC;
     199                            pThisR->hPipe           = hPipeR;
     200                            pThisW->hPipe           = hPipeW;
     201                            pThisR->fRead           = true;
     202                            pThisW->fRead           = false;
     203                            pThisR->fIOPending      = false;
     204                            pThisW->fIOPending      = false;
     205                            //pThisR->cModeUsers      = 0;
     206                            //pThisW->cModeUsers      = 0;
     207                            //pThisR->pbBounceBuf     = NULL;
     208                            //pThisW->pbBounceBuf     = NULL;
     209                            //pThisR->cbBounceBufUsed = 0;
     210                            //pThisW->cbBounceBufUsed = 0;
     211                            //pThisR->cbBounceBufAlloc= 0;
     212                            //pThisW->cbBounceBufAlloc= 0;
     213
     214                            *phPipeRead  = pThisR;
     215                            *phPipeWrite = pThisW;
     216                            return VINF_SUCCESS;
     217                        }
     218                        CloseHandle(pThisR->Overlapped.hEvent);
     219                    }
     220                    RTCritSectDelete(&pThisW->CritSect);
     221                }
     222                RTCritSectDelete(&pThisR->CritSect);
     223            }
     224            RTMemFree(pThisW);
     225        }
     226        else
     227            rc = VERR_NO_MEMORY;
     228        RTMemFree(pThisR);
     229    }
     230    else
     231        rc = VERR_NO_MEMORY;
     232
     233    CloseHandle(hPipeR);
     234    CloseHandle(hPipeW);
     235    return rc;
     236}
     237
     238
     239/**
     240 * Common worker for handling I/O completion.
     241 *
     242 * This is used by RTPipeClose, RTPipeWrite and RTPipeWriteBlocking.
     243 *
     244 * @returns IPRT status code.
     245 * @param   pThis               The pipe instance handle.
     246 */
     247static int rtPipeWriteCheckCompletion(RTPIPEINTERNAL *pThis)
     248{
     249    int rc;
     250    DWORD dwRc = WaitForSingleObject(pThis->Overlapped.hEvent, 0);
     251    if (dwRc == WAIT_OBJECT_0)
     252    {
     253        DWORD cbWritten = 0;
     254        if (GetOverlappedResult(pThis->hPipe, &pThis->Overlapped, &cbWritten, TRUE))
     255        {
     256            for (;;)
     257            {
     258                if (cbWritten >= pThis->cbBounceBufUsed)
     259                {
     260                    pThis->fIOPending = false;
     261                    rc = VINF_SUCCESS;
     262                    break;
     263                }
     264
     265                /* resubmit the remainder of the buffer - can this actually happen? */
     266                memmove(&pThis->pbBounceBuf[0], &pThis->pbBounceBuf[cbWritten], pThis->cbBounceBufUsed - cbWritten);
     267                rc = ResetEvent(pThis->Overlapped.hEvent); Assert(rc == TRUE);
     268                if (!WriteFile(pThis->hPipe, pThis->pbBounceBuf, (DWORD)pThis->cbBounceBufUsed,
     269                               &cbWritten, &pThis->Overlapped))
     270                {
     271                    if (GetLastError() == ERROR_IO_PENDING)
     272                        rc = VINF_TRY_AGAIN;
     273                    else
     274                    {
     275                        pThis->fIOPending = false;
     276                        if (GetLastError() == ERROR_NO_DATA)
     277                            rc = VERR_BROKEN_PIPE;
     278                        else
     279                            rc = RTErrConvertFromWin32(GetLastError());
     280                    }
     281                    break;
     282                }
     283                Assert(cbWritten > 0);
     284            }
     285        }
     286        else
     287        {
     288            pThis->fIOPending = false;
     289            rc = RTErrConvertFromWin32(GetLastError());
     290        }
     291    }
     292    else if (dwRc == WAIT_TIMEOUT)
     293        rc = VINF_TRY_AGAIN;
     294    else
     295    {
     296        pThis->fIOPending = false;
     297        if (dwRc == WAIT_ABANDONED)
     298            rc = VERR_INVALID_HANDLE;
     299        else
     300            rc = RTErrConvertFromWin32(GetLastError());
     301    }
     302    return rc;
     303}
     304
     305
     306
     307RTDECL(int)  RTPipeClose(RTPIPE hPipe)
     308{
     309    RTPIPEINTERNAL *pThis = hPipe;
     310    if (pThis == NIL_RTPIPE)
     311        return VINF_SUCCESS;
     312    AssertPtrReturn(pThis, VERR_INVALID_PARAMETER);
     313    AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
     314
     315    /*
     316     * Do the cleanup.
     317     */
     318    AssertReturn(ASMAtomicCmpXchgU32(&pThis->u32Magic, ~RTPIPE_MAGIC, RTPIPE_MAGIC), VERR_INVALID_HANDLE);
     319    RTCritSectEnter(&pThis->CritSect);
     320    Assert(pThis->cModeUsers == 0);
     321
     322    if (!pThis->fRead && pThis->fIOPending)
     323        rtPipeWriteCheckCompletion(pThis);
     324
     325    CloseHandle(pThis->hPipe);
     326    pThis->hPipe = INVALID_HANDLE_VALUE;
     327
     328    CloseHandle(pThis->Overlapped.hEvent);
     329    pThis->Overlapped.hEvent = NULL;
     330
     331    RTMemFree(pThis->pbBounceBuf);
     332    pThis->pbBounceBuf = NULL;
     333
     334    RTCritSectLeave(&pThis->CritSect);
     335    RTCritSectDelete(&pThis->CritSect);
     336
     337    RTMemFree(pThis);
     338
     339    return VINF_SUCCESS;
     340}
     341
     342
     343RTDECL(RTHCINTPTR) RTPipeToNative(RTPIPE hPipe)
     344{
     345    RTPIPEINTERNAL *pThis = hPipe;
     346    AssertPtrReturn(pThis, (RTHCINTPTR)(unsigned int)-1);
     347    AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, (RTHCINTPTR)(unsigned int)-1);
     348
     349    return (RTHCINTPTR)pThis->hPipe;
     350}
     351
     352
     353RTDECL(int) RTPipeRead(RTPIPE hPipe, void *pvBuf, size_t cbToRead, size_t *pcbRead)
     354{
     355    RTPIPEINTERNAL *pThis = hPipe;
     356    AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
     357    AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
     358    AssertReturn(pThis->fRead, VERR_ACCESS_DENIED);
     359    AssertPtr(pcbRead);
     360    AssertPtr(pvBuf);
     361
     362    int rc = RTCritSectEnter(&pThis->CritSect);
     363    if (RT_SUCCESS(rc))
     364    {
     365        /* No concurrent readers, sorry. */
     366        if (pThis->cModeUsers == 0)
     367        {
     368            pThis->cModeUsers++;
     369
     370            /*
     371             * Kick of a an overlapped read.  It should return immedately if
     372             * there is bytes in the buffer.  If not, we'll cancel it and see
     373             * what we get back.
     374             */
     375            rc = ResetEvent(pThis->Overlapped.hEvent); Assert(rc == TRUE);
     376            DWORD cbRead = 0;
     377            if (   cbToRead == 0
     378                || ReadFile(pThis->hPipe, pvBuf,
     379                            cbToRead <= ~(DWORD)0 ? (DWORD)cbToRead : ~(DWORD)0,
     380                            &cbRead, &pThis->Overlapped))
     381            {
     382                *pcbRead = cbRead;
     383                rc = VINF_SUCCESS;
     384            }
     385            else if (GetLastError() == ERROR_IO_PENDING)
     386            {
     387                pThis->fIOPending = true;
     388                RTCritSectLeave(&pThis->CritSect);
     389
     390                if (!CancelIo(pThis->hPipe))
     391                    WaitForSingleObject(pThis->Overlapped.hEvent, INFINITE);
     392                if (GetOverlappedResult(pThis->hPipe, &pThis->Overlapped, &cbRead, TRUE /*fWait*/))
     393                {
     394                    *pcbRead = cbRead;
     395                    rc = VINF_SUCCESS;
     396                }
     397                else if (GetLastError() == ERROR_OPERATION_ABORTED)
     398                {
     399                    *pcbRead = 0;
     400                    rc = VINF_TRY_AGAIN;
     401                }
     402                else
     403                    rc = RTErrConvertFromWin32(GetLastError());
     404
     405                RTCritSectEnter(&pThis->CritSect);
     406                pThis->fIOPending = false;
     407            }
     408            else
     409                rc = RTErrConvertFromWin32(GetLastError());
     410
     411            pThis->cModeUsers--;
     412        }
     413        else
     414            rc = VERR_WRONG_ORDER;
     415        RTCritSectLeave(&pThis->CritSect);
     416    }
     417    return rc;
     418}
     419
     420
     421RTDECL(int) RTPipeReadBlocking(RTPIPE hPipe, void *pvBuf, size_t cbToRead, size_t *pcbRead)
     422{
     423    RTPIPEINTERNAL *pThis = hPipe;
     424    AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
     425    AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
     426    AssertReturn(pThis->fRead, VERR_ACCESS_DENIED);
     427    AssertPtr(pvBuf);
     428
     429    int rc = RTCritSectEnter(&pThis->CritSect);
     430    if (RT_SUCCESS(rc))
     431    {
     432        /* No concurrent readers, sorry. */
     433        if (pThis->cModeUsers == 0)
     434        {
     435            pThis->cModeUsers++;
     436
     437            size_t cbTotalRead = 0;
     438            while (cbToRead > 0)
     439            {
     440                /*
     441                 * Kick of a an overlapped read.  It should return immedately if
     442                 * there is bytes in the buffer.  If not, we'll cancel it and see
     443                 * what we get back.
     444                 */
     445                rc = ResetEvent(pThis->Overlapped.hEvent); Assert(rc == TRUE);
     446                DWORD cbRead = 0;
     447                pThis->fIOPending = true;
     448                RTCritSectLeave(&pThis->CritSect);
     449
     450                if (ReadFile(pThis->hPipe, pvBuf,
     451                             cbToRead <= ~(DWORD)0 ? (DWORD)cbToRead : ~(DWORD)0,
     452                             &cbRead, &pThis->Overlapped))
     453                    rc = VINF_SUCCESS;
     454                else if (GetLastError() == ERROR_IO_PENDING)
     455                {
     456                    WaitForSingleObject(pThis->Overlapped.hEvent, INFINITE);
     457                    if (GetOverlappedResult(pThis->hPipe, &pThis->Overlapped, &cbRead, TRUE /*fWait*/))
     458                        rc = VINF_SUCCESS;
     459                    else
     460                        rc = RTErrConvertFromWin32(GetLastError());
     461                }
     462                else
     463                    rc = RTErrConvertFromWin32(GetLastError());
     464
     465                RTCritSectEnter(&pThis->CritSect);
     466                pThis->fIOPending = false;
     467                if (RT_FAILURE(rc))
     468                    break;
     469
     470                /* advance */
     471                cbToRead    -= cbRead;
     472                cbTotalRead += cbRead;
     473                pvBuf        = (uint8_t *)pvBuf + cbRead;
     474            }
     475
     476            if (pcbRead)
     477            {
     478                *pcbRead = cbTotalRead;
     479                if (   RT_FAILURE(rc)
     480                    && cbTotalRead
     481                    && rc != VERR_INVALID_POINTER)
     482                    rc = VINF_SUCCESS;
     483            }
     484
     485            pThis->cModeUsers--;
     486        }
     487        else
     488            rc = VERR_WRONG_ORDER;
     489        RTCritSectLeave(&pThis->CritSect);
     490    }
     491    return rc;
     492}
     493
     494
     495RTDECL(int) RTPipeWrite(RTPIPE hPipe, const void *pvBuf, size_t cbToWrite, size_t *pcbWritten)
     496{
     497    RTPIPEINTERNAL *pThis = hPipe;
     498    AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
     499    AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
     500    AssertReturn(!pThis->fRead, VERR_ACCESS_DENIED);
     501    AssertPtr(pcbWritten);
     502    AssertPtr(pvBuf);
     503
     504    int rc = RTCritSectEnter(&pThis->CritSect);
     505    if (RT_SUCCESS(rc))
     506    {
     507        /* No concurrent readers, sorry. */
     508        if (pThis->cModeUsers == 0)
     509        {
     510            pThis->cModeUsers++;
     511
     512            /* If I/O is pending, check if it has completed. */
     513            if (pThis->fIOPending)
     514                rc = rtPipeWriteCheckCompletion(pThis);
     515            else
     516                rc = VINF_SUCCESS;
     517            if (rc == VINF_SUCCESS)
     518            {
     519                Assert(!pThis->fIOPending);
     520
     521                /* Do the bounce buffering. */
     522                if (    pThis->cbBounceBufAlloc < cbToWrite
     523                    &&  pThis->cbBounceBufAlloc < _64K)
     524                {
     525                    if (cbToWrite > _64K)
     526                        cbToWrite = _64K;
     527                    void *pv = RTMemRealloc(pThis->pbBounceBuf, RT_ALIGN_Z(cbToWrite, _1K));
     528                    if (pv)
     529                    {
     530                        pThis->pbBounceBuf = (uint8_t *)pv;
     531                        pThis->cbBounceBufAlloc = RT_ALIGN_Z(cbToWrite, _1K);
     532                    }
     533                    else
     534                        rc = VERR_NO_MEMORY;
     535                }
     536                else if (cbToWrite > _64K)
     537                    cbToWrite = _64K;
     538                if (RT_SUCCESS(rc) && cbToWrite)
     539                {
     540                    memcpy(pThis->pbBounceBuf, pvBuf, cbToWrite);
     541                    pThis->cbBounceBufUsed = (uint32_t)cbToWrite;
     542
     543                    /* Submit the write. */
     544                    rc = ResetEvent(pThis->Overlapped.hEvent); Assert(rc == TRUE);
     545                    DWORD cbWritten = 0;
     546                    if (WriteFile(pThis->hPipe, pThis->pbBounceBuf, (DWORD)pThis->cbBounceBufUsed,
     547                                  &cbWritten, &pThis->Overlapped))
     548                    {
     549                        *pcbWritten = cbWritten;
     550                        rc = VINF_SUCCESS;
     551                    }
     552                    else if (GetLastError() == ERROR_IO_PENDING)
     553                    {
     554                        *pcbWritten = cbWritten;
     555                        pThis->fIOPending = true;
     556                        rc = VINF_SUCCESS;
     557                    }
     558                    else if (GetLastError() == ERROR_NO_DATA)
     559                        rc = VERR_BROKEN_PIPE;
     560                    else
     561                        rc = RTErrConvertFromWin32(GetLastError());
     562                }
     563                else if (RT_SUCCESS(rc))
     564                    *pcbWritten = 0;
     565            }
     566            else if (RT_SUCCESS(rc))
     567                *pcbWritten = 0;
     568
     569            pThis->cModeUsers--;
     570        }
     571        else
     572            rc = VERR_WRONG_ORDER;
     573        RTCritSectLeave(&pThis->CritSect);
     574    }
     575    return rc;
     576}
     577
     578
     579RTDECL(int) RTPipeWriteBlocking(RTPIPE hPipe, const void *pvBuf, size_t cbToWrite, size_t *pcbWritten)
     580{
     581    RTPIPEINTERNAL *pThis = hPipe;
     582    AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
     583    AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
     584    AssertReturn(!pThis->fRead, VERR_ACCESS_DENIED);
     585    AssertPtr(pvBuf);
     586    AssertPtrNull(pcbWritten);
     587
     588    int rc = RTCritSectEnter(&pThis->CritSect);
     589    if (RT_SUCCESS(rc))
     590    {
     591        /* No concurrent readers, sorry. */
     592        if (pThis->cModeUsers == 0)
     593        {
     594            pThis->cModeUsers++;
     595
     596            /*
     597             * If I/O is pending, wait for it to complete.
     598             */
     599            if (pThis->fIOPending)
     600            {
     601                rc = rtPipeWriteCheckCompletion(pThis);
     602                while (rc == VINF_TRY_AGAIN)
     603                {
     604                    Assert(pThis->fIOPending);
     605                    HANDLE hEvent = pThis->Overlapped.hEvent;
     606                    RTCritSectLeave(&pThis->CritSect);
     607                    WaitForSingleObject(pThis->Overlapped.hEvent, INFINITE);
     608                    RTCritSectEnter(&pThis->CritSect);
     609                }
     610            }
     611            if (RT_SUCCESS(rc))
     612            {
     613                Assert(!pThis->fIOPending);
     614
     615                /*
     616                 * Try write everything.
     617                 * No bounce buffering, cModeUsers protects us.
     618                 */
     619                size_t cbTotalWritten = 0;
     620                while (cbToWrite > 0)
     621                {
     622                    rc = ResetEvent(pThis->Overlapped.hEvent); Assert(rc == TRUE);
     623                    pThis->fIOPending = true;
     624                    RTCritSectLeave(&pThis->CritSect);
     625
     626                    DWORD cbWritten = 0;
     627                    if (WriteFile(pThis->hPipe, pvBuf,
     628                                  cbToWrite <= ~(DWORD)0 ? (DWORD)cbToWrite : ~(DWORD)0,
     629                                  &cbWritten, &pThis->Overlapped))
     630                        rc = VINF_SUCCESS;
     631                    else if (GetLastError() == ERROR_IO_PENDING)
     632                    {
     633                        WaitForSingleObject(pThis->Overlapped.hEvent, INFINITE);
     634                        if (GetOverlappedResult(pThis->hPipe, &pThis->Overlapped, &cbWritten, TRUE /*fWait*/))
     635                            rc = VINF_SUCCESS;
     636                        else
     637                            rc = RTErrConvertFromWin32(GetLastError());
     638                    }
     639                    else if (GetLastError() == ERROR_NO_DATA)
     640                        rc = VERR_BROKEN_PIPE;
     641                    else
     642                        rc = RTErrConvertFromWin32(GetLastError());
     643
     644                    RTCritSectEnter(&pThis->CritSect);
     645                    pThis->fIOPending = false;
     646                    if (RT_FAILURE(rc))
     647                        break;
     648
     649                    /* advance */
     650                    pvBuf           = (char const *)pvBuf + cbWritten;
     651                    cbTotalWritten += cbWritten;
     652                    cbToWrite      -= cbWritten;
     653                }
     654
     655                if (pcbWritten)
     656                {
     657                    *pcbWritten = cbTotalWritten;
     658                    if (   RT_FAILURE(rc)
     659                        && cbTotalWritten
     660                        && rc != VERR_INVALID_POINTER)
     661                        rc = VINF_SUCCESS;
     662                }
     663            }
     664
     665            pThis->cModeUsers--;
     666        }
     667        else
     668            rc = VERR_WRONG_ORDER;
     669        RTCritSectLeave(&pThis->CritSect);
     670    }
     671    return rc;
     672
     673#if 1
    46674    return VERR_NOT_IMPLEMENTED;
    47 }
    48 
    49 
    50 RTDECL(int)  RTPipeClose(RTPIPE hPipe)
    51 {
    52     return VERR_NOT_IMPLEMENTED;
    53 }
    54 
    55 
    56 RTDECL(RTHCINTPTR) RTPipeToNative(RTPIPE hPipe)
    57 {
    58     return (RTHCINTPTR)INVALID_HANDLE_VALUE;
    59 }
    60 
    61 
    62 RTDECL(int) RTPipeRead(RTPIPE hPipe, void *pvBuf, size_t cbToRead, size_t *pcbRead)
    63 {
    64     return VERR_NOT_IMPLEMENTED;
    65 }
    66 
    67 
    68 RTDECL(int) RTPipeReadBlocking(RTPIPE hPipe, void *pvBuf, size_t cbToRead, size_t *pcbRead)
    69 {
    70     return VERR_NOT_IMPLEMENTED;
    71 }
    72 
    73 
    74 RTDECL(int) RTPipeWrite(RTPIPE hPipe, const void *pvBuf, size_t cbToWrite, size_t *pcbWritten)
    75 {
    76     return VERR_NOT_IMPLEMENTED;
    77 }
    78 
    79 
    80 RTDECL(int) RTPipeWriteBlocking(RTPIPE hPipe, const void *pvBuf, size_t cbToWrite, size_t *pcbWritten)
    81 {
    82     return VERR_NOT_IMPLEMENTED;
     675#else
     676    int rc = rtPipeTryBlocking(pThis);
     677    if (RT_SUCCESS(rc))
     678    {
     679        size_t cbTotalWritten = 0;
     680        while (cbToWrite > 0)
     681        {
     682            ssize_t cbWritten = write(pThis->fd, pvBuf, RT_MIN(cbToWrite, SSIZE_MAX));
     683            if (cbWritten < 0)
     684            {
     685                rc = RTErrConvertFromErrno(errno);
     686                break;
     687            }
     688
     689            /* advance */
     690            pvBuf           = (char const *)pvBuf + cbWritten;
     691            cbTotalWritten += cbWritten;
     692            cbToWrite      -= cbWritten;
     693        }
     694
     695        if (pcbWritten)
     696        {
     697            *pcbWritten = cbTotalWritten;
     698            if (   RT_FAILURE(rc)
     699                && cbTotalWritten
     700                && rc != VERR_INVALID_POINTER)
     701                rc = VINF_SUCCESS;
     702        }
     703
     704        ASMAtomicDecU32(&pThis->u32State);
     705    }
     706    return rc;
     707#endif
    83708}
    84709
     
    86711RTDECL(int) RTPipeFlush(RTPIPE hPipe)
    87712{
    88     return VERR_NOT_IMPLEMENTED;
     713    RTPIPEINTERNAL *pThis = hPipe;
     714    AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
     715    AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
     716    AssertReturn(!pThis->fRead, VERR_ACCESS_DENIED);
     717#if 1
     718    return VERR_NOT_SUPPORTED;
     719#else
     720
     721    if (fsync(pThis->fd))
     722    {
     723        if (errno == EINVAL || errno == ENOTSUP)
     724            return VERR_NOT_SUPPORTED;
     725        return RTErrConvertFromErrno(errno);
     726    }
     727    return VINF_SUCCESS;
     728#endif
    89729}
    90730
     
    92732RTDECL(int) RTPipeSelectOne(RTPIPE hPipe, RTMSINTERVAL cMillies)
    93733{
    94     return VERR_NOT_IMPLEMENTED;
    95 }
    96 
     734    RTPIPEINTERNAL *pThis = hPipe;
     735    AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
     736    AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
     737
     738    uint64_t const StartMsTS = RTTimeMilliTS();
     739
     740    int rc = RTCritSectEnter(&pThis->CritSect);
     741    if (RT_FAILURE(rc))
     742        return rc;
     743    for (unsigned iLoop = 0;; iLoop++)
     744    {
     745        uint8_t abBuf[4];
     746        bool    fPendingRead  = false;
     747        HANDLE  hWait         = INVALID_HANDLE_VALUE;
     748        if (pThis->fRead)
     749        {
     750            if (pThis->fIOPending)
     751                hWait = pThis->Overlapped.hEvent;
     752            else
     753            {
     754                /* Peek at the pipe buffer and see how many bytes it contains. */
     755                DWORD cbAvailable;
     756                if (   PeekNamedPipe(pThis->hPipe, NULL, 0, NULL, &cbAvailable, NULL)
     757                    && cbAvailable > 0)
     758                {
     759                    rc = VINF_SUCCESS;
     760                    break;
     761                }
     762
     763                /* Start a zero byte read operation that we can wait on. */
     764                if (cMillies == 0)
     765                {
     766                    rc = VERR_TIMEOUT;
     767                    break;
     768                }
     769                AssertBreakStmt(pThis->cModeUsers == 0, rc = VERR_INTERNAL_ERROR_5);
     770                rc = ResetEvent(pThis->Overlapped.hEvent); Assert(rc == TRUE);
     771                DWORD cbRead = 0;
     772                if (ReadFile(pThis->hPipe, &abBuf[0], 0, &cbRead, &pThis->Overlapped))
     773                {
     774                    rc = VINF_SUCCESS;
     775                    if (iLoop > 10)
     776                        RTThreadYield();
     777                }
     778                else if (GetLastError() == ERROR_IO_PENDING)
     779                {
     780                    pThis->cModeUsers++;
     781                    pThis->fIOPending = true;
     782                    fPendingRead = true;
     783                    hWait = pThis->Overlapped.hEvent;
     784                }
     785                else
     786                    rc = RTErrConvertFromWin32(GetLastError());
     787            }
     788        }
     789        else
     790        {
     791            if (pThis->fIOPending)
     792                hWait = pThis->Overlapped.hEvent;
     793            else
     794            {
     795                /* If nothing pending, the next write will succeed because
     796                   we buffer it and pretend that it does... */
     797                rc = VINF_SUCCESS;
     798                break;
     799            }
     800        }
     801        if (RT_FAILURE(rc))
     802            break;
     803
     804        /*
     805         * Check for timeout.
     806         */
     807        DWORD cMsMaxWait = INFINITE;
     808        if (   cMillies != RT_INDEFINITE_WAIT
     809            && (   hWait != INVALID_HANDLE_VALUE
     810                || iLoop > 10)
     811           )
     812        {
     813            uint64_t cElapsed = RTTimeMilliTS() - StartMsTS;
     814            if (cElapsed >= cMillies)
     815            {
     816                rc = VERR_TIMEOUT;
     817                break;
     818            }
     819            cMsMaxWait = cMillies - (uint32_t)cElapsed;
     820        }
     821
     822        /*
     823         * Wait.
     824         */
     825        if (hWait != INVALID_HANDLE_VALUE)
     826        {
     827            RTCritSectLeave(&pThis->CritSect);
     828
     829            DWORD dwRc = WaitForSingleObject(hWait, cMsMaxWait);
     830            if (dwRc == WAIT_OBJECT_0)
     831                rc = VINF_SUCCESS;
     832            else if (dwRc == WAIT_TIMEOUT)
     833                rc = VERR_TIMEOUT;
     834            else if (dwRc == WAIT_ABANDONED)
     835                rc = VERR_INVALID_HANDLE;
     836            else
     837                rc = RTErrConvertFromWin32(GetLastError());
     838            if (   RT_FAILURE(rc)
     839                && pThis->u32Magic != RTPIPE_MAGIC)
     840                return rc;
     841
     842            RTCritSectEnter(&pThis->CritSect);
     843            if (fPendingRead)
     844            {
     845                pThis->cModeUsers--;
     846                pThis->fIOPending = false;
     847                if (rc != VINF_SUCCESS)
     848                    CancelIo(pThis->hPipe);
     849                DWORD cbRead = 0;
     850                GetOverlappedResult(pThis->hPipe, &pThis->Overlapped, &cbRead, TRUE /*fWait*/);
     851            }
     852            if (RT_FAILURE(rc))
     853                break;
     854        }
     855    }
     856
     857    RTCritSectLeave(&pThis->CritSect);
     858    return rc;
     859}
     860
  • trunk/src/VBox/Runtime/testcase/tstRTPipe.cpp

    r27129 r27313  
    6161        size_t cbWritten = _1G;
    6262        rc = RTPipeWrite(hPipeW, s_abBuf, sizeof(s_abBuf), &cbWritten);
    63         RTTESTI_CHECK(rc == VINF_SUCCESS || rc == VINF_TRY_AGAIN);
     63        RTTESTI_CHECK_MSG(rc == VINF_SUCCESS || rc == VINF_TRY_AGAIN, ("rc=%Rrc\n", rc));
    6464        if (rc != VINF_SUCCESS)
    6565            break;
Note: See TracChangeset for help on using the changeset viewer.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette