VirtualBox

Changeset 44462 in vbox


Ignore:
Timestamp:
Jan 30, 2013 12:44:14 PM (12 years ago)
Author:
vboxsync
svn:sync-xref-src-repo-rev:
83468
Message:

pipe-os2.cpp: Basic implementation passes the test case.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/VBox/Runtime/r3/os2/pipe-os2.cpp

    r39690 r44462  
    55
    66/*
    7  * Copyright (C) 2010 Oracle Corporation
     7 * Copyright (C) 2010-2013 Oracle Corporation
    88 *
    99 * This file is part of VirtualBox Open Source Edition (OSE), as
     
    2929*   Header Files                                                               *
    3030*******************************************************************************/
     31#define INCL_ERRORS
     32#define INCL_DOSSEMAPHORES
     33#include <os2.h>
     34
    3135#include <iprt/pipe.h>
    3236#include "internal/iprt.h"
    3337
     38#include <iprt/asm.h>
    3439#include <iprt/assert.h>
     40#include <iprt/critsect.h>
    3541#include <iprt/err.h>
     42#include <iprt/mem.h>
     43#include <iprt/string.h>
     44#include <iprt/poll.h>
     45#include <iprt/process.h>
     46#include <iprt/thread.h>
     47#include <iprt/time.h>
     48#include "internal/pipe.h"
     49#include "internal/magics.h"
     50
     51
     52/*******************************************************************************
     53*   Defined Constants And Macros                                               *
     54*******************************************************************************/
     55/** The pipe buffer size we prefer. */
     56#define RTPIPE_OS2_SIZE     _32K
     57
     58
     59/*******************************************************************************
     60*   Structures and Typedefs                                                    *
     61*******************************************************************************/
     62typedef struct RTPIPEINTERNAL
     63{
     64    /** Magic value (RTPIPE_MAGIC). */
     65    uint32_t            u32Magic;
     66    /** The pipe handle. */
     67    HPIPE               hPipe;
     68    /** Set if this is the read end, clear if it's the write end. */
     69    bool                fRead;
     70    /** Whether the pipe is in blocking or non-blocking mode. */
     71    bool                fBlocking;
     72    /** Set if the pipe is broken. */
     73    bool                fBrokenPipe;
     74    /** Usage counter. */
     75    uint32_t            cUsers;
     76
     77    /** The event semaphore associated with the pipe. */
     78    HEV                 hev;
     79    /** The handle of the poll set currently polling on this pipe.
     80     *  We can only have one poller at the time (lazy bird). */
     81    RTPOLLSET           hPollSet;
     82    /** Critical section protecting the above members.
     83     * (Taking the lazy/simple approach.) */
     84    RTCRITSECT          CritSect;
     85
     86} RTPIPEINTERNAL;
     87
     88
     89/**
     90 * Ensures that the pipe has a semaphore associated with it.
     91 *
     92 * @returns VBox status code.
     93 * @param   pThis               The pipe.
     94 */
     95static int rtPipeOs2EnsureSem(RTPIPEINTERNAL *pThis)
     96{
     97    if (pThis->hev != NULLHANDLE)
     98        return VINF_SUCCESS;
     99
     100    HEV     hev;
     101    APIRET  orc = DosCreateEventSem(NULL, &hev, DC_SEM_SHARED, FALSE);
     102    if (orc == NO_ERROR)
     103    {
     104        orc = DosSetNPipeSem(pThis->hPipe, (HSEM)hev, 1);
     105        if (orc == NO_ERROR)
     106        {
     107            pThis->hev = hev;
     108            return VINF_SUCCESS;
     109        }
     110
     111        DosCloseEventSem(hev);
     112    }
     113    return RTErrConvertFromOS2(orc);
     114}
    36115
    37116
    38117RTDECL(int)  RTPipeCreate(PRTPIPE phPipeRead, PRTPIPE phPipeWrite, uint32_t fFlags)
    39118{
    40     return VERR_NOT_IMPLEMENTED;
     119    AssertPtrReturn(phPipeRead, VERR_INVALID_POINTER);
     120    AssertPtrReturn(phPipeWrite, VERR_INVALID_POINTER);
     121    AssertReturn(!(fFlags & ~RTPIPE_C_VALID_MASK), VERR_INVALID_PARAMETER);
     122
     123    /*
     124     * Try create and connect a pipe pair.
     125     */
     126    APIRET  orc;
     127    HPIPE   hPipeR;
     128    HFILE   hPipeW;
     129    int     rc;
     130    for (;;)
     131    {
     132        static volatile uint32_t    g_iNextPipe = 0;
     133        char                        szName[128];
     134        RTStrPrintf(szName, sizeof(szName), "\\pipe\\iprt-pipe-%u-%u", RTProcSelf(), ASMAtomicIncU32(&g_iNextPipe));
     135
     136        /*
     137         * Create the read end of the pipe.
     138         */
     139        ULONG fPipeMode = 1 /*instance*/ | NP_TYPE_BYTE | NP_READMODE_BYTE | NP_NOWAIT;
     140        ULONG fOpenMode = NP_ACCESS_DUPLEX | NP_WRITEBEHIND;
     141        if (fFlags & RTPIPE_C_INHERIT_READ)
     142            fOpenMode |= NP_INHERIT;
     143        else
     144            fOpenMode |= NP_NOINHERIT;
     145        orc = DosCreateNPipe((PSZ)szName, &hPipeR, fOpenMode, fPipeMode, RTPIPE_OS2_SIZE, RTPIPE_OS2_SIZE, NP_DEFAULT_WAIT);
     146        if (orc == NO_ERROR)
     147        {
     148            orc = DosConnectNPipe(hPipeR);
     149            if (orc == ERROR_PIPE_NOT_CONNECTED || orc == NO_ERROR)
     150            {
     151                /*
     152                 * Connect to the pipe (the write end), attach sem below.
     153                 */
     154                ULONG ulAction = 0;
     155                ULONG fOpenW   = OPEN_ACTION_FAIL_IF_NEW | OPEN_ACTION_OPEN_IF_EXISTS;
     156                ULONG fModeW   = OPEN_ACCESS_WRITEONLY | OPEN_SHARE_DENYNONE | OPEN_FLAGS_FAIL_ON_ERROR;
     157                if (!(fFlags & RTPIPE_C_INHERIT_WRITE))
     158                    fModeW |= OPEN_FLAGS_NOINHERIT;
     159                orc = DosOpen((PSZ)szName, &hPipeW, &ulAction, 0 /*cbFile*/, FILE_NORMAL,
     160                              fOpenW, fModeW, NULL /*peaop2*/);
     161                if (orc == NO_ERROR)
     162                    break;
     163            }
     164            DosClose(hPipeR);
     165        }
     166        if (   orc != ERROR_PIPE_BUSY     /* already exist - compatible */
     167            && orc != ERROR_ACCESS_DENIED /* already exist - incompatible (?) */)
     168            return RTErrConvertFromOS2(orc);
     169        /* else: try again with a new name */
     170    }
     171
     172    /*
     173     * Create the two handles.
     174     */
     175    RTPIPEINTERNAL *pThisR = (RTPIPEINTERNAL *)RTMemAllocZ(sizeof(RTPIPEINTERNAL));
     176    if (pThisR)
     177    {
     178        RTPIPEINTERNAL *pThisW = (RTPIPEINTERNAL *)RTMemAllocZ(sizeof(RTPIPEINTERNAL));
     179        if (pThisW)
     180        {
     181            /* Crit sects. */
     182            rc = RTCritSectInit(&pThisR->CritSect);
     183            if (RT_SUCCESS(rc))
     184            {
     185                rc = RTCritSectInit(&pThisW->CritSect);
     186                if (RT_SUCCESS(rc))
     187                {
     188                    /* Initialize the structures. */
     189                    pThisR->u32Magic        = RTPIPE_MAGIC;
     190                    pThisW->u32Magic        = RTPIPE_MAGIC;
     191                    pThisR->hPipe           = hPipeR;
     192                    pThisW->hPipe           = hPipeW;
     193                    pThisR->hev             = NULLHANDLE;
     194                    pThisW->hev             = NULLHANDLE;
     195                    pThisR->fRead           = true;
     196                    pThisW->fRead           = false;
     197                    pThisR->fBlocking       = false;
     198                    pThisW->fBlocking       = true;
     199                    //pThisR->fBrokenPipe     = false;
     200                    //pThisW->fBrokenPipe     = false;
     201                    //pThisR->cUsers          = 0;
     202                    //pThisW->cUsers          = 0;
     203                    pThisR->hPollSet        = NIL_RTPOLLSET;
     204                    pThisW->hPollSet        = NIL_RTPOLLSET;
     205
     206                    *phPipeRead  = pThisR;
     207                    *phPipeWrite = pThisW;
     208                    return VINF_SUCCESS;
     209                }
     210
     211                RTCritSectDelete(&pThisR->CritSect);
     212            }
     213            RTMemFree(pThisW);
     214        }
     215        else
     216            rc = VERR_NO_MEMORY;
     217        RTMemFree(pThisR);
     218    }
     219    else
     220        rc = VERR_NO_MEMORY;
     221
     222    /* Don't call DosDisConnectNPipe! */
     223    DosClose(hPipeW);
     224    DosClose(hPipeR);
     225    return rc;
    41226}
    42227
     
    44229RTDECL(int)  RTPipeClose(RTPIPE hPipe)
    45230{
    46     return VERR_NOT_IMPLEMENTED;
     231    RTPIPEINTERNAL *pThis = hPipe;
     232    if (pThis == NIL_RTPIPE)
     233        return VINF_SUCCESS;
     234    AssertPtrReturn(pThis, VERR_INVALID_PARAMETER);
     235    AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
     236
     237    /*
     238     * Do the cleanup.
     239     */
     240    AssertReturn(ASMAtomicCmpXchgU32(&pThis->u32Magic, ~RTPIPE_MAGIC, RTPIPE_MAGIC), VERR_INVALID_HANDLE);
     241    RTCritSectEnter(&pThis->CritSect);
     242    Assert(pThis->cUsers == 0);
     243
     244    /* Don't call DosDisConnectNPipe! */
     245    DosClose(pThis->hPipe);
     246    pThis->hPipe = (HPIPE)-1;
     247
     248    if (pThis->hev != NULLHANDLE)
     249    {
     250        DosCloseEventSem(pThis->hev);
     251        pThis->hev = NULLHANDLE;
     252    }
     253
     254    RTCritSectLeave(&pThis->CritSect);
     255    RTCritSectDelete(&pThis->CritSect);
     256
     257    RTMemFree(pThis);
     258
     259    return VINF_SUCCESS;
    47260}
    48261
     
    50263RTDECL(int)  RTPipeFromNative(PRTPIPE phPipe, RTHCINTPTR hNativePipe, uint32_t fFlags)
    51264{
    52     return VERR_NOT_IMPLEMENTED;
    53 }
    54 
     265    AssertPtrReturn(phPipe, VERR_INVALID_POINTER);
     266    AssertReturn(!(fFlags & ~RTPIPE_N_VALID_MASK), VERR_INVALID_PARAMETER);
     267    AssertReturn(!!(fFlags & RTPIPE_N_READ) != !!(fFlags & RTPIPE_N_WRITE), VERR_INVALID_PARAMETER);
     268
     269    /*
     270     * Get and validate the pipe handle info.
     271     */
     272    HPIPE hNative = (HPIPE)hNativePipe;
     273    ULONG ulType  = 0;
     274    ULONG ulAttr  = 0;
     275    APIRET orc = DosQueryHType(hNative, &ulType, &ulAttr);
     276    AssertMsgReturn(orc == NO_ERROR, ("%d\n", orc), RTErrConvertFromOS2(orc));
     277    AssertReturn((ulType & 0x7) == HANDTYPE_PIPE, VERR_INVALID_HANDLE);
     278
     279#if 0
     280    union
     281    {
     282        PIPEINFO    PipeInfo;
     283        uint8_t     abPadding[sizeof(PIPEINFO) + 127];
     284    } Buf;
     285    orc = DosQueryNPipeInfo(hNative, 1, &Buf, sizeof(Buf));
     286    if (orc != NO_ERROR)
     287    {
     288        /* Sorry, anonymous pips are not supported. */
     289        AssertMsgFailed(("%d\n", orc));
     290        return VERR_INVALID_HANDLE;
     291    }
     292    AssertReturn(Buf.PipeInfo.cbMaxInst == 1, VERR_INVALID_HANDLE);
     293#endif
     294
     295    ULONG fPipeState = 0;
     296    orc = DosQueryNPHState(hNative, &fPipeState);
     297    if (orc != NO_ERROR)
     298    {
     299        /* Sorry, anonymous pips are not supported. */
     300        AssertMsgFailed(("%d\n", orc));
     301        return VERR_INVALID_HANDLE;
     302    }
     303    AssertReturn(!(fPipeState & NP_TYPE_MESSAGE), VERR_INVALID_HANDLE);
     304    AssertReturn(!(fPipeState & NP_READMODE_MESSAGE), VERR_INVALID_HANDLE);
     305    AssertReturn((fPipeState & 0xff) == 1, VERR_INVALID_HANDLE);
     306
     307    ULONG fFileState = 0;
     308    orc = DosQueryFHState(hNative, &fFileState);
     309    AssertMsgReturn(orc == NO_ERROR, ("%d\n", orc), VERR_INVALID_HANDLE);
     310    AssertMsgReturn(   (fFileState & 0x3) == (fFlags & RTPIPE_N_READ ? OPEN_ACCESS_READONLY : OPEN_ACCESS_WRITEONLY)
     311                    || (fFileState & 0x3) == OPEN_ACCESS_READWRITE
     312                    , ("%#x\n", fFileState), VERR_INVALID_HANDLE);
     313
     314    /*
     315     * Looks kind of OK. Fix the inherit flag.
     316     */
     317    orc = DosSetFHState(hNative,   (fFileState & (OPEN_FLAGS_WRITE_THROUGH | OPEN_FLAGS_FAIL_ON_ERROR | OPEN_FLAGS_NO_CACHE))
     318                                 | (fFlags & RTPIPE_N_INHERIT ? 0 : OPEN_FLAGS_NOINHERIT));
     319    AssertMsgReturn(orc == NO_ERROR, ("%d\n", orc), RTErrConvertFromOS2(orc));
     320
     321
     322    /*
     323     * Create a handle so we can try rtPipeQueryInfo on it
     324     * and see if we need to duplicate it to make that call work.
     325     */
     326    RTPIPEINTERNAL *pThis = (RTPIPEINTERNAL *)RTMemAllocZ(sizeof(RTPIPEINTERNAL));
     327    if (!pThis)
     328        return VERR_NO_MEMORY;
     329    int rc = RTCritSectInit(&pThis->CritSect);
     330    if (RT_SUCCESS(rc))
     331    {
     332        pThis->u32Magic        = RTPIPE_MAGIC;
     333        pThis->hPipe           = hNative;
     334        pThis->hev             = NULLHANDLE;
     335        pThis->fRead           = !!(fFlags & RTPIPE_N_READ);
     336        pThis->fBlocking       = !(fPipeState & NP_NOWAIT);
     337        //pThis->fBrokenPipe     = false;
     338        //pThis->cUsers          = 0;
     339        pThis->hPollSet        = NIL_RTPOLLSET;
     340
     341        *phPipe = pThis;
     342        return VINF_SUCCESS;
     343
     344        //RTCritSectDelete(&pThis->CritSect);
     345    }
     346    RTMemFree(pThis);
     347    return rc;
     348}
    55349
    56350RTDECL(RTHCINTPTR) RTPipeToNative(RTPIPE hPipe)
    57351{
    58     return -1;
     352    RTPIPEINTERNAL *pThis = hPipe;
     353    AssertPtrReturn(pThis, -1);
     354    AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, -1);
     355
     356    return (RTHCINTPTR)pThis->hPipe;
     357}
     358
     359/**
     360 * Prepare blocking mode.
     361 *
     362 * @returns IPRT status code.
     363 * @retval  VERR_WRONG_ORDER if simultaneous non-blocking and blocking access is
     364 *          attempted.
     365 *
     366 * @param   pThis               The pipe handle.
     367 *
     368 * @remarks Caller owns the critical section.
     369 */
     370static int rtPipeTryBlocking(RTPIPEINTERNAL *pThis)
     371{
     372    if (!pThis->fBlocking)
     373    {
     374        if (pThis->cUsers != 0)
     375            return VERR_WRONG_ORDER;
     376
     377        APIRET orc = DosSetNPHState(pThis->hPipe, NP_WAIT | NP_READMODE_BYTE);
     378        if (orc != NO_ERROR)
     379        {
     380            if (orc != ERROR_BROKEN_PIPE && orc != ERROR_PIPE_NOT_CONNECTED)
     381                return RTErrConvertFromOS2(orc);
     382            pThis->fBrokenPipe = true;
     383        }
     384        pThis->fBlocking = true;
     385    }
     386
     387    pThis->cUsers++;
     388    return VINF_SUCCESS;
     389}
     390
     391
     392/**
     393 * Prepare non-blocking mode.
     394 *
     395 * @returns IPRT status code.
     396 * @retval  VERR_WRONG_ORDER if simultaneous non-blocking and blocking access is
     397 *          attempted.
     398 *
     399 * @param   pThis               The pipe handle.
     400 */
     401static int rtPipeTryNonBlocking(RTPIPEINTERNAL *pThis)
     402{
     403    if (pThis->fBlocking)
     404    {
     405        if (pThis->cUsers != 0)
     406            return VERR_WRONG_ORDER;
     407
     408        APIRET orc = DosSetNPHState(pThis->hPipe, NP_NOWAIT | NP_READMODE_BYTE);
     409        if (orc != NO_ERROR)
     410        {
     411            if (orc != ERROR_BROKEN_PIPE && orc != ERROR_PIPE_NOT_CONNECTED)
     412                return RTErrConvertFromOS2(orc);
     413            pThis->fBrokenPipe = true;
     414        }
     415        pThis->fBlocking = false;
     416    }
     417
     418    pThis->cUsers++;
     419    return VINF_SUCCESS;
     420}
     421
     422
     423/**
     424 * Checks if the read pipe has been broken.
     425 *
     426 * @returns true if broken, false if no.
     427 * @param   pThis               The pipe handle (read).
     428 */
     429static bool rtPipeOs2IsBroken(RTPIPEINTERNAL *pThis)
     430{
     431    Assert(pThis->fRead);
     432
     433#if 0
     434    /*
     435     * Query it via the semaphore. Not sure how fast this is...
     436     */
     437    PIPESEMSTATE aStates[3]; RT_ZERO(aStates);
     438    APIRET orc = DosQueryNPipeSemState(pThis->hev, &aStates[0], sizeof(aStates));
     439    if (orc == NO_ERROR)
     440    {
     441        if (aStates[0].fStatus == NPSS_CLOSE)
     442            return true;
     443        if (aStates[0].fStatus == NPSS_RDATA)
     444            return false;
     445    }
     446    AssertMsgFailed(("%d / %d\n", orc, aStates[0].fStatus));
     447
     448    /*
     449     * Fall back / alternative method.
     450     */
     451#endif
     452    ULONG       cbActual = 0;
     453    ULONG       ulState  = 0;
     454    AVAILDATA   Avail    = { 0, 0 };
     455    APIRET orc = DosPeekNPipe(pThis->hPipe, NULL, 0, &cbActual, &Avail, &ulState);
     456    if (orc != NO_ERROR)
     457    {
     458        if (orc != ERROR_PIPE_BUSY)
     459            AssertMsgFailed(("%d\n", orc));
     460        return false;
     461    }
     462
     463    return ulState != NP_STATE_CONNECTED;
    59464}
    60465
     
    62467RTDECL(int) RTPipeRead(RTPIPE hPipe, void *pvBuf, size_t cbToRead, size_t *pcbRead)
    63468{
    64     return VERR_NOT_IMPLEMENTED;
     469    RTPIPEINTERNAL *pThis = hPipe;
     470    AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
     471    AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
     472    AssertReturn(pThis->fRead, VERR_ACCESS_DENIED);
     473    AssertPtr(pcbRead);
     474    AssertPtr(pvBuf);
     475
     476    int rc = RTCritSectEnter(&pThis->CritSect);
     477    if (RT_SUCCESS(rc))
     478    {
     479        rc = rtPipeTryNonBlocking(pThis);
     480        if (RT_SUCCESS(rc))
     481        {
     482            RTCritSectLeave(&pThis->CritSect);
     483
     484            ULONG cbActual = 0;
     485            APIRET orc = DosRead(pThis->hPipe, pvBuf, cbToRead, &cbActual);
     486            if (orc == NO_ERROR)
     487            {
     488                if (cbActual || !cbToRead || !rtPipeOs2IsBroken(pThis))
     489                    *pcbRead = cbActual;
     490                else
     491                    rc = VERR_BROKEN_PIPE;
     492            }
     493            else if (orc == ERROR_NO_DATA)
     494            {
     495                *pcbRead = 0;
     496                rc = VINF_TRY_AGAIN;
     497            }
     498            else
     499                rc = RTErrConvertFromOS2(orc);
     500
     501            RTCritSectEnter(&pThis->CritSect);
     502            if (rc == VERR_BROKEN_PIPE)
     503                pThis->fBrokenPipe = true;
     504            pThis->cUsers--;
     505        }
     506        else
     507            rc = VERR_WRONG_ORDER;
     508        RTCritSectLeave(&pThis->CritSect);
     509    }
     510    return rc;
    65511}
    66512
     
    68514RTDECL(int) RTPipeReadBlocking(RTPIPE hPipe, void *pvBuf, size_t cbToRead, size_t *pcbRead)
    69515{
    70     return VERR_NOT_IMPLEMENTED;
     516    RTPIPEINTERNAL *pThis = hPipe;
     517    AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
     518    AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
     519    AssertReturn(pThis->fRead, VERR_ACCESS_DENIED);
     520    AssertPtr(pvBuf);
     521
     522    int rc = RTCritSectEnter(&pThis->CritSect);
     523    if (RT_SUCCESS(rc))
     524    {
     525        rc = rtPipeTryBlocking(pThis);
     526        if (RT_SUCCESS(rc))
     527        {
     528            RTCritSectLeave(&pThis->CritSect);
     529
     530            size_t cbTotalRead = 0;
     531            while (cbToRead > 0)
     532            {
     533                ULONG  cbActual = 0;
     534                APIRET orc = DosRead(pThis->hPipe, pvBuf, cbToRead, &cbActual);
     535                if (orc != NO_ERROR)
     536                {
     537                    rc = RTErrConvertFromOS2(orc);
     538                    break;
     539                }
     540                if (!cbActual && rtPipeOs2IsBroken(pThis))
     541                {
     542                    rc = VERR_BROKEN_PIPE;
     543                    break;
     544                }
     545
     546                /* advance */
     547                pvBuf        = (char *)pvBuf + cbActual;
     548                cbTotalRead += cbActual;
     549                cbToRead    -= cbActual;
     550            }
     551
     552            if (pcbRead)
     553            {
     554                *pcbRead = cbTotalRead;
     555                if (   RT_FAILURE(rc)
     556                    && cbTotalRead)
     557                    rc = VINF_SUCCESS;
     558            }
     559
     560            RTCritSectEnter(&pThis->CritSect);
     561            if (rc == VERR_BROKEN_PIPE)
     562                pThis->fBrokenPipe = true;
     563            pThis->cUsers--;
     564        }
     565        else
     566            rc = VERR_WRONG_ORDER;
     567        RTCritSectLeave(&pThis->CritSect);
     568    }
     569    return rc;
     570}
     571
     572
     573/**
     574 * Gets the available write buffer size of the pipe.
     575 *
     576 * @returns Number of bytes, 1 on failure.
     577 * @param   pThis               The pipe handle.
     578 */
     579static ULONG rtPipeOs2GetSpace(RTPIPEINTERNAL *pThis)
     580{
     581    Assert(!pThis->fRead);
     582
     583#if 0 /* Not sure which is more efficient, neither are really optimal, I fear. */
     584    /*
     585     * Query via semaphore state.
     586     * This will walk the list of active named pipes...
     587     */
     588    /** @todo Check how hev and hpipe are associated, if complicated, use the
     589     *        alternative method below. */
     590    PIPESEMSTATE aStates[3]; RT_ZERO(aStates);
     591    APIRET orc = DosQueryNPipeSemState((HSEM)pThis->hev, &aStates[0], sizeof(aStates));
     592    if (orc == NO_ERROR)
     593    {
     594        if (aStates[0].fStatus == NPSS_WSPACE)
     595            return aStates[0].usAvail;
     596        if (aStates[1].fStatus == NPSS_WSPACE)
     597            return aStates[1].usAvail;
     598        return 0;
     599    }
     600    AssertMsgFailed(("%d / %d\n", orc, aStates[0].fStatus));
     601
     602#else
     603    /*
     604     * Query via the pipe info.
     605     * This will have to lookup and store the pipe name.
     606     */
     607    union
     608    {
     609        PIPEINFO    PipeInfo;
     610        uint8_t     abPadding[sizeof(PIPEINFO) + 127];
     611    } Buf;
     612    APIRET orc = DosQueryNPipeInfo(pThis->hPipe, 1, &Buf, sizeof(Buf));
     613    if (orc == NO_ERROR)
     614        return Buf.PipeInfo.cbOut;
     615    AssertMsgFailed(("%d\n", orc));
     616#endif
     617
     618    return 1;
    71619}
    72620
     
    74622RTDECL(int) RTPipeWrite(RTPIPE hPipe, const void *pvBuf, size_t cbToWrite, size_t *pcbWritten)
    75623{
    76     return VERR_NOT_IMPLEMENTED;
     624    RTPIPEINTERNAL *pThis = hPipe;
     625    AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
     626    AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
     627    AssertReturn(!pThis->fRead, VERR_ACCESS_DENIED);
     628    AssertPtr(pcbWritten);
     629    AssertPtr(pvBuf);
     630
     631    int rc = RTCritSectEnter(&pThis->CritSect);
     632    if (RT_SUCCESS(rc))
     633    {
     634        rc = rtPipeTryNonBlocking(pThis);
     635        if (RT_SUCCESS(rc))
     636        {
     637            if (cbToWrite > 0)
     638            {
     639                ULONG  cbActual = 0;
     640                APIRET orc = DosWrite(pThis->hPipe, pvBuf, cbToWrite, &cbActual);
     641                if (orc == NO_ERROR && cbActual == 0)
     642                {
     643                    /* Retry with the request adjusted to the available buffer space. */
     644                    ULONG cbAvail = rtPipeOs2GetSpace(pThis);
     645                    orc = DosWrite(pThis->hPipe, pvBuf, RT_MIN(cbAvail, cbToWrite), &cbActual);
     646                }
     647
     648                if (orc == NO_ERROR)
     649                {
     650                    *pcbWritten = cbActual;
     651                    if (cbActual == 0)
     652                        rc = VINF_TRY_AGAIN;
     653                }
     654                else
     655                {
     656                    rc = RTErrConvertFromOS2(orc);
     657                    if (rc == VERR_PIPE_NOT_CONNECTED)
     658                        rc = VERR_BROKEN_PIPE;
     659                }
     660            }
     661            else
     662                *pcbWritten = 0;
     663
     664            if (rc == VERR_BROKEN_PIPE)
     665                pThis->fBrokenPipe = true;
     666            pThis->cUsers--;
     667        }
     668        else
     669            rc = VERR_WRONG_ORDER;
     670        RTCritSectLeave(&pThis->CritSect);
     671    }
     672    return rc;
    77673}
    78674
     
    80676RTDECL(int) RTPipeWriteBlocking(RTPIPE hPipe, const void *pvBuf, size_t cbToWrite, size_t *pcbWritten)
    81677{
    82     return VERR_NOT_IMPLEMENTED;
     678    RTPIPEINTERNAL *pThis = hPipe;
     679    AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
     680    AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
     681    AssertReturn(!pThis->fRead, VERR_ACCESS_DENIED);
     682    AssertPtr(pvBuf);
     683    AssertPtrNull(pcbWritten);
     684
     685    int rc = RTCritSectEnter(&pThis->CritSect);
     686    if (RT_SUCCESS(rc))
     687    {
     688        rc = rtPipeTryBlocking(pThis);
     689        if (RT_SUCCESS(rc))
     690        {
     691            RTCritSectLeave(&pThis->CritSect);
     692
     693            size_t cbTotalWritten = 0;
     694            while (cbToWrite > 0)
     695            {
     696                ULONG cbActual = 0;
     697                APIRET orc = DosWrite(pThis->hPipe, pvBuf, cbToWrite, &cbActual);
     698                if (orc != NO_ERROR)
     699                {
     700                    rc = RTErrConvertFromOS2(orc);
     701                    if (rc == VERR_PIPE_NOT_CONNECTED)
     702                        rc = VERR_BROKEN_PIPE;
     703                    break;
     704                }
     705                pvBuf           = (char const *)pvBuf + cbActual;
     706                cbToWrite      -= cbActual;
     707                cbTotalWritten += cbActual;
     708            }
     709
     710            if (pcbWritten)
     711            {
     712                *pcbWritten = cbTotalWritten;
     713                if (   RT_FAILURE(rc)
     714                    && cbTotalWritten)
     715                    rc = VINF_SUCCESS;
     716            }
     717
     718            RTCritSectEnter(&pThis->CritSect);
     719            if (rc == VERR_BROKEN_PIPE)
     720                pThis->fBrokenPipe = true;
     721            pThis->cUsers--;
     722        }
     723        else
     724            rc = VERR_WRONG_ORDER;
     725        RTCritSectLeave(&pThis->CritSect);
     726    }
     727    return rc;
    83728}
    84729
     
    86731RTDECL(int) RTPipeFlush(RTPIPE hPipe)
    87732{
    88     return VERR_NOT_IMPLEMENTED;
     733    RTPIPEINTERNAL *pThis = hPipe;
     734    AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
     735    AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
     736    AssertReturn(!pThis->fRead, VERR_ACCESS_DENIED);
     737
     738    APIRET orc = DosResetBuffer(pThis->hPipe);
     739    if (orc != NO_ERROR)
     740    {
     741        int rc = RTErrConvertFromOS2(orc);
     742        if (rc == VERR_BROKEN_PIPE)
     743        {
     744            RTCritSectEnter(&pThis->CritSect);
     745            pThis->fBrokenPipe = true;
     746            RTCritSectLeave(&pThis->CritSect);
     747        }
     748        return rc;
     749    }
     750    return VINF_SUCCESS;
    89751}
    90752
     
    92754RTDECL(int) RTPipeSelectOne(RTPIPE hPipe, RTMSINTERVAL cMillies)
    93755{
    94     return VERR_NOT_IMPLEMENTED;
     756    RTPIPEINTERNAL *pThis = hPipe;
     757    AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
     758    AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
     759
     760    uint64_t const StartMsTS = RTTimeMilliTS();
     761
     762    int rc = RTCritSectEnter(&pThis->CritSect);
     763    if (RT_FAILURE(rc))
     764        return rc;
     765    rc = rtPipeOs2EnsureSem(pThis);
     766    if (RT_SUCCESS(rc))
     767    {
     768        for (unsigned iLoop = 0;; iLoop++)
     769        {
     770            /*
     771             * Check the handle state.
     772             */
     773            PIPESEMSTATE aStates[4]; RT_ZERO(aStates);
     774            APIRET orc = DosQueryNPipeSemState((HSEM)pThis->hev, &aStates[0], sizeof(aStates));
     775            if (orc != NO_ERROR)
     776            {
     777                rc = RTErrConvertFromOS2(orc);
     778                break;
     779            }
     780            int i = 0;
     781            if (pThis->fRead)
     782                while (aStates[i].fStatus == NPSS_WSPACE)
     783                    i++;
     784            else
     785                while (aStates[i].fStatus == NPSS_RDATA)
     786                    i++;
     787            if (aStates[i].fStatus == NPSS_CLOSE)
     788                break;
     789            Assert(aStates[i].fStatus == NPSS_WSPACE || aStates[i].fStatus == NPSS_RDATA || aStates[i].fStatus == NPSS_EOI);
     790            if (   aStates[i].fStatus != NPSS_EOI
     791                && aStates[0].usAvail > 0)
     792                break;
     793
     794            /*
     795             * Check for timeout.
     796             */
     797            ULONG cMsMaxWait = SEM_INDEFINITE_WAIT;
     798            if (cMillies != RT_INDEFINITE_WAIT)
     799            {
     800                uint64_t cElapsed = RTTimeMilliTS() - StartMsTS;
     801                if (cElapsed >= cMillies)
     802                {
     803                    rc = VERR_TIMEOUT;
     804                    break;
     805                }
     806                cMsMaxWait = cMillies - (uint32_t)cElapsed;
     807            }
     808
     809            /*
     810             * Wait.
     811             */
     812            RTCritSectLeave(&pThis->CritSect);
     813            orc = DosWaitEventSem(pThis->hev, cMsMaxWait);
     814            RTCritSectEnter(&pThis->CritSect);
     815            if (orc != NO_ERROR && orc != ERROR_TIMEOUT && orc != ERROR_SEM_TIMEOUT )
     816            {
     817                rc = RTErrConvertFromOS2(orc);
     818                break;
     819            }
     820        }
     821
     822        if (rc == VERR_BROKEN_PIPE)
     823            pThis->fBrokenPipe = true;
     824    }
     825
     826    RTCritSectLeave(&pThis->CritSect);
     827    return rc;
    95828}
    96829
     
    98831RTDECL(int) RTPipeQueryReadable(RTPIPE hPipe, size_t *pcbReadable)
    99832{
    100     return VERR_NOT_IMPLEMENTED;
    101 }
    102 
     833    RTPIPEINTERNAL *pThis = hPipe;
     834    AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
     835    AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
     836    AssertReturn(pThis->fRead, VERR_PIPE_NOT_READ);
     837    AssertPtrReturn(pcbReadable, VERR_INVALID_POINTER);
     838
     839    int rc = RTCritSectEnter(&pThis->CritSect);
     840    if (RT_FAILURE(rc))
     841        return rc;
     842
     843    ULONG       cbActual = 0;
     844    ULONG       ulState  = 0;
     845    AVAILDATA   Avail    = { 0, 0 };
     846    APIRET orc = DosPeekNPipe(pThis->hPipe, NULL, 0, &cbActual, &Avail, &ulState);
     847    if (orc == NO_ERROR)
     848    {
     849        if (Avail.cbpipe > 0 || ulState == NP_STATE_CONNECTED)
     850            *pcbReadable = Avail.cbpipe;
     851        else
     852            rc = VERR_PIPE_NOT_CONNECTED; /*??*/
     853    }
     854    else
     855        rc = RTErrConvertFromOS2(orc);
     856
     857    RTCritSectLeave(&pThis->CritSect);
     858    return rc;
     859}
     860
     861
     862#if 0
     863/**
     864 * Internal RTPollSetAdd helper that returns the handle that should be added to
     865 * the pollset.
     866 *
     867 * @returns Valid handle on success, INVALID_HANDLE_VALUE on failure.
     868 * @param   hPipe               The pipe handle.
     869 * @param   fEvents             The events we're polling for.
     870 * @param   ph                  where to put the primary handle.
     871 */
     872int rtPipePollGetHandle(RTPIPE hPipe, uint32_t fEvents, PHANDLE ph)
     873{
     874    RTPIPEINTERNAL *pThis = hPipe;
     875    AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
     876    AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
     877
     878    AssertReturn(!(fEvents & RTPOLL_EVT_READ) || pThis->fRead, VERR_INVALID_PARAMETER);
     879    AssertReturn(!(fEvents & RTPOLL_EVT_WRITE) || !pThis->fRead, VERR_INVALID_PARAMETER);
     880
     881    /* Later: Try register an event handle with the pipe like on OS/2, there is
     882       a file control for doing this obviously intended for the OS/2 subsys.
     883       The question is whether this still exists on Vista and W7. */
     884    *ph = pThis->Overlapped.hEvent;
     885    return VINF_SUCCESS;
     886}
     887
     888
     889/**
     890 * Checks for pending events.
     891 *
     892 * @returns Event mask or 0.
     893 * @param   pThis               The pipe handle.
     894 * @param   fEvents             The desired events.
     895 */
     896static uint32_t rtPipePollCheck(RTPIPEINTERNAL *pThis, uint32_t fEvents)
     897{
     898    uint32_t fRetEvents = 0;
     899    if (pThis->fBrokenPipe)
     900        fRetEvents |= RTPOLL_EVT_ERROR;
     901    else if (pThis->fRead)
     902    {
     903        if (!pThis->fIOPending)
     904        {
     905            DWORD cbAvailable;
     906            if (PeekNamedPipe(pThis->hPipe, NULL, 0, NULL, &cbAvailable, NULL))
     907            {
     908                if (   (fEvents & RTPOLL_EVT_READ)
     909                    && cbAvailable > 0)
     910                    fRetEvents |= RTPOLL_EVT_READ;
     911            }
     912            else
     913            {
     914                if (GetLastError() == ERROR_BROKEN_PIPE)
     915                    pThis->fBrokenPipe = true;
     916                fRetEvents |= RTPOLL_EVT_ERROR;
     917            }
     918        }
     919    }
     920    else
     921    {
     922        if (pThis->fIOPending)
     923        {
     924            rtPipeWriteCheckCompletion(pThis);
     925            if (pThis->fBrokenPipe)
     926                fRetEvents |= RTPOLL_EVT_ERROR;
     927        }
     928        if (   !pThis->fIOPending
     929            && !fRetEvents)
     930        {
     931            FILE_PIPE_LOCAL_INFORMATION Info;
     932            if (rtPipeQueryInfo(pThis, &Info))
     933            {
     934                /* Check for broken pipe. */
     935                if (Info.NamedPipeState == FILE_PIPE_CLOSING_STATE)
     936                {
     937                    fRetEvents = RTPOLL_EVT_ERROR;
     938                    pThis->fBrokenPipe = true;
     939                }
     940
     941                /* Check if there is available buffer space. */
     942                if (   !fRetEvents
     943                    && (fEvents & RTPOLL_EVT_WRITE)
     944                    && (   Info.WriteQuotaAvailable > 0
     945                        || Info.OutboundQuota == 0)
     946                    )
     947                    fRetEvents |= RTPOLL_EVT_WRITE;
     948            }
     949            else if (fEvents & RTPOLL_EVT_WRITE)
     950                fRetEvents |= RTPOLL_EVT_WRITE;
     951        }
     952    }
     953
     954    return fRetEvents;
     955}
     956
     957
     958/**
     959 * Internal RTPoll helper that polls the pipe handle and, if @a fNoWait is
     960 * clear, starts whatever actions we've got running during the poll call.
     961 *
     962 * @returns 0 if no pending events, actions initiated if @a fNoWait is clear.
     963 *          Event mask (in @a fEvents) and no actions if the handle is ready
     964 *          already.
     965 *          UINT32_MAX (asserted) if the pipe handle is busy in I/O or a
     966 *          different poll set.
     967 *
     968 * @param   hPipe               The pipe handle.
     969 * @param   hPollSet            The poll set handle (for access checks).
     970 * @param   fEvents             The events we're polling for.
     971 * @param   fFinalEntry         Set if this is the final entry for this handle
     972 *                              in this poll set.  This can be used for dealing
     973 *                              with duplicate entries.
     974 * @param   fNoWait             Set if it's a zero-wait poll call.  Clear if
     975 *                              we'll wait for an event to occur.
     976 */
     977uint32_t rtPipePollStart(RTPIPE hPipe, RTPOLLSET hPollSet, uint32_t fEvents, bool fFinalEntry, bool fNoWait)
     978{
     979    /** @todo All this polling code could be optimized to make fewer system
     980     *        calls; like for instance the ResetEvent calls. */
     981    RTPIPEINTERNAL *pThis = hPipe;
     982    AssertPtrReturn(pThis, UINT32_MAX);
     983    AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, UINT32_MAX);
     984
     985    int rc = RTCritSectEnter(&pThis->CritSect);
     986    AssertRCReturn(rc, UINT32_MAX);
     987
     988    /* Check that this is the only current use of this pipe. */
     989    uint32_t fRetEvents;
     990    if (   pThis->cUsers   == 0
     991        || pThis->hPollSet == hPollSet)
     992    {
     993        /* Check what the current events are. */
     994        fRetEvents = rtPipePollCheck(pThis, fEvents);
     995        if (   !fRetEvents
     996            && !fNoWait)
     997        {
     998            /* Make sure the event semaphore has been reset. */
     999            if (!pThis->fIOPending)
     1000            {
     1001                rc = ResetEvent(pThis->Overlapped.hEvent);
     1002                Assert(rc == TRUE);
     1003            }
     1004
     1005            /* Kick off the zero byte read thing if applicable. */
     1006            if (   !pThis->fIOPending
     1007                && pThis->fRead
     1008                && (fEvents & RTPOLL_EVT_READ)
     1009               )
     1010            {
     1011                DWORD cbRead = 0;
     1012                if (ReadFile(pThis->hPipe, pThis->abBuf, 0, &cbRead, &pThis->Overlapped))
     1013                    fRetEvents = rtPipePollCheck(pThis, fEvents);
     1014                else if (GetLastError() == ERROR_IO_PENDING)
     1015                {
     1016                    pThis->fIOPending    = true;
     1017                    pThis->fZeroByteRead = true;
     1018                }
     1019                else
     1020                    fRetEvents = RTPOLL_EVT_ERROR;
     1021            }
     1022
     1023            /* If we're still set for the waiting, record the poll set and
     1024               mark the pipe used. */
     1025            if (!fRetEvents)
     1026            {
     1027                pThis->cUsers++;
     1028                pThis->hPollSet = hPollSet;
     1029            }
     1030        }
     1031    }
     1032    else
     1033    {
     1034        AssertFailed();
     1035        fRetEvents = UINT32_MAX;
     1036    }
     1037
     1038    RTCritSectLeave(&pThis->CritSect);
     1039    return fRetEvents;
     1040}
     1041
     1042
     1043/**
     1044 * Called after a WaitForMultipleObjects returned in order to check for pending
     1045 * events and stop whatever actions that rtPipePollStart() initiated.
     1046 *
     1047 * @returns Event mask or 0.
     1048 *
     1049 * @param   hPipe               The pipe handle.
     1050 * @param   fEvents             The events we're polling for.
     1051 * @param   fFinalEntry         Set if this is the final entry for this handle
     1052 *                              in this poll set.  This can be used for dealing
     1053 *                              with duplicate entries.  Only keep in mind that
     1054 *                              this method is called in reverse order, so the
     1055 *                              first call will have this set (when the entire
     1056 *                              set was processed).
     1057 * @param   fHarvestEvents      Set if we should check for pending events.
     1058 */
     1059uint32_t rtPipePollDone(RTPIPE hPipe, uint32_t fEvents, bool fFinalEntry, bool fHarvestEvents)
     1060{
     1061    RTPIPEINTERNAL *pThis = hPipe;
     1062    AssertPtrReturn(pThis, 0);
     1063    AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, 0);
     1064
     1065    int rc = RTCritSectEnter(&pThis->CritSect);
     1066    AssertRCReturn(rc, 0);
     1067
     1068    Assert(pThis->cUsers > 0);
     1069
     1070
     1071    /* Cancel the zero byte read. */
     1072    uint32_t fRetEvents = 0;
     1073    if (pThis->fZeroByteRead)
     1074    {
     1075        CancelIo(pThis->hPipe);
     1076        DWORD cbRead = 0;
     1077        if (   !GetOverlappedResult(pThis->hPipe, &pThis->Overlapped, &cbRead, TRUE /*fWait*/)
     1078            && GetLastError() != ERROR_OPERATION_ABORTED)
     1079            fRetEvents = RTPOLL_EVT_ERROR;
     1080
     1081        pThis->fIOPending    = false;
     1082        pThis->fZeroByteRead = false;
     1083    }
     1084
     1085    /* harvest events. */
     1086    fRetEvents |= rtPipePollCheck(pThis, fEvents);
     1087
     1088    /* update counters. */
     1089    pThis->cUsers--;
     1090    if (!pThis->cUsers)
     1091        pThis->hPollSet = NIL_RTPOLLSET;
     1092
     1093    RTCritSectLeave(&pThis->CritSect);
     1094    return fRetEvents;
     1095}
     1096#endif
Note: See TracChangeset for help on using the changeset viewer.

© 2025 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette