/* $Id: pipe-win.cpp 28800 2010-04-27 08:22:32Z vboxsync $ */
/** @file
 * IPRT - Anonymous Pipes, Windows Implementation.
 */

/*
 * Copyright (C) 2010 Oracle Corporation
 *
 * This file is part of VirtualBox Open Source Edition (OSE), as
 * available from http://www.virtualbox.org. This file is free software;
 * you can redistribute it and/or modify it under the terms of the GNU
 * General Public License (GPL) as published by the Free Software
 * Foundation, in version 2 as it comes in the "COPYING" file of the
 * VirtualBox OSE distribution. VirtualBox OSE is distributed in the
 * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
 *
 * The contents of this file may alternatively be used under the terms
 * of the Common Development and Distribution License Version 1.0
 * (CDDL) only, as it comes in the "COPYING.CDDL" file of the
 * VirtualBox OSE distribution, in which case the provisions of the
 * CDDL are applicable instead of those of the GPL.
 *
 * You may elect to license modified versions of this file under the
 * terms and conditions of either the GPL or the CDDL or both.
 */

/*******************************************************************************
*   Header Files                                                               *
*******************************************************************************/
/* NOTE(review): the bare "#include" lines below appear to have lost their
 * <...> header names (presumably stripped by an HTML/text extraction step) -
 * restore them from the upstream file before building. */
#include
#include
#include "internal/iprt.h"
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "internal/pipe.h"
#include "internal/magics.h"


/*******************************************************************************
*   Defined Constants And Macros                                               *
*******************************************************************************/
/** The pipe buffer size we prefer. */
#define RTPIPE_NT_SIZE _64K


/*******************************************************************************
*   Structures and Typedefs                                                    *
*******************************************************************************/
/**
 * Per-handle instance data for one end (read or write) of a pipe.
 */
typedef struct RTPIPEINTERNAL
{
    /** Magic value (RTPIPE_MAGIC). */
    uint32_t u32Magic;
    /** The pipe handle. */
    HANDLE hPipe;
    /** Set if this is the read end, clear if it's the write end. */
    bool fRead;
    /** Set if there is already pending I/O. */
    bool fIOPending;
    /** Set if the zero byte read that the poll code uses is pending. */
    bool fZeroByteRead;
    /** Set if the pipe is broken. */
    bool fBrokenPipe;
    /** Set if we've promised that the handle is writable. */
    bool fPromisedWritable;
    /** Usage counter. */
    uint32_t cUsers;
    /** The overlapped I/O structure we use. */
    OVERLAPPED Overlapped;
    /** Bounce buffer for writes. */
    uint8_t *pbBounceBuf;
    /** Amount of used buffer space. */
    size_t cbBounceBufUsed;
    /** Amount of allocated buffer space. */
    size_t cbBounceBufAlloc;
    /** The handle of the poll set currently polling on this pipe.
     * We can only have one poller at a time (lazy bird). */
    RTPOLLSET hPollSet;
    /** Critical section protecting the above members.
     * (Taking the lazy/simple approach.) */
    RTCRITSECT CritSect;
    /** Buffer for the zero byte read. */
    uint8_t abBuf[8];
} RTPIPEINTERNAL;


/*
 * Local copies of the NT declarations needed for calling
 * NtQueryInformationFile; the originating DDK header is named above each one.
 */

/* from ntdef.h */
typedef LONG NTSTATUS;

/* from ntddk.h */
typedef struct _IO_STATUS_BLOCK {
    union {
        NTSTATUS Status;
        PVOID Pointer;
    };
    ULONG_PTR Information;
} IO_STATUS_BLOCK, *PIO_STATUS_BLOCK;

/** Only the pipe related information classes are declared here. */
typedef enum _FILE_INFORMATION_CLASS {
    FilePipeInformation = 23,
    FilePipeLocalInformation = 24,
    FilePipeRemoteInformation = 25,
} FILE_INFORMATION_CLASS, *PFILE_INFORMATION_CLASS;

/* from ntifs.h */
typedef struct _FILE_PIPE_LOCAL_INFORMATION {
    ULONG NamedPipeType;
    ULONG NamedPipeConfiguration;
    ULONG MaximumInstances;
    ULONG CurrentInstances;
    ULONG InboundQuota;
    ULONG ReadDataAvailable;
    ULONG OutboundQuota;
    ULONG WriteQuotaAvailable;
    ULONG NamedPipeState;
    ULONG NamedPipeEnd;
} FILE_PIPE_LOCAL_INFORMATION, *PFILE_PIPE_LOCAL_INFORMATION;

/* Values compared against FILE_PIPE_LOCAL_INFORMATION::NamedPipeState. */
#define FILE_PIPE_DISCONNECTED_STATE 0x00000001
#define FILE_PIPE_LISTENING_STATE 0x00000002
#define FILE_PIPE_CONNECTED_STATE 0x00000003
#define FILE_PIPE_CLOSING_STATE 0x00000004

/* Values compared against FILE_PIPE_LOCAL_INFORMATION::NamedPipeConfiguration. */
#define FILE_PIPE_INBOUND 0x00000000
#define FILE_PIPE_OUTBOUND 0x00000001 #define FILE_PIPE_FULL_DUPLEX 0x00000002 #define FILE_PIPE_CLIENT_END 0x00000000 #define FILE_PIPE_SERVER_END 0x00000001 extern "C" NTSYSAPI NTSTATUS WINAPI NtQueryInformationFile(HANDLE, PIO_STATUS_BLOCK, PVOID, LONG, FILE_INFORMATION_CLASS); /** * Wrapper for getting FILE_PIPE_LOCAL_INFORMATION via the NT API. * * @returns Success inidicator (true/false). * @param pThis The pipe. * @param pInfo The info structure. */ static bool rtPipeQueryInfo(RTPIPEINTERNAL *pThis, FILE_PIPE_LOCAL_INFORMATION *pInfo) { IO_STATUS_BLOCK Ios; RT_ZERO(Ios); RT_ZERO(*pInfo); NTSTATUS rcNt = NtQueryInformationFile(pThis->hPipe, &Ios, pInfo, sizeof(*pInfo), FilePipeLocalInformation); return rcNt >= 0; } RTDECL(int) RTPipeCreate(PRTPIPE phPipeRead, PRTPIPE phPipeWrite, uint32_t fFlags) { AssertPtrReturn(phPipeRead, VERR_INVALID_POINTER); AssertPtrReturn(phPipeWrite, VERR_INVALID_POINTER); AssertReturn(!(fFlags & ~RTPIPE_C_VALID_MASK), VERR_INVALID_PARAMETER); /* * Create the read end of the pipe. 
*/ DWORD dwErr; HANDLE hPipeR; HANDLE hPipeW; int rc; for (;;) { static volatile uint32_t g_iNextPipe = 0; char szName[128]; RTStrPrintf(szName, sizeof(szName), "\\\\.\\pipe\\iprt-pipe-%u-%u", RTProcSelf(), ASMAtomicIncU32(&g_iNextPipe)); SECURITY_ATTRIBUTES SecurityAttributes; PSECURITY_ATTRIBUTES pSecurityAttributes = NULL; if (fFlags & RTPIPE_C_INHERIT_READ) { SecurityAttributes.nLength = sizeof(SecurityAttributes); SecurityAttributes.lpSecurityDescriptor = NULL; SecurityAttributes.bInheritHandle = TRUE; pSecurityAttributes = &SecurityAttributes; } DWORD dwOpenMode = PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED; #ifdef FILE_FLAG_FIRST_PIPE_INSTANCE dwOpenMode |= FILE_FLAG_FIRST_PIPE_INSTANCE; #endif DWORD dwPipeMode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT; #ifdef PIPE_REJECT_REMOTE_CLIENTS dwPipeMode |= PIPE_REJECT_REMOTE_CLIENTS; #endif hPipeR = CreateNamedPipeA(szName, dwOpenMode, dwPipeMode, 1 /*nMaxInstances*/, RTPIPE_NT_SIZE, RTPIPE_NT_SIZE, NMPWAIT_USE_DEFAULT_WAIT, pSecurityAttributes); #ifdef PIPE_REJECT_REMOTE_CLIENTS if (hPipeR == INVALID_HANDLE_VALUE && GetLastError() == ERROR_INVALID_PARAMETER) { dwPipeMode &= ~PIPE_REJECT_REMOTE_CLIENTS; hPipeR = CreateNamedPipeA(szName, dwOpenMode, dwPipeMode, 1 /*nMaxInstances*/, RTPIPE_NT_SIZE, RTPIPE_NT_SIZE, NMPWAIT_USE_DEFAULT_WAIT, pSecurityAttributes); } #endif #ifdef FILE_FLAG_FIRST_PIPE_INSTANCE if (hPipeR == INVALID_HANDLE_VALUE && GetLastError() == ERROR_INVALID_PARAMETER) { dwOpenMode &= ~FILE_FLAG_FIRST_PIPE_INSTANCE; hPipeR = CreateNamedPipeA(szName, dwOpenMode, dwPipeMode, 1 /*nMaxInstances*/, RTPIPE_NT_SIZE, RTPIPE_NT_SIZE, NMPWAIT_USE_DEFAULT_WAIT, pSecurityAttributes); } #endif if (hPipeR != INVALID_HANDLE_VALUE) { /* * Connect to the pipe (the write end). * We add FILE_READ_ATTRIBUTES here to make sure we can query the * pipe state later on. 
*/ pSecurityAttributes = NULL; if (fFlags & RTPIPE_C_INHERIT_WRITE) { SecurityAttributes.nLength = sizeof(SecurityAttributes); SecurityAttributes.lpSecurityDescriptor = NULL; SecurityAttributes.bInheritHandle = TRUE; pSecurityAttributes = &SecurityAttributes; } hPipeW = CreateFileA(szName, GENERIC_WRITE | FILE_READ_ATTRIBUTES /*dwDesiredAccess*/, 0 /*dwShareMode*/, pSecurityAttributes, OPEN_EXISTING /* dwCreationDisposition */, FILE_FLAG_OVERLAPPED /*dwFlagsAndAttributes*/, NULL /*hTemplateFile*/); if (hPipeW != INVALID_HANDLE_VALUE) break; dwErr = GetLastError(); CloseHandle(hPipeR); } else dwErr = GetLastError(); if ( dwErr != ERROR_PIPE_BUSY /* already exist - compatible */ && dwErr != ERROR_ACCESS_DENIED /* already exist - incompatible */) return RTErrConvertFromWin32(dwErr); /* else: try again with a new name */ } /* * Create the two handles. */ RTPIPEINTERNAL *pThisR = (RTPIPEINTERNAL *)RTMemAllocZ(sizeof(RTPIPEINTERNAL)); if (pThisR) { RTPIPEINTERNAL *pThisW = (RTPIPEINTERNAL *)RTMemAllocZ(sizeof(RTPIPEINTERNAL)); if (pThisW) { rc = RTCritSectInit(&pThisR->CritSect); if (RT_SUCCESS(rc)) { rc = RTCritSectInit(&pThisW->CritSect); if (RT_SUCCESS(rc)) { pThisR->Overlapped.hEvent = CreateEvent(NULL, TRUE /*fManualReset*/, TRUE /*fInitialState*/, NULL /*pName*/); if (pThisR->Overlapped.hEvent != NULL) { pThisW->Overlapped.hEvent = CreateEvent(NULL, TRUE /*fManualReset*/, TRUE /*fInitialState*/, NULL /*pName*/); if (pThisW->Overlapped.hEvent != NULL) { pThisR->u32Magic = RTPIPE_MAGIC; pThisW->u32Magic = RTPIPE_MAGIC; pThisR->hPipe = hPipeR; pThisW->hPipe = hPipeW; pThisR->fRead = true; pThisW->fRead = false; //pThisR->fIOPending = false; //pThisW->fIOPending = false; //pThisR->fZeroByteRead = false; //pThisW->fZeroByteRead = false; //pThisR->fBrokenPipe = false; //pThisW->fBrokenPipe = false; //pThisW->fPromisedWritable= false; //pThisR->fPromisedWritable= false; //pThisR->cUsers = 0; //pThisW->cUsers = 0; //pThisR->pbBounceBuf = NULL; //pThisW->pbBounceBuf = NULL; 
//pThisR->cbBounceBufUsed = 0; //pThisW->cbBounceBufUsed = 0; //pThisR->cbBounceBufAlloc= 0; //pThisW->cbBounceBufAlloc= 0; pThisR->hPollSet = NIL_RTPOLLSET; pThisW->hPollSet = NIL_RTPOLLSET; *phPipeRead = pThisR; *phPipeWrite = pThisW; return VINF_SUCCESS; } CloseHandle(pThisR->Overlapped.hEvent); } RTCritSectDelete(&pThisW->CritSect); } RTCritSectDelete(&pThisR->CritSect); } RTMemFree(pThisW); } else rc = VERR_NO_MEMORY; RTMemFree(pThisR); } else rc = VERR_NO_MEMORY; CloseHandle(hPipeR); CloseHandle(hPipeW); return rc; } /** * Common worker for handling I/O completion. * * This is used by RTPipeClose, RTPipeWrite and RTPipeWriteBlocking. * * @returns IPRT status code. * @param pThis The pipe instance handle. */ static int rtPipeWriteCheckCompletion(RTPIPEINTERNAL *pThis) { int rc; DWORD dwRc = WaitForSingleObject(pThis->Overlapped.hEvent, 0); if (dwRc == WAIT_OBJECT_0) { DWORD cbWritten = 0; if (GetOverlappedResult(pThis->hPipe, &pThis->Overlapped, &cbWritten, TRUE)) { for (;;) { if (cbWritten >= pThis->cbBounceBufUsed) { pThis->fIOPending = false; rc = VINF_SUCCESS; break; } /* resubmit the remainder of the buffer - can this actually happen? 
*/ memmove(&pThis->pbBounceBuf[0], &pThis->pbBounceBuf[cbWritten], pThis->cbBounceBufUsed - cbWritten); rc = ResetEvent(pThis->Overlapped.hEvent); Assert(rc == TRUE); if (!WriteFile(pThis->hPipe, pThis->pbBounceBuf, (DWORD)pThis->cbBounceBufUsed, &cbWritten, &pThis->Overlapped)) { if (GetLastError() == ERROR_IO_PENDING) rc = VINF_TRY_AGAIN; else { pThis->fIOPending = false; if (GetLastError() == ERROR_NO_DATA) rc = VERR_BROKEN_PIPE; else rc = RTErrConvertFromWin32(GetLastError()); if (rc == VERR_BROKEN_PIPE) pThis->fBrokenPipe = true; } break; } Assert(cbWritten > 0); } } else { pThis->fIOPending = false; rc = RTErrConvertFromWin32(GetLastError()); } } else if (dwRc == WAIT_TIMEOUT) rc = VINF_TRY_AGAIN; else { pThis->fIOPending = false; if (dwRc == WAIT_ABANDONED) rc = VERR_INVALID_HANDLE; else rc = RTErrConvertFromWin32(GetLastError()); } return rc; } RTDECL(int) RTPipeClose(RTPIPE hPipe) { RTPIPEINTERNAL *pThis = hPipe; if (pThis == NIL_RTPIPE) return VINF_SUCCESS; AssertPtrReturn(pThis, VERR_INVALID_PARAMETER); AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE); /* * Do the cleanup. 
*/ AssertReturn(ASMAtomicCmpXchgU32(&pThis->u32Magic, ~RTPIPE_MAGIC, RTPIPE_MAGIC), VERR_INVALID_HANDLE); RTCritSectEnter(&pThis->CritSect); Assert(pThis->cUsers == 0); if (!pThis->fRead && pThis->fIOPending) rtPipeWriteCheckCompletion(pThis); CloseHandle(pThis->hPipe); pThis->hPipe = INVALID_HANDLE_VALUE; CloseHandle(pThis->Overlapped.hEvent); pThis->Overlapped.hEvent = NULL; RTMemFree(pThis->pbBounceBuf); pThis->pbBounceBuf = NULL; RTCritSectLeave(&pThis->CritSect); RTCritSectDelete(&pThis->CritSect); RTMemFree(pThis); return VINF_SUCCESS; } RTDECL(int) RTPipeFromNative(PRTPIPE phPipe, RTHCINTPTR hNativePipe, uint32_t fFlags) { AssertPtrReturn(phPipe, VERR_INVALID_POINTER); AssertReturn(!(fFlags & ~RTPIPE_N_VALID_MASK), VERR_INVALID_PARAMETER); AssertReturn(!!(fFlags & RTPIPE_N_READ) != !!(fFlags & RTPIPE_N_WRITE), VERR_INVALID_PARAMETER); /* * Get and validate the pipe handle info. */ HANDLE hNative = (HANDLE)hNativePipe; AssertReturn(GetFileType(hNative) == FILE_TYPE_PIPE, VERR_INVALID_HANDLE); DWORD cMaxInstances; DWORD fInfo; if (!GetNamedPipeInfo(hNative, &fInfo, NULL, NULL, &cMaxInstances)) return RTErrConvertFromWin32(GetLastError()); AssertReturn(!(fInfo & PIPE_TYPE_MESSAGE), VERR_INVALID_HANDLE); AssertReturn(cMaxInstances == 1, VERR_INVALID_HANDLE); DWORD cInstances; DWORD fState; if (!GetNamedPipeHandleState(hNative, &fState, &cInstances, NULL, NULL, NULL, 0)) return RTErrConvertFromWin32(GetLastError()); AssertReturn(!(fState & PIPE_NOWAIT), VERR_INVALID_HANDLE); AssertReturn(!(fState & PIPE_READMODE_MESSAGE), VERR_INVALID_HANDLE); AssertReturn(cInstances <= 1, VERR_INVALID_HANDLE); /* * Looks kind of OK, create a handle so we can try rtPipeQueryInfo on it * and see if we need to duplicate it to make that call work. 
*/ RTPIPEINTERNAL *pThis = (RTPIPEINTERNAL *)RTMemAllocZ(sizeof(RTPIPEINTERNAL)); if (!pThis) return VERR_NO_MEMORY; int rc = RTCritSectInit(&pThis->CritSect); if (RT_SUCCESS(rc)) { pThis->Overlapped.hEvent = CreateEvent(NULL, TRUE /*fManualReset*/, TRUE /*fInitialState*/, NULL /*pName*/); if (pThis->Overlapped.hEvent != NULL) { pThis->u32Magic = RTPIPE_MAGIC; pThis->hPipe = hNative; pThis->fRead = !!(fFlags & RTPIPE_N_READ); //pThis->fIOPending = false; //pThis->fZeroByteRead = false; //pThis->fBrokenPipe = false; //pThisR->fPromisedWritable= false; //pThis->cUsers = 0; //pThis->pbBounceBuf = NULL; //pThis->cbBounceBufUsed = 0; //pThis->cbBounceBufAlloc= 0; pThis->hPollSet = NIL_RTPOLLSET; HANDLE hNative2 = INVALID_HANDLE_VALUE; FILE_PIPE_LOCAL_INFORMATION Info; if (rtPipeQueryInfo(pThis, &Info)) rc = VINF_SUCCESS; else { if (DuplicateHandle(GetCurrentProcess() /*hSrcProcess*/, hNative /*hSrcHandle*/, GetCurrentProcess() /*hDstProcess*/, &hNative2 /*phDstHandle*/, pThis->fRead ? GENERIC_READ : GENERIC_WRITE | FILE_READ_ATTRIBUTES /*dwDesiredAccess*/, !!(fFlags & RTPIPE_N_INHERIT) /*fInheritHandle*/, 0 /*dwOptions*/)) { pThis->hPipe = hNative2; if (rtPipeQueryInfo(pThis, &Info)) rc = VINF_SUCCESS; else { rc = VERR_ACCESS_DENIED; CloseHandle(hNative2); } } else hNative2 = INVALID_HANDLE_VALUE; } if (RT_SUCCESS(rc)) { /* * Verify the pipe state and correct the inheritability. */ AssertStmt( Info.NamedPipeState == FILE_PIPE_CONNECTED_STATE || Info.NamedPipeState == FILE_PIPE_CLOSING_STATE || Info.NamedPipeState == FILE_PIPE_DISCONNECTED_STATE, VERR_INVALID_HANDLE); AssertStmt( Info.NamedPipeConfiguration == ( Info.NamedPipeEnd == FILE_PIPE_SERVER_END ? (pThis->fRead ? FILE_PIPE_INBOUND : FILE_PIPE_OUTBOUND) : (pThis->fRead ? FILE_PIPE_OUTBOUND : FILE_PIPE_INBOUND) ), VERR_INVALID_HANDLE); if ( RT_SUCCESS(rc) && hNative2 == INVALID_HANDLE_VALUE && !SetHandleInformation(hNative, HANDLE_FLAG_INHERIT /*dwMask*/, fFlags & RTPIPE_N_INHERIT ? 
HANDLE_FLAG_INHERIT : 0)) { rc = RTErrConvertFromWin32(GetLastError()); AssertMsgFailed(("%Rrc\n", rc)); } if (RT_SUCCESS(rc)) { /* * Ok, we're good! */ if (hNative2 != INVALID_HANDLE_VALUE) CloseHandle(hNative); *phPipe = pThis; return VINF_SUCCESS; } } /* Bail out. */ if (hNative2 != INVALID_HANDLE_VALUE) CloseHandle(hNative2); CloseHandle(pThis->Overlapped.hEvent); } RTCritSectDelete(&pThis->CritSect); } RTMemFree(pThis); return rc; } RTDECL(RTHCINTPTR) RTPipeToNative(RTPIPE hPipe) { RTPIPEINTERNAL *pThis = hPipe; AssertPtrReturn(pThis, -1); AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, -1); return (RTHCINTPTR)pThis->hPipe; } RTDECL(int) RTPipeRead(RTPIPE hPipe, void *pvBuf, size_t cbToRead, size_t *pcbRead) { RTPIPEINTERNAL *pThis = hPipe; AssertPtrReturn(pThis, VERR_INVALID_HANDLE); AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE); AssertReturn(pThis->fRead, VERR_ACCESS_DENIED); AssertPtr(pcbRead); AssertPtr(pvBuf); int rc = RTCritSectEnter(&pThis->CritSect); if (RT_SUCCESS(rc)) { /* No concurrent readers, sorry. */ if (pThis->cUsers == 0) { pThis->cUsers++; /* * Kick of a an overlapped read. It should return immedately if * there is bytes in the buffer. If not, we'll cancel it and see * what we get back. */ rc = ResetEvent(pThis->Overlapped.hEvent); Assert(rc == TRUE); DWORD cbRead = 0; if ( cbToRead == 0 || ReadFile(pThis->hPipe, pvBuf, cbToRead <= ~(DWORD)0 ? 
(DWORD)cbToRead : ~(DWORD)0, &cbRead, &pThis->Overlapped)) { *pcbRead = cbRead; rc = VINF_SUCCESS; } else if (GetLastError() == ERROR_IO_PENDING) { pThis->fIOPending = true; RTCritSectLeave(&pThis->CritSect); if (!CancelIo(pThis->hPipe)) WaitForSingleObject(pThis->Overlapped.hEvent, INFINITE); if (GetOverlappedResult(pThis->hPipe, &pThis->Overlapped, &cbRead, TRUE /*fWait*/)) { *pcbRead = cbRead; rc = VINF_SUCCESS; } else if (GetLastError() == ERROR_OPERATION_ABORTED) { *pcbRead = 0; rc = VINF_TRY_AGAIN; } else rc = RTErrConvertFromWin32(GetLastError()); RTCritSectEnter(&pThis->CritSect); pThis->fIOPending = false; } else rc = RTErrConvertFromWin32(GetLastError()); if (rc == VERR_BROKEN_PIPE) pThis->fBrokenPipe = true; pThis->cUsers--; } else rc = VERR_WRONG_ORDER; RTCritSectLeave(&pThis->CritSect); } return rc; } RTDECL(int) RTPipeReadBlocking(RTPIPE hPipe, void *pvBuf, size_t cbToRead, size_t *pcbRead) { RTPIPEINTERNAL *pThis = hPipe; AssertPtrReturn(pThis, VERR_INVALID_HANDLE); AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE); AssertReturn(pThis->fRead, VERR_ACCESS_DENIED); AssertPtr(pvBuf); int rc = RTCritSectEnter(&pThis->CritSect); if (RT_SUCCESS(rc)) { /* No concurrent readers, sorry. */ if (pThis->cUsers == 0) { pThis->cUsers++; size_t cbTotalRead = 0; while (cbToRead > 0) { /* * Kick of a an overlapped read. It should return immedately if * there is bytes in the buffer. If not, we'll cancel it and see * what we get back. */ rc = ResetEvent(pThis->Overlapped.hEvent); Assert(rc == TRUE); DWORD cbRead = 0; pThis->fIOPending = true; RTCritSectLeave(&pThis->CritSect); if (ReadFile(pThis->hPipe, pvBuf, cbToRead <= ~(DWORD)0 ? 
(DWORD)cbToRead : ~(DWORD)0, &cbRead, &pThis->Overlapped)) rc = VINF_SUCCESS; else if (GetLastError() == ERROR_IO_PENDING) { WaitForSingleObject(pThis->Overlapped.hEvent, INFINITE); if (GetOverlappedResult(pThis->hPipe, &pThis->Overlapped, &cbRead, TRUE /*fWait*/)) rc = VINF_SUCCESS; else rc = RTErrConvertFromWin32(GetLastError()); } else rc = RTErrConvertFromWin32(GetLastError()); RTCritSectEnter(&pThis->CritSect); pThis->fIOPending = false; if (RT_FAILURE(rc)) break; /* advance */ cbToRead -= cbRead; cbTotalRead += cbRead; pvBuf = (uint8_t *)pvBuf + cbRead; } if (rc == VERR_BROKEN_PIPE) pThis->fBrokenPipe = true; if (pcbRead) { *pcbRead = cbTotalRead; if ( RT_FAILURE(rc) && cbTotalRead && rc != VERR_INVALID_POINTER) rc = VINF_SUCCESS; } pThis->cUsers--; } else rc = VERR_WRONG_ORDER; RTCritSectLeave(&pThis->CritSect); } return rc; } RTDECL(int) RTPipeWrite(RTPIPE hPipe, const void *pvBuf, size_t cbToWrite, size_t *pcbWritten) { RTPIPEINTERNAL *pThis = hPipe; AssertPtrReturn(pThis, VERR_INVALID_HANDLE); AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE); AssertReturn(!pThis->fRead, VERR_ACCESS_DENIED); AssertPtr(pcbWritten); AssertPtr(pvBuf); int rc = RTCritSectEnter(&pThis->CritSect); if (RT_SUCCESS(rc)) { /* No concurrent readers, sorry. */ if (pThis->cUsers == 0) { pThis->cUsers++; /* If I/O is pending, check if it has completed. */ if (pThis->fIOPending) rc = rtPipeWriteCheckCompletion(pThis); else rc = VINF_SUCCESS; if (rc == VINF_SUCCESS) { Assert(!pThis->fIOPending); /* Adjust the number of bytes to write to fit into the current buffer quota, unless we've promissed stuff in RTPipeSelectOne. WriteQuotaAvailable better not be zero when it shouldn't!! 
*/ FILE_PIPE_LOCAL_INFORMATION Info; if ( !pThis->fPromisedWritable && cbToWrite > 0 && rtPipeQueryInfo(pThis, &Info)) { if (Info.NamedPipeState == FILE_PIPE_CLOSING_STATE) rc = VERR_BROKEN_PIPE; else if ( cbToWrite >= Info.WriteQuotaAvailable && Info.OutboundQuota != 0) { cbToWrite = Info.WriteQuotaAvailable; if (!cbToWrite) rc = VINF_TRY_AGAIN; } } pThis->fPromisedWritable = false; /* Do the bounce buffering. */ if ( pThis->cbBounceBufAlloc < cbToWrite && pThis->cbBounceBufAlloc < RTPIPE_NT_SIZE) { if (cbToWrite > RTPIPE_NT_SIZE) cbToWrite = RTPIPE_NT_SIZE; void *pv = RTMemRealloc(pThis->pbBounceBuf, RT_ALIGN_Z(cbToWrite, _1K)); if (pv) { pThis->pbBounceBuf = (uint8_t *)pv; pThis->cbBounceBufAlloc = RT_ALIGN_Z(cbToWrite, _1K); } else rc = VERR_NO_MEMORY; } else if (cbToWrite > RTPIPE_NT_SIZE) cbToWrite = RTPIPE_NT_SIZE; if (RT_SUCCESS(rc) && cbToWrite) { memcpy(pThis->pbBounceBuf, pvBuf, cbToWrite); pThis->cbBounceBufUsed = (uint32_t)cbToWrite; /* Submit the write. */ rc = ResetEvent(pThis->Overlapped.hEvent); Assert(rc == TRUE); DWORD cbWritten = 0; if (WriteFile(pThis->hPipe, pThis->pbBounceBuf, (DWORD)pThis->cbBounceBufUsed, &cbWritten, &pThis->Overlapped)) { *pcbWritten = cbWritten; rc = VINF_SUCCESS; } else if (GetLastError() == ERROR_IO_PENDING) { *pcbWritten = cbToWrite; pThis->fIOPending = true; rc = VINF_SUCCESS; } else if (GetLastError() == ERROR_NO_DATA) rc = VERR_BROKEN_PIPE; else rc = RTErrConvertFromWin32(GetLastError()); } else if (RT_SUCCESS(rc)) *pcbWritten = 0; } else if (RT_SUCCESS(rc)) *pcbWritten = 0; if (rc == VERR_BROKEN_PIPE) pThis->fBrokenPipe = true; pThis->cUsers--; } else rc = VERR_WRONG_ORDER; RTCritSectLeave(&pThis->CritSect); } return rc; } RTDECL(int) RTPipeWriteBlocking(RTPIPE hPipe, const void *pvBuf, size_t cbToWrite, size_t *pcbWritten) { RTPIPEINTERNAL *pThis = hPipe; AssertPtrReturn(pThis, VERR_INVALID_HANDLE); AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE); AssertReturn(!pThis->fRead, VERR_ACCESS_DENIED); 
AssertPtr(pvBuf); AssertPtrNull(pcbWritten); int rc = RTCritSectEnter(&pThis->CritSect); if (RT_SUCCESS(rc)) { /* No concurrent readers, sorry. */ if (pThis->cUsers == 0) { pThis->cUsers++; /* * If I/O is pending, wait for it to complete. */ if (pThis->fIOPending) { rc = rtPipeWriteCheckCompletion(pThis); while (rc == VINF_TRY_AGAIN) { Assert(pThis->fIOPending); HANDLE hEvent = pThis->Overlapped.hEvent; RTCritSectLeave(&pThis->CritSect); WaitForSingleObject(pThis->Overlapped.hEvent, INFINITE); RTCritSectEnter(&pThis->CritSect); } } if (RT_SUCCESS(rc)) { Assert(!pThis->fIOPending); pThis->fPromisedWritable = false; /* * Try write everything. * No bounce buffering, cUsers protects us. */ size_t cbTotalWritten = 0; while (cbToWrite > 0) { rc = ResetEvent(pThis->Overlapped.hEvent); Assert(rc == TRUE); pThis->fIOPending = true; RTCritSectLeave(&pThis->CritSect); DWORD cbWritten = 0; if (WriteFile(pThis->hPipe, pvBuf, cbToWrite <= ~(DWORD)0 ? (DWORD)cbToWrite : ~(DWORD)0, &cbWritten, &pThis->Overlapped)) rc = VINF_SUCCESS; else if (GetLastError() == ERROR_IO_PENDING) { WaitForSingleObject(pThis->Overlapped.hEvent, INFINITE); if (GetOverlappedResult(pThis->hPipe, &pThis->Overlapped, &cbWritten, TRUE /*fWait*/)) rc = VINF_SUCCESS; else rc = RTErrConvertFromWin32(GetLastError()); } else if (GetLastError() == ERROR_NO_DATA) rc = VERR_BROKEN_PIPE; else rc = RTErrConvertFromWin32(GetLastError()); RTCritSectEnter(&pThis->CritSect); pThis->fIOPending = false; if (RT_FAILURE(rc)) break; /* advance */ pvBuf = (char const *)pvBuf + cbWritten; cbTotalWritten += cbWritten; cbToWrite -= cbWritten; } if (pcbWritten) { *pcbWritten = cbTotalWritten; if ( RT_FAILURE(rc) && cbTotalWritten && rc != VERR_INVALID_POINTER) rc = VINF_SUCCESS; } } if (rc == VERR_BROKEN_PIPE) pThis->fBrokenPipe = true; pThis->cUsers--; } else rc = VERR_WRONG_ORDER; RTCritSectLeave(&pThis->CritSect); } return rc; #if 1 return VERR_NOT_IMPLEMENTED; #else int rc = rtPipeTryBlocking(pThis); if (RT_SUCCESS(rc)) { 
size_t cbTotalWritten = 0; while (cbToWrite > 0) { ssize_t cbWritten = write(pThis->fd, pvBuf, RT_MIN(cbToWrite, SSIZE_MAX)); if (cbWritten < 0) { rc = RTErrConvertFromErrno(errno); break; } /* advance */ pvBuf = (char const *)pvBuf + cbWritten; cbTotalWritten += cbWritten; cbToWrite -= cbWritten; } if (pcbWritten) { *pcbWritten = cbTotalWritten; if ( RT_FAILURE(rc) && cbTotalWritten && rc != VERR_INVALID_POINTER) rc = VINF_SUCCESS; } ASMAtomicDecU32(&pThis->u32State); } return rc; #endif } RTDECL(int) RTPipeFlush(RTPIPE hPipe) { RTPIPEINTERNAL *pThis = hPipe; AssertPtrReturn(pThis, VERR_INVALID_HANDLE); AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE); AssertReturn(!pThis->fRead, VERR_ACCESS_DENIED); if (!FlushFileBuffers(pThis->hPipe)) { int rc = RTErrConvertFromWin32(GetLastError()); if (rc == VERR_BROKEN_PIPE) pThis->fBrokenPipe = true; return rc; } return VINF_SUCCESS; } RTDECL(int) RTPipeSelectOne(RTPIPE hPipe, RTMSINTERVAL cMillies) { RTPIPEINTERNAL *pThis = hPipe; AssertPtrReturn(pThis, VERR_INVALID_HANDLE); AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE); uint64_t const StartMsTS = RTTimeMilliTS(); int rc = RTCritSectEnter(&pThis->CritSect); if (RT_FAILURE(rc)) return rc; for (unsigned iLoop = 0;; iLoop++) { HANDLE hWait = INVALID_HANDLE_VALUE; if (pThis->fRead) { if (pThis->fIOPending) hWait = pThis->Overlapped.hEvent; else { /* Peek at the pipe buffer and see how many bytes it contains. */ DWORD cbAvailable; if ( PeekNamedPipe(pThis->hPipe, NULL, 0, NULL, &cbAvailable, NULL) && cbAvailable > 0) { rc = VINF_SUCCESS; break; } /* Start a zero byte read operation that we can wait on. 
*/ if (cMillies == 0) { rc = VERR_TIMEOUT; break; } AssertBreakStmt(pThis->cUsers == 0, rc = VERR_INTERNAL_ERROR_5); rc = ResetEvent(pThis->Overlapped.hEvent); Assert(rc == TRUE); DWORD cbRead = 0; if (ReadFile(pThis->hPipe, pThis->abBuf, 0, &cbRead, &pThis->Overlapped)) { rc = VINF_SUCCESS; if (iLoop > 10) RTThreadYield(); } else if (GetLastError() == ERROR_IO_PENDING) { pThis->cUsers++; pThis->fIOPending = true; pThis->fZeroByteRead = true; hWait = pThis->Overlapped.hEvent; } else rc = RTErrConvertFromWin32(GetLastError()); } } else { if (pThis->fIOPending) { rc = rtPipeWriteCheckCompletion(pThis); if (RT_FAILURE(rc)) break; } if (pThis->fIOPending) hWait = pThis->Overlapped.hEvent; else { FILE_PIPE_LOCAL_INFORMATION Info; if (rtPipeQueryInfo(pThis, &Info)) { /* Check for broken pipe. */ if (Info.NamedPipeState == FILE_PIPE_CLOSING_STATE) { rc = VERR_BROKEN_PIPE; break; } /* Check for available write buffer space. */ else if (Info.WriteQuotaAvailable > 0) { pThis->fPromisedWritable = false; rc = VINF_SUCCESS; break; } /* delayed buffer alloc or timeout: phony promise later: See if we still can associate a semaphore with the pipe, like on OS/2. */ else if ( Info.OutboundQuota == 0 || cMillies) { pThis->fPromisedWritable = true; rc = VINF_SUCCESS; break; } } else { pThis->fPromisedWritable = true; rc = VINF_SUCCESS; break; } } } if (RT_FAILURE(rc)) break; /* * Check for timeout. */ DWORD cMsMaxWait = INFINITE; if ( cMillies != RT_INDEFINITE_WAIT && ( hWait != INVALID_HANDLE_VALUE || iLoop > 10) ) { uint64_t cElapsed = RTTimeMilliTS() - StartMsTS; if (cElapsed >= cMillies) { rc = VERR_TIMEOUT; break; } cMsMaxWait = cMillies - (uint32_t)cElapsed; } /* * Wait. 
*/ if (hWait != INVALID_HANDLE_VALUE) { RTCritSectLeave(&pThis->CritSect); DWORD dwRc = WaitForSingleObject(hWait, cMsMaxWait); if (dwRc == WAIT_OBJECT_0) rc = VINF_SUCCESS; else if (dwRc == WAIT_TIMEOUT) rc = VERR_TIMEOUT; else if (dwRc == WAIT_ABANDONED) rc = VERR_INVALID_HANDLE; else rc = RTErrConvertFromWin32(GetLastError()); if ( RT_FAILURE(rc) && pThis->u32Magic != RTPIPE_MAGIC) return rc; RTCritSectEnter(&pThis->CritSect); if (pThis->fZeroByteRead) { pThis->cUsers--; pThis->fIOPending = false; if (rc != VINF_SUCCESS) CancelIo(pThis->hPipe); DWORD cbRead = 0; GetOverlappedResult(pThis->hPipe, &pThis->Overlapped, &cbRead, TRUE /*fWait*/); } if (RT_FAILURE(rc)) break; } } if (rc == VERR_BROKEN_PIPE) pThis->fBrokenPipe = true; RTCritSectLeave(&pThis->CritSect); return rc; } /** * Internal RTPollSetAdd helper that returns the handle that should be added to * the pollset. * * @returns Valid handle on success, INVALID_HANDLE_VALUE on failure. * @param hPipe The pipe handle. * @param fEvents The events we're polling for. * @param ph wher to put the primary handle. */ int rtPipePollGetHandle(RTPIPE hPipe, uint32_t fEvents, PHANDLE ph) { RTPIPEINTERNAL *pThis = hPipe; AssertPtrReturn(pThis, VERR_INVALID_HANDLE); AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE); AssertReturn(!(fEvents & RTPOLL_EVT_READ) || pThis->fRead, VERR_INVALID_PARAMETER); AssertReturn(!(fEvents & RTPOLL_EVT_WRITE) || !pThis->fRead, VERR_INVALID_PARAMETER); /* Later: Try register an event handle with the pipe like on OS/2, there is a file control for doing this obviously intended for the OS/2 subsys. The question is whether this still exists on Vista and W7. */ *ph = pThis->Overlapped.hEvent; return VINF_SUCCESS; } /** * Checks for pending events. * * @returns Event mask or 0. * @param pThis The pipe handle. * @param fEvents The desired events. 
*/
static uint32_t rtPipePollCheck(RTPIPEINTERNAL *pThis, uint32_t fEvents)
{
    /* A pipe already flagged as broken reports the error event and nothing else. */
    if (pThis->fBrokenPipe)
        return RTPOLL_EVT_ERROR;

    /*
     * Read end: nothing to report while I/O is pending, otherwise peek at
     * the amount of buffered data.
     */
    if (pThis->fRead)
    {
        if (pThis->fIOPending)
            return 0;

        DWORD cbBuffered;
        if (!PeekNamedPipe(pThis->hPipe, NULL, 0, NULL, &cbBuffered, NULL))
        {
            if (GetLastError() == ERROR_BROKEN_PIPE)
                pThis->fBrokenPipe = true;
            return RTPOLL_EVT_ERROR;
        }
        if ((fEvents & RTPOLL_EVT_READ) && cbBuffered > 0)
            return RTPOLL_EVT_READ;
        return 0;
    }

    /*
     * Write end: give a pending write the chance to complete first.
     */
    if (pThis->fIOPending)
    {
        rtPipeWriteCheckCompletion(pThis);
        if (pThis->fBrokenPipe)
            return RTPOLL_EVT_ERROR;
        if (pThis->fIOPending)
            return 0;
    }

    /* If the pipe state cannot be queried, writability is simply assumed. */
    FILE_PIPE_LOCAL_INFORMATION PipeInfo;
    if (!rtPipeQueryInfo(pThis, &PipeInfo))
        return (fEvents & RTPOLL_EVT_WRITE) ? RTPOLL_EVT_WRITE : 0;

    /* Check for broken pipe. */
    if (PipeInfo.NamedPipeState == FILE_PIPE_CLOSING_STATE)
    {
        pThis->fBrokenPipe = true;
        return RTPOLL_EVT_ERROR;
    }

    /* Check if there is available buffer space. */
    bool const fHasRoom = PipeInfo.WriteQuotaAvailable > 0
                       || PipeInfo.OutboundQuota == 0;
    if ((fEvents & RTPOLL_EVT_WRITE) && fHasRoom)
        return RTPOLL_EVT_WRITE;
    return 0;
}


/**
 * Internal RTPoll helper that polls the pipe handle and, if @a fNoWait is
 * clear, starts whatever actions we've got running during the poll call.
 *
 * @returns 0 if no pending events, actions initiated if @a fNoWait is clear.
 *          Event mask (in @a fEvents) and no actions if the handle is ready
 *          already.
 *          UINT32_MAX (asserted) if the pipe handle is busy in I/O or a
 *          different poll set.
 *
 * @param   hPipe               The pipe handle.
 * @param   hPollSet            The poll set handle (for access checks).
 * @param   fEvents             The events we're polling for.
 * @param   fFinalEntry         Set if this is the final entry for this handle
 *                              in this poll set.  This can be used for dealing
 *                              with duplicate entries.
 * @param   fNoWait             Set if it's a zero-wait poll call.  Clear if
 *                              we'll wait for an event to occur.
*/
uint32_t rtPipePollStart(RTPIPE hPipe, RTPOLLSET hPollSet, uint32_t fEvents, bool fFinalEntry, bool fNoWait)
{
    /** @todo All this polling code could be optimized to make fewer system
     *        calls; like for instance the ResetEvent calls. */
    RTPIPEINTERNAL *pThis = hPipe;
    AssertPtrReturn(pThis, UINT32_MAX);
    AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, UINT32_MAX);

    int rc = RTCritSectEnter(&pThis->CritSect);
    AssertRCReturn(rc, UINT32_MAX);

    /* The pipe must either be idle or already be owned by this poll set. */
    uint32_t   fResult;
    bool const fBusyElsewhere = pThis->cUsers != 0
                             && pThis->hPollSet != hPollSet;
    if (fBusyElsewhere)
    {
        AssertFailed();
        fResult = UINT32_MAX;
    }
    else
    {
        /* Check what the current events are. */
        fResult = rtPipePollCheck(pThis, fEvents);
        if (fResult == 0 && !fNoWait)
        {
            if (!pThis->fIOPending)
            {
                /* Make sure the event semaphore has been reset. */
                rc = ResetEvent(pThis->Overlapped.hEvent); Assert(rc == TRUE);

                /* Kick off the zero byte read thing if applicable. */
                if (pThis->fRead && (fEvents & RTPOLL_EVT_READ))
                {
                    DWORD cbDummy = 0;
                    if (ReadFile(pThis->hPipe, pThis->abBuf, 0, &cbDummy, &pThis->Overlapped))
                        fResult = rtPipePollCheck(pThis, fEvents);
                    else if (GetLastError() == ERROR_IO_PENDING)
                    {
                        pThis->fIOPending    = true;
                        pThis->fZeroByteRead = true;
                    }
                    else
                        fResult = RTPOLL_EVT_ERROR;
                }
            }

            /* If we're still set for the waiting, record the poll set and
               mark the pipe used. */
            if (fResult == 0)
            {
                pThis->cUsers++;
                pThis->hPollSet = hPollSet;
            }
        }
    }

    RTCritSectLeave(&pThis->CritSect);
    return fResult;
}


/**
 * Called after a WaitForMultipleObjects returned in order to check for pending
 * events and stop whatever actions that rtPipePollStart() initiated.
 *
 * @returns Event mask or 0.
 *
 * @param   hPipe               The pipe handle.
 * @param   fEvents             The events we're polling for.
 * @param   fFinalEntry         Set if this is the final entry for this handle
 *                              in this poll set.  This can be used for dealing
 *                              with duplicate entries.
Only keep in mind that * this method is called in reverse order, so the * first call will have this set (when the entire * set was processed). */ uint32_t rtPipePollDone(RTPIPE hPipe, uint32_t fEvents, bool fFinalEntry) { RTPIPEINTERNAL *pThis = hPipe; AssertPtrReturn(pThis, 0); AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, 0); int rc = RTCritSectEnter(&pThis->CritSect); AssertRCReturn(rc, 0); Assert(pThis->cUsers > 0); /* Cancel the zero byte read. */ uint32_t fRetEvents = 0; if (pThis->fZeroByteRead) { CancelIo(pThis->hPipe); DWORD cbRead = 0; if ( !GetOverlappedResult(pThis->hPipe, &pThis->Overlapped, &cbRead, TRUE /*fWait*/) && GetLastError() != ERROR_OPERATION_ABORTED) fRetEvents = RTPOLL_EVT_ERROR; pThis->fIOPending = false; pThis->fZeroByteRead = false; } /* harvest events. */ fRetEvents |= rtPipePollCheck(pThis, fEvents); /* update counters. */ pThis->cUsers--; if (!pThis->cUsers) pThis->hPollSet = NIL_RTPOLLSET; RTCritSectLeave(&pThis->CritSect); return fRetEvents; }