/* $Id: localipc-posix.cpp 58290 2015-10-17 21:52:28Z vboxsync $ */ /** @file * IPRT - Local IPC Server & Client, Posix. */ /* * Copyright (C) 2006-2013 Oracle Corporation * * This file is part of VirtualBox Open Source Edition (OSE), as * available from http://www.virtualbox.org. This file is free software; * you can redistribute it and/or modify it under the terms of the GNU * General Public License (GPL) as published by the Free Software * Foundation, in version 2 as it comes in the "COPYING" file of the * VirtualBox OSE distribution. VirtualBox OSE is distributed in the * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind. * * The contents of this file may alternatively be used under the terms * of the Common Development and Distribution License Version 1.0 * (CDDL) only, as it comes in the "COPYING.CDDL" file of the * VirtualBox OSE distribution, in which case the provisions of the * CDDL are applicable instead of those of the GPL. * * You may elect to license modified versions of this file under the * terms and conditions of either the GPL or the CDDL or both. */ /******************************************************************************* * Header Files * *******************************************************************************/ #define LOG_GROUP RTLOGGROUP_LOCALIPC #include "internal/iprt.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #ifndef RT_OS_OS2 # include # include #endif #include #include #include "internal/magics.h" #include "internal/socket.h" /******************************************************************************* * Structures and Typedefs * *******************************************************************************/ /** * Local IPC service instance, POSIX. */ typedef struct RTLOCALIPCSERVERINT { /** The magic (RTLOCALIPCSERVER_MAGIC). */ uint32_t u32Magic; /** The creation flags. */ uint32_t fFlags; /** Critical section protecting the structure. */ RTCRITSECT CritSect; /** The number of references to the instance. */ uint32_t volatile cRefs; /** Indicates that there is a pending cancel request. */ bool volatile fCancelled; /** The server socket. */ RTSOCKET hSocket; /** Thread currently listening for clients. */ RTTHREAD hListenThread; /** The name we bound the server to (native charset encoding). */ struct sockaddr_un Name; } RTLOCALIPCSERVERINT; /** Pointer to a local IPC server instance (POSIX). */ typedef RTLOCALIPCSERVERINT *PRTLOCALIPCSERVERINT; /** * Local IPC session instance, POSIX. */ typedef struct RTLOCALIPCSESSIONINT { /** The magic (RTLOCALIPCSESSION_MAGIC). */ uint32_t u32Magic; /** Critical section protecting the structure. */ RTCRITSECT CritSect; /** The number of references to the instance. */ uint32_t volatile cRefs; /** Indicates that there is a pending cancel request. */ bool volatile fCancelled; /** Set if this is the server side, clear if the client. */ bool fServerSide; /** The client socket. */ RTSOCKET hSocket; /** Thread currently doing read related activites. */ RTTHREAD hWriteThread; /** Thread currently doing write related activies. */ RTTHREAD hReadThread; } RTLOCALIPCSESSIONINT; /** Pointer to a local IPC session instance (Windows). */ typedef RTLOCALIPCSESSIONINT *PRTLOCALIPCSESSIONINT; /** Local IPC name prefix. */ #define RTLOCALIPC_POSIX_NAME_PREFIX "/tmp/.iprt-localipc-" /** * Validates the user specified name. * * @returns IPRT status code. * @param pszName The name to validate. * @param pcchName Where to return the length. */ static int rtLocalIpcPosixValidateName(const char *pszName, size_t *pcchName) { AssertPtrReturn(pszName, VERR_INVALID_POINTER); uint32_t cchName = 0; for (;;) { char ch = pszName[cchName]; if (!ch) break; AssertReturn(!RT_C_IS_CNTRL(ch), VERR_INVALID_NAME); AssertReturn((unsigned)ch < 0x80, VERR_INVALID_NAME); AssertReturn(ch != '\\', VERR_INVALID_NAME); AssertReturn(ch != '/', VERR_INVALID_NAME); cchName++; } *pcchName = cchName; AssertReturn(sizeof(RTLOCALIPC_POSIX_NAME_PREFIX) + cchName <= RT_SIZEOFMEMB(struct sockaddr_un, sun_path), VERR_FILENAME_TOO_LONG); AssertReturn(cchName, VERR_INVALID_NAME); return VINF_SUCCESS; } /** * Constructs a local (unix) domain socket name. * * @returns IPRT status code. * @param pAddr The address structure to construct the name in. * @param pcbAddr Where to return the address size. * @param pszName The user specified name (valid). * @param cchName The user specified name length. */ static int rtLocalIpcPosixConstructName(struct sockaddr_un *pAddr, uint8_t *pcbAddr, const char *pszName, size_t cchName) { AssertMsgReturn(cchName + sizeof(RTLOCALIPC_POSIX_NAME_PREFIX) <= sizeof(pAddr->sun_path), ("cchName=%zu sizeof(sun_path)=%zu\n", cchName, sizeof(pAddr->sun_path)), VERR_FILENAME_TOO_LONG); /** @todo Bother converting to local codeset/encoding?? */ RT_ZERO(*pAddr); #ifdef RT_OS_OS2 /* Size must be exactly right on OS/2. */ *pcbAddr = sizeof(*pAddr); #else *pcbAddr = RT_OFFSETOF(struct sockaddr_un, sun_path) + (uint8_t)cchName + sizeof(RTLOCALIPC_POSIX_NAME_PREFIX); #endif #ifdef HAVE_SUN_LEN_MEMBER pAddr->sun_len = *pcbAddr; #endif pAddr->sun_family = AF_LOCAL; memcpy(pAddr->sun_path, RTLOCALIPC_POSIX_NAME_PREFIX, sizeof(RTLOCALIPC_POSIX_NAME_PREFIX) - 1); memcpy(&pAddr->sun_path[sizeof(RTLOCALIPC_POSIX_NAME_PREFIX) - 1], pszName, cchName + 1); return VINF_SUCCESS; } RTDECL(int) RTLocalIpcServerCreate(PRTLOCALIPCSERVER phServer, const char *pszName, uint32_t fFlags) { /* * Parameter validation. */ AssertPtrReturn(phServer, VERR_INVALID_POINTER); *phServer = NIL_RTLOCALIPCSERVER; AssertReturn(!(fFlags & ~RTLOCALIPC_FLAGS_VALID_MASK), VERR_INVALID_FLAGS); size_t cchName; int rc = rtLocalIpcPosixValidateName(pszName, &cchName); if (RT_SUCCESS(rc)) { /* * Allocate memory for the instance and initialize it. */ PRTLOCALIPCSERVERINT pThis = (PRTLOCALIPCSERVERINT)RTMemAllocZ(sizeof(*pThis)); if (pThis) { pThis->u32Magic = RTLOCALIPCSERVER_MAGIC; pThis->fFlags = fFlags; pThis->cRefs = 1; pThis->fCancelled = false; pThis->hListenThread = NIL_RTTHREAD; rc = RTCritSectInit(&pThis->CritSect); if (RT_SUCCESS(rc)) { /* * Create the local (unix) socket and bind to it. */ rc = rtSocketCreate(&pThis->hSocket, AF_LOCAL, SOCK_STREAM, 0 /*iProtocol*/); if (RT_SUCCESS(rc)) { RTSocketSetInheritance(pThis->hSocket, false /*fInheritable*/); uint8_t cbAddr; rc = rtLocalIpcPosixConstructName(&pThis->Name, &cbAddr, pszName, cchName); if (RT_SUCCESS(rc)) { rc = rtSocketBindRawAddr(pThis->hSocket, &pThis->Name, cbAddr); if (rc == VERR_NET_ADDRESS_IN_USE) { unlink(pThis->Name.sun_path); rc = rtSocketBindRawAddr(pThis->hSocket, &pThis->Name, cbAddr); } if (RT_SUCCESS(rc)) { LogFlow(("RTLocalIpcServerCreate: Created %p (%s)\n", pThis, pThis->Name.sun_path)); *phServer = pThis; return VINF_SUCCESS; } } RTSocketRelease(pThis->hSocket); } RTCritSectDelete(&pThis->CritSect); } RTMemFree(pThis); } else rc = VERR_NO_MEMORY; } Log(("RTLocalIpcServerCreate: failed, rc=%Rrc\n", rc)); return rc; } /** * Retains a reference to the server instance. * * @returns * @param pThis The server instance. */ DECLINLINE(void) rtLocalIpcServerRetain(PRTLOCALIPCSERVERINT pThis) { uint32_t cRefs = ASMAtomicIncU32(&pThis->cRefs); Assert(cRefs < UINT32_MAX / 2 && cRefs); } /** * Server instance destructor. * * @returns VINF_OBJECT_DESTROYED * @param pThis The server instance. */ static int rtLocalIpcServerDtor(PRTLOCALIPCSERVERINT pThis) { pThis->u32Magic = ~RTLOCALIPCSERVER_MAGIC; if (RTSocketRelease(pThis->hSocket) == 0) Log(("rtLocalIpcServerDtor: Released socket\n")); else Log(("rtLocalIpcServerDtor: Socket still has references (impossible?)\n")); RTCritSectDelete(&pThis->CritSect); unlink(pThis->Name.sun_path); RTMemFree(pThis); return VINF_OBJECT_DESTROYED; } /** * Releases a reference to the server instance. * * @returns VINF_SUCCESS if only release, VINF_OBJECT_DESTROYED if destroyed. * @param pThis The server instance. */ DECLINLINE(int) rtLocalIpcServerRelease(PRTLOCALIPCSERVERINT pThis) { uint32_t cRefs = ASMAtomicDecU32(&pThis->cRefs); Assert(cRefs < UINT32_MAX / 2); if (!cRefs) return rtLocalIpcServerDtor(pThis); return VINF_SUCCESS; } /** * The core of RTLocalIpcServerCancel, used by both the destroy and cancel APIs. * * @returns IPRT status code * @param pThis The server instance. */ static int rtLocalIpcServerCancel(PRTLOCALIPCSERVERINT pThis) { RTCritSectEnter(&pThis->CritSect); pThis->fCancelled = true; Log(("rtLocalIpcServerCancel:\n")); if (pThis->hListenThread != NIL_RTTHREAD) RTThreadPoke(pThis->hListenThread); RTCritSectLeave(&pThis->CritSect); return VINF_SUCCESS; } RTDECL(int) RTLocalIpcServerDestroy(RTLOCALIPCSERVER hServer) { /* * Validate input. */ if (hServer == NIL_RTLOCALIPCSERVER) return VINF_SUCCESS; PRTLOCALIPCSERVERINT pThis = (PRTLOCALIPCSERVERINT)hServer; AssertPtrReturn(pThis, VERR_INVALID_HANDLE); AssertReturn(pThis->u32Magic == RTLOCALIPCSERVER_MAGIC, VERR_INVALID_HANDLE); /* * Invalidate the server, releasing the caller's reference to the instance * data and making sure any other thread in the listen API will wake up. */ AssertReturn(ASMAtomicCmpXchgU32(&pThis->u32Magic, ~RTLOCALIPCSERVER_MAGIC, RTLOCALIPCSERVER_MAGIC), VERR_WRONG_ORDER); rtLocalIpcServerCancel(pThis); return rtLocalIpcServerRelease(pThis); } RTDECL(int) RTLocalIpcServerCancel(RTLOCALIPCSERVER hServer) { /* * Validate input. */ PRTLOCALIPCSERVERINT pThis = (PRTLOCALIPCSERVERINT)hServer; AssertPtrReturn(pThis, VERR_INVALID_HANDLE); AssertReturn(pThis->u32Magic == RTLOCALIPCSERVER_MAGIC, VERR_INVALID_HANDLE); /* * Do the job. */ rtLocalIpcServerRetain(pThis); rtLocalIpcServerCancel(pThis); rtLocalIpcServerRelease(pThis); return VINF_SUCCESS; } RTDECL(int) RTLocalIpcServerListen(RTLOCALIPCSERVER hServer, PRTLOCALIPCSESSION phClientSession) { /* * Validate input. */ PRTLOCALIPCSERVERINT pThis = (PRTLOCALIPCSERVERINT)hServer; AssertPtrReturn(pThis, VERR_INVALID_HANDLE); AssertReturn(pThis->u32Magic == RTLOCALIPCSERVER_MAGIC, VERR_INVALID_HANDLE); /* * Begin listening. */ rtLocalIpcServerRetain(pThis); int rc = RTCritSectEnter(&pThis->CritSect); if (RT_SUCCESS(rc)) { if (pThis->hListenThread == NIL_RTTHREAD) { pThis->hListenThread = RTThreadSelf(); /* * The listening retry loop. */ for (;;) { if (pThis->fCancelled) { rc = VERR_CANCELLED; break; } rc = RTCritSectLeave(&pThis->CritSect); AssertRCBreak(rc); rc = rtSocketListen(pThis->hSocket, pThis->fFlags & RTLOCALIPC_FLAGS_MULTI_SESSION ? 10 : 0); if (RT_SUCCESS(rc)) { struct sockaddr_un Addr; size_t cbAddr = sizeof(Addr); RTSOCKET hClient; Log(("RTLocalIpcServerListen: Calling rtSocketAccept...\n")); rc = rtSocketAccept(pThis->hSocket, &hClient, (struct sockaddr *)&Addr, &cbAddr); Log(("RTLocalIpcServerListen: rtSocketAccept returns %Rrc.\n", rc)); int rc2 = RTCritSectEnter(&pThis->CritSect); AssertRCBreakStmt(rc2, rc = RT_SUCCESS(rc) ? rc2 : rc); if (RT_SUCCESS(rc)) { /* * Create a client session. */ PRTLOCALIPCSESSIONINT pSession = (PRTLOCALIPCSESSIONINT)RTMemAllocZ(sizeof(*pSession)); if (pSession) { pSession->u32Magic = RTLOCALIPCSESSION_MAGIC; pSession->cRefs = 1; pSession->fCancelled = false; pSession->fServerSide = true; pSession->hSocket = hClient; pSession->hReadThread = NIL_RTTHREAD; pSession->hWriteThread = NIL_RTTHREAD; rc = RTCritSectInit(&pSession->CritSect); if (RT_SUCCESS(rc)) { Log(("RTLocalIpcServerListen: Returning new client session: %p\n", pSession)); *phClientSession = pSession; break; } RTMemFree(pSession); } else rc = VERR_NO_MEMORY; } else if ( rc != VERR_INTERRUPTED && rc != VERR_TRY_AGAIN) break; } else { int rc2 = RTCritSectEnter(&pThis->CritSect); AssertRCBreakStmt(rc2, rc = RT_SUCCESS(rc) ? rc2 : rc); if ( rc != VERR_INTERRUPTED && rc != VERR_TRY_AGAIN) break; } } pThis->hListenThread = NIL_RTTHREAD; } else { AssertFailed(); rc = VERR_RESOURCE_BUSY; } int rc2 = RTCritSectLeave(&pThis->CritSect); AssertStmt(RT_SUCCESS(rc2), rc = RT_SUCCESS(rc) ? rc2 : rc); } rtLocalIpcServerRelease(pThis); Log(("RTLocalIpcServerListen: returns %Rrc\n", rc)); return rc; } RTDECL(int) RTLocalIpcSessionConnect(PRTLOCALIPCSESSION phSession, const char *pszName, uint32_t fFlags) { /* * Parameter validation. */ AssertPtrReturn(phSession, VERR_INVALID_POINTER); *phSession = NIL_RTLOCALIPCSESSION; AssertReturn(!fFlags, VERR_INVALID_FLAGS); size_t cchName; int rc = rtLocalIpcPosixValidateName(pszName, &cchName); if (RT_SUCCESS(rc)) { /* * Allocate memory for the instance and initialize it. */ PRTLOCALIPCSESSIONINT pThis = (PRTLOCALIPCSESSIONINT)RTMemAllocZ(sizeof(*pThis)); if (pThis) { pThis->u32Magic = RTLOCALIPCSESSION_MAGIC; pThis->cRefs = 1; pThis->fCancelled = false; pThis->fServerSide = false; pThis->hSocket = NIL_RTSOCKET; pThis->hReadThread = NIL_RTTHREAD; pThis->hWriteThread = NIL_RTTHREAD; rc = RTCritSectInit(&pThis->CritSect); if (RT_SUCCESS(rc)) { /* * Create the local (unix) socket and try connect to the server. */ rc = rtSocketCreate(&pThis->hSocket, AF_LOCAL, SOCK_STREAM, 0 /*iProtocol*/); if (RT_SUCCESS(rc)) { RTSocketSetInheritance(pThis->hSocket, false /*fInheritable*/); struct sockaddr_un Addr; uint8_t cbAddr; rc = rtLocalIpcPosixConstructName(&Addr, &cbAddr, pszName, cchName); if (RT_SUCCESS(rc)) { rc = rtSocketConnectRaw(pThis->hSocket, &Addr, cbAddr); if (RT_SUCCESS(rc)) { *phSession = pThis; Log(("RTLocalIpcSessionConnect: Returns new session %p\n", pThis)); return VINF_SUCCESS; } } RTCritSectDelete(&pThis->CritSect); } } RTMemFree(pThis); } else rc = VERR_NO_MEMORY; } Log(("RTLocalIpcSessionConnect: returns %Rrc\n", rc)); return rc; } /** * Retains a reference to the session instance. * * @param pThis The server instance. */ DECLINLINE(void) rtLocalIpcSessionRetain(PRTLOCALIPCSESSIONINT pThis) { uint32_t cRefs = ASMAtomicIncU32(&pThis->cRefs); Assert(cRefs < UINT32_MAX / 2 && cRefs); } /** * Session instance destructor. * * @returns VINF_OBJECT_DESTROYED * @param pThis The server instance. */ static int rtLocalIpcSessionDtor(PRTLOCALIPCSESSIONINT pThis) { pThis->u32Magic = ~RTLOCALIPCSESSION_MAGIC; if (RTSocketRelease(pThis->hSocket) == 0) Log(("rtLocalIpcSessionDtor: Released socket\n")); else Log(("rtLocalIpcSessionDtor: Socket still has references (impossible?)\n")); RTCritSectDelete(&pThis->CritSect); RTMemFree(pThis); return VINF_OBJECT_DESTROYED; } /** * Releases a reference to the session instance. * * @returns VINF_SUCCESS or VINF_OBJECT_DESTROYED as appropriate. * @param pThis The session instance. */ DECLINLINE(int) rtLocalIpcSessionRelease(PRTLOCALIPCSESSIONINT pThis) { uint32_t cRefs = ASMAtomicDecU32(&pThis->cRefs); Assert(cRefs < UINT32_MAX / 2); if (!cRefs) return rtLocalIpcSessionDtor(pThis); Log(("rtLocalIpcSessionRelease: %u refs left\n", cRefs)); return VINF_SUCCESS; } /** * The core of RTLocalIpcSessionCancel, used by both the destroy and cancel APIs. * * @returns IPRT status code * @param pThis The session instance. */ static int rtLocalIpcSessionCancel(PRTLOCALIPCSESSIONINT pThis) { RTCritSectEnter(&pThis->CritSect); pThis->fCancelled = true; Log(("rtLocalIpcSessionCancel:\n")); if (pThis->hReadThread != NIL_RTTHREAD) RTThreadPoke(pThis->hReadThread); if (pThis->hWriteThread != NIL_RTTHREAD) RTThreadPoke(pThis->hWriteThread); RTCritSectLeave(&pThis->CritSect); return VINF_SUCCESS; } RTDECL(int) RTLocalIpcSessionClose(RTLOCALIPCSESSION hSession) { /* * Validate input. */ if (hSession == NIL_RTLOCALIPCSESSION) return VINF_SUCCESS; PRTLOCALIPCSESSIONINT pThis = hSession; AssertPtrReturn(pThis, VERR_INVALID_HANDLE); AssertReturn(pThis->u32Magic == RTLOCALIPCSESSION_MAGIC, VERR_INVALID_HANDLE); /* * Invalidate the session, releasing the caller's reference to the instance * data and making sure any other thread in the listen API will wake up. */ AssertReturn(ASMAtomicCmpXchgU32(&pThis->u32Magic, ~RTLOCALIPCSESSION_MAGIC, RTLOCALIPCSESSION_MAGIC), VERR_WRONG_ORDER); Log(("RTLocalIpcSessionClose:\n")); rtLocalIpcSessionCancel(pThis); return rtLocalIpcSessionRelease(pThis); } RTDECL(int) RTLocalIpcSessionCancel(RTLOCALIPCSESSION hSession) { /* * Validate input. */ PRTLOCALIPCSESSIONINT pThis = hSession; AssertPtrReturn(pThis, VERR_INVALID_HANDLE); AssertReturn(pThis->u32Magic == RTLOCALIPCSESSION_MAGIC, VERR_INVALID_HANDLE); /* * Do the job. */ rtLocalIpcSessionRetain(pThis); rtLocalIpcSessionCancel(pThis); rtLocalIpcSessionRelease(pThis); return VINF_SUCCESS; } #if 0 /* maybe later */ /** * Checks if the socket has has a HUP condition. * * @returns true if HUP, false if no. * @param pThis The IPC session handle. */ static bool rtLocalIpcPosixHasHup(PRTLOCALIPCSESSIONINT pThis) { # ifndef RT_OS_OS2 struct pollfd PollFd; RT_ZERO(PollFd); PollFd.fd = RTSocketToNative(pThis->hSocket); PollFd.events = POLLHUP; return poll(&PollFd, 1, 0) >= 1 && (PollFd.revents & POLLHUP); # else /* RT_OS_OS2: */ return false; # endif } #endif RTDECL(int) RTLocalIpcSessionRead(RTLOCALIPCSESSION hSession, void *pvBuffer, size_t cbBuffer, size_t *pcbRead) { /* * Validate input. */ PRTLOCALIPCSESSIONINT pThis = hSession; AssertPtrReturn(pThis, VERR_INVALID_HANDLE); AssertReturn(pThis->u32Magic == RTLOCALIPCSESSION_MAGIC, VERR_INVALID_HANDLE); /* * Do the job. */ rtLocalIpcSessionRetain(pThis); int rc = RTCritSectEnter(&pThis->CritSect); if (RT_SUCCESS(rc)) { if (pThis->hReadThread == NIL_RTTHREAD) { pThis->hReadThread = RTThreadSelf(); for (;;) { if (!pThis->fCancelled) { rc = RTCritSectLeave(&pThis->CritSect); AssertRCBreak(rc); rc = RTSocketRead(pThis->hSocket, pvBuffer, cbBuffer, pcbRead); int rc2 = RTCritSectEnter(&pThis->CritSect); AssertRCBreakStmt(rc2, rc = RT_SUCCESS(rc) ? rc2 : rc); if ( rc == VERR_INTERRUPTED || rc == VERR_TRY_AGAIN) continue; } else rc = VERR_CANCELLED; break; } pThis->hReadThread = NIL_RTTHREAD; } int rc2 = RTCritSectLeave(&pThis->CritSect); AssertStmt(RT_SUCCESS(rc2), rc = RT_SUCCESS(rc) ? rc2 : rc); } rtLocalIpcSessionRelease(pThis); return rc; } RTDECL(int) RTLocalIpcSessionWrite(RTLOCALIPCSESSION hSession, const void *pvBuffer, size_t cbBuffer) { /* * Validate input. */ PRTLOCALIPCSESSIONINT pThis = hSession; AssertPtrReturn(pThis, VERR_INVALID_HANDLE); AssertReturn(pThis->u32Magic == RTLOCALIPCSESSION_MAGIC, VERR_INVALID_HANDLE); /* * Do the job. */ rtLocalIpcSessionRetain(pThis); int rc = RTCritSectEnter(&pThis->CritSect); if (RT_SUCCESS(rc)) { if (pThis->hWriteThread == NIL_RTTHREAD) { pThis->hWriteThread = RTThreadSelf(); for (;;) { if (!pThis->fCancelled) { rc = RTCritSectLeave(&pThis->CritSect); AssertRCBreak(rc); rc = RTSocketWrite(pThis->hSocket, pvBuffer, cbBuffer); int rc2 = RTCritSectEnter(&pThis->CritSect); AssertRCBreakStmt(rc2, rc = RT_SUCCESS(rc) ? rc2 : rc); if ( rc == VERR_INTERRUPTED || rc == VERR_TRY_AGAIN) continue; } else rc = VERR_CANCELLED; break; } pThis->hWriteThread = NIL_RTTHREAD; } int rc2 = RTCritSectLeave(&pThis->CritSect); AssertStmt(RT_SUCCESS(rc2), rc = RT_SUCCESS(rc) ? rc2 : rc); } rtLocalIpcSessionRelease(pThis); return rc; } RTDECL(int) RTLocalIpcSessionFlush(RTLOCALIPCSESSION hSession) { /* * Validate input. */ PRTLOCALIPCSESSIONINT pThis = hSession; AssertPtrReturn(pThis, VERR_INVALID_HANDLE); AssertReturn(pThis->u32Magic == RTLOCALIPCSESSION_MAGIC, VERR_INVALID_HANDLE); /* * This is a no-op because apparently write doesn't return until the * result is read. At least that's what the reply to a 2003-04-08 LKML * posting title "fsync() on unix domain sockets?" indicates. * * For conformity, make sure there isn't any active writes concurrent to this call. */ rtLocalIpcSessionRetain(pThis); int rc = RTCritSectEnter(&pThis->CritSect); if (RT_SUCCESS(rc)) { if (pThis->hWriteThread == NIL_RTTHREAD) rc = RTCritSectLeave(&pThis->CritSect); else { rc = RTCritSectLeave(&pThis->CritSect); if (RT_SUCCESS(rc)) rc = VERR_RESOURCE_BUSY; } } rtLocalIpcSessionRelease(pThis); return rc; } RTDECL(int) RTLocalIpcSessionWaitForData(RTLOCALIPCSESSION hSession, uint32_t cMillies) { /* * Validate input. */ PRTLOCALIPCSESSIONINT pThis = hSession; AssertPtrReturn(pThis, VERR_INVALID_HANDLE); AssertReturn(pThis->u32Magic == RTLOCALIPCSESSION_MAGIC, VERR_INVALID_HANDLE); /* * Do the job. */ rtLocalIpcSessionRetain(pThis); int rc = RTCritSectEnter(&pThis->CritSect); if (RT_SUCCESS(rc)) { if (pThis->hReadThread == NIL_RTTHREAD) { pThis->hReadThread = RTThreadSelf(); for (;;) { if (!pThis->fCancelled) { rc = RTCritSectLeave(&pThis->CritSect); AssertRCBreak(rc); uint32_t fEvents = 0; #ifdef RT_OS_OS2 /* This doesn't give us any error condition on hangup. */ Log(("RTLocalIpcSessionWaitForData: Calling RTSocketSelectOneEx...\n")); rc = RTSocketSelectOneEx(pThis->hSocket, RTPOLL_EVT_READ | RTPOLL_EVT_ERROR, &fEvents, cMillies); Log(("RTLocalIpcSessionWaitForData: RTSocketSelectOneEx returns %Rrc, fEvents=%#x\n", rc, fEvents)); #else /** @todo RTSocketPoll */ /* POLLHUP will be set on hangup. */ struct pollfd PollFd; RT_ZERO(PollFd); PollFd.fd = RTSocketToNative(pThis->hSocket); PollFd.events = POLLHUP | POLLERR | POLLIN; Log(("RTLocalIpcSessionWaitForData: Calling poll...\n")); int cFds = poll(&PollFd, 1, cMillies == RT_INDEFINITE_WAIT ? -1 : cMillies); if (cFds >= 1) { fEvents = PollFd.revents & (POLLHUP | POLLERR) ? RTPOLL_EVT_ERROR : RTPOLL_EVT_READ; rc = VINF_SUCCESS; } else if (rc == 0) rc = VERR_TIMEOUT; else rc = RTErrConvertFromErrno(errno); Log(("RTLocalIpcSessionWaitForData: poll returns %u (rc=%%d), revents=%#x\n", cFds, rc, PollFd.revents)); #endif int rc2 = RTCritSectEnter(&pThis->CritSect); AssertRCBreakStmt(rc2, rc = RT_SUCCESS(rc) ? rc2 : rc); if (RT_SUCCESS(rc)) { if (pThis->fCancelled) rc = VERR_CANCELLED; else if (fEvents & RTPOLL_EVT_ERROR) rc = VERR_BROKEN_PIPE; } else if ( rc == VERR_INTERRUPTED || rc == VERR_TRY_AGAIN) continue; } else rc = VERR_CANCELLED; break; } pThis->hReadThread = NIL_RTTHREAD; } int rc2 = RTCritSectLeave(&pThis->CritSect); AssertStmt(RT_SUCCESS(rc2), rc = RT_SUCCESS(rc) ? rc2 : rc); } rtLocalIpcSessionRelease(pThis); return rc; } RTDECL(int) RTLocalIpcSessionQueryProcess(RTLOCALIPCSESSION hSession, PRTPROCESS pProcess) { return VERR_NOT_SUPPORTED; } RTDECL(int) RTLocalIpcSessionQueryUserId(RTLOCALIPCSESSION hSession, PRTUID pUid) { return VERR_NOT_SUPPORTED; } RTDECL(int) RTLocalIpcSessionQueryGroupId(RTLOCALIPCSESSION hSession, PRTGID pGid) { return VERR_NOT_SUPPORTED; }