Changeset 44462 in vbox
- Timestamp:
- Jan 30, 2013 12:44:14 PM (12 years ago)
- svn:sync-xref-src-repo-rev:
- 83468
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/src/VBox/Runtime/r3/os2/pipe-os2.cpp
r39690 r44462 5 5 6 6 /* 7 * Copyright (C) 2010 Oracle Corporation7 * Copyright (C) 2010-2013 Oracle Corporation 8 8 * 9 9 * This file is part of VirtualBox Open Source Edition (OSE), as … … 29 29 * Header Files * 30 30 *******************************************************************************/ 31 #define INCL_ERRORS 32 #define INCL_DOSSEMAPHORES 33 #include <os2.h> 34 31 35 #include <iprt/pipe.h> 32 36 #include "internal/iprt.h" 33 37 38 #include <iprt/asm.h> 34 39 #include <iprt/assert.h> 40 #include <iprt/critsect.h> 35 41 #include <iprt/err.h> 42 #include <iprt/mem.h> 43 #include <iprt/string.h> 44 #include <iprt/poll.h> 45 #include <iprt/process.h> 46 #include <iprt/thread.h> 47 #include <iprt/time.h> 48 #include "internal/pipe.h" 49 #include "internal/magics.h" 50 51 52 /******************************************************************************* 53 * Defined Constants And Macros * 54 *******************************************************************************/ 55 /** The pipe buffer size we prefer. */ 56 #define RTPIPE_OS2_SIZE _32K 57 58 59 /******************************************************************************* 60 * Structures and Typedefs * 61 *******************************************************************************/ 62 typedef struct RTPIPEINTERNAL 63 { 64 /** Magic value (RTPIPE_MAGIC). */ 65 uint32_t u32Magic; 66 /** The pipe handle. */ 67 HPIPE hPipe; 68 /** Set if this is the read end, clear if it's the write end. */ 69 bool fRead; 70 /** Whether the pipe is in blocking or non-blocking mode. */ 71 bool fBlocking; 72 /** Set if the pipe is broken. */ 73 bool fBrokenPipe; 74 /** Usage counter. */ 75 uint32_t cUsers; 76 77 /** The event semaphore associated with the pipe. */ 78 HEV hev; 79 /** The handle of the poll set currently polling on this pipe. 80 * We can only have one poller at the time (lazy bird). */ 81 RTPOLLSET hPollSet; 82 /** Critical section protecting the above members. 83 * (Taking the lazy/simple approach.) */ 84 RTCRITSECT CritSect; 85 86 } RTPIPEINTERNAL; 87 88 89 /** 90 * Ensures that the pipe has a semaphore associated with it. 91 * 92 * @returns VBox status code. 93 * @param pThis The pipe. 94 */ 95 static int rtPipeOs2EnsureSem(RTPIPEINTERNAL *pThis) 96 { 97 if (pThis->hev != NULLHANDLE) 98 return VINF_SUCCESS; 99 100 HEV hev; 101 APIRET orc = DosCreateEventSem(NULL, &hev, DC_SEM_SHARED, FALSE); 102 if (orc == NO_ERROR) 103 { 104 orc = DosSetNPipeSem(pThis->hPipe, (HSEM)hev, 1); 105 if (orc == NO_ERROR) 106 { 107 pThis->hev = hev; 108 return VINF_SUCCESS; 109 } 110 111 DosCloseEventSem(hev); 112 } 113 return RTErrConvertFromOS2(orc); 114 } 36 115 37 116 38 117 RTDECL(int) RTPipeCreate(PRTPIPE phPipeRead, PRTPIPE phPipeWrite, uint32_t fFlags) 39 118 { 40 return VERR_NOT_IMPLEMENTED; 119 AssertPtrReturn(phPipeRead, VERR_INVALID_POINTER); 120 AssertPtrReturn(phPipeWrite, VERR_INVALID_POINTER); 121 AssertReturn(!(fFlags & ~RTPIPE_C_VALID_MASK), VERR_INVALID_PARAMETER); 122 123 /* 124 * Try create and connect a pipe pair. 125 */ 126 APIRET orc; 127 HPIPE hPipeR; 128 HFILE hPipeW; 129 int rc; 130 for (;;) 131 { 132 static volatile uint32_t g_iNextPipe = 0; 133 char szName[128]; 134 RTStrPrintf(szName, sizeof(szName), "\\pipe\\iprt-pipe-%u-%u", RTProcSelf(), ASMAtomicIncU32(&g_iNextPipe)); 135 136 /* 137 * Create the read end of the pipe. 138 */ 139 ULONG fPipeMode = 1 /*instance*/ | NP_TYPE_BYTE | NP_READMODE_BYTE | NP_NOWAIT; 140 ULONG fOpenMode = NP_ACCESS_DUPLEX | NP_WRITEBEHIND; 141 if (fFlags & RTPIPE_C_INHERIT_READ) 142 fOpenMode |= NP_INHERIT; 143 else 144 fOpenMode |= NP_NOINHERIT; 145 orc = DosCreateNPipe((PSZ)szName, &hPipeR, fOpenMode, fPipeMode, RTPIPE_OS2_SIZE, RTPIPE_OS2_SIZE, NP_DEFAULT_WAIT); 146 if (orc == NO_ERROR) 147 { 148 orc = DosConnectNPipe(hPipeR); 149 if (orc == ERROR_PIPE_NOT_CONNECTED || orc == NO_ERROR) 150 { 151 /* 152 * Connect to the pipe (the write end), attach sem below. 153 */ 154 ULONG ulAction = 0; 155 ULONG fOpenW = OPEN_ACTION_FAIL_IF_NEW | OPEN_ACTION_OPEN_IF_EXISTS; 156 ULONG fModeW = OPEN_ACCESS_WRITEONLY | OPEN_SHARE_DENYNONE | OPEN_FLAGS_FAIL_ON_ERROR; 157 if (!(fFlags & RTPIPE_C_INHERIT_WRITE)) 158 fModeW |= OPEN_FLAGS_NOINHERIT; 159 orc = DosOpen((PSZ)szName, &hPipeW, &ulAction, 0 /*cbFile*/, FILE_NORMAL, 160 fOpenW, fModeW, NULL /*peaop2*/); 161 if (orc == NO_ERROR) 162 break; 163 } 164 DosClose(hPipeR); 165 } 166 if ( orc != ERROR_PIPE_BUSY /* already exist - compatible */ 167 && orc != ERROR_ACCESS_DENIED /* already exist - incompatible (?) */) 168 return RTErrConvertFromOS2(orc); 169 /* else: try again with a new name */ 170 } 171 172 /* 173 * Create the two handles. 174 */ 175 RTPIPEINTERNAL *pThisR = (RTPIPEINTERNAL *)RTMemAllocZ(sizeof(RTPIPEINTERNAL)); 176 if (pThisR) 177 { 178 RTPIPEINTERNAL *pThisW = (RTPIPEINTERNAL *)RTMemAllocZ(sizeof(RTPIPEINTERNAL)); 179 if (pThisW) 180 { 181 /* Crit sects. */ 182 rc = RTCritSectInit(&pThisR->CritSect); 183 if (RT_SUCCESS(rc)) 184 { 185 rc = RTCritSectInit(&pThisW->CritSect); 186 if (RT_SUCCESS(rc)) 187 { 188 /* Initialize the structures. */ 189 pThisR->u32Magic = RTPIPE_MAGIC; 190 pThisW->u32Magic = RTPIPE_MAGIC; 191 pThisR->hPipe = hPipeR; 192 pThisW->hPipe = hPipeW; 193 pThisR->hev = NULLHANDLE; 194 pThisW->hev = NULLHANDLE; 195 pThisR->fRead = true; 196 pThisW->fRead = false; 197 pThisR->fBlocking = false; 198 pThisW->fBlocking = true; 199 //pThisR->fBrokenPipe = false; 200 //pThisW->fBrokenPipe = false; 201 //pThisR->cUsers = 0; 202 //pThisW->cUsers = 0; 203 pThisR->hPollSet = NIL_RTPOLLSET; 204 pThisW->hPollSet = NIL_RTPOLLSET; 205 206 *phPipeRead = pThisR; 207 *phPipeWrite = pThisW; 208 return VINF_SUCCESS; 209 } 210 211 RTCritSectDelete(&pThisR->CritSect); 212 } 213 RTMemFree(pThisW); 214 } 215 else 216 rc = VERR_NO_MEMORY; 217 RTMemFree(pThisR); 218 } 219 else 220 rc = VERR_NO_MEMORY; 221 222 /* Don't call DosDisConnectNPipe! */ 223 DosClose(hPipeW); 224 DosClose(hPipeR); 225 return rc; 41 226 } 42 227 … … 44 229 RTDECL(int) RTPipeClose(RTPIPE hPipe) 45 230 { 46 return VERR_NOT_IMPLEMENTED; 231 RTPIPEINTERNAL *pThis = hPipe; 232 if (pThis == NIL_RTPIPE) 233 return VINF_SUCCESS; 234 AssertPtrReturn(pThis, VERR_INVALID_PARAMETER); 235 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE); 236 237 /* 238 * Do the cleanup. 239 */ 240 AssertReturn(ASMAtomicCmpXchgU32(&pThis->u32Magic, ~RTPIPE_MAGIC, RTPIPE_MAGIC), VERR_INVALID_HANDLE); 241 RTCritSectEnter(&pThis->CritSect); 242 Assert(pThis->cUsers == 0); 243 244 /* Don't call DosDisConnectNPipe! */ 245 DosClose(pThis->hPipe); 246 pThis->hPipe = (HPIPE)-1; 247 248 if (pThis->hev != NULLHANDLE) 249 { 250 DosCloseEventSem(pThis->hev); 251 pThis->hev = NULLHANDLE; 252 } 253 254 RTCritSectLeave(&pThis->CritSect); 255 RTCritSectDelete(&pThis->CritSect); 256 257 RTMemFree(pThis); 258 259 return VINF_SUCCESS; 47 260 } 48 261 … … 50 263 RTDECL(int) RTPipeFromNative(PRTPIPE phPipe, RTHCINTPTR hNativePipe, uint32_t fFlags) 51 264 { 52 return VERR_NOT_IMPLEMENTED; 53 } 54 265 AssertPtrReturn(phPipe, VERR_INVALID_POINTER); 266 AssertReturn(!(fFlags & ~RTPIPE_N_VALID_MASK), VERR_INVALID_PARAMETER); 267 AssertReturn(!!(fFlags & RTPIPE_N_READ) != !!(fFlags & RTPIPE_N_WRITE), VERR_INVALID_PARAMETER); 268 269 /* 270 * Get and validate the pipe handle info. 271 */ 272 HPIPE hNative = (HPIPE)hNativePipe; 273 ULONG ulType = 0; 274 ULONG ulAttr = 0; 275 APIRET orc = DosQueryHType(hNative, &ulType, &ulAttr); 276 AssertMsgReturn(orc == NO_ERROR, ("%d\n", orc), RTErrConvertFromOS2(orc)); 277 AssertReturn((ulType & 0x7) == HANDTYPE_PIPE, VERR_INVALID_HANDLE); 278 279 #if 0 280 union 281 { 282 PIPEINFO PipeInfo; 283 uint8_t abPadding[sizeof(PIPEINFO) + 127]; 284 } Buf; 285 orc = DosQueryNPipeInfo(hNative, 1, &Buf, sizeof(Buf)); 286 if (orc != NO_ERROR) 287 { 288 /* Sorry, anonymous pips are not supported. */ 289 AssertMsgFailed(("%d\n", orc)); 290 return VERR_INVALID_HANDLE; 291 } 292 AssertReturn(Buf.PipeInfo.cbMaxInst == 1, VERR_INVALID_HANDLE); 293 #endif 294 295 ULONG fPipeState = 0; 296 orc = DosQueryNPHState(hNative, &fPipeState); 297 if (orc != NO_ERROR) 298 { 299 /* Sorry, anonymous pips are not supported. */ 300 AssertMsgFailed(("%d\n", orc)); 301 return VERR_INVALID_HANDLE; 302 } 303 AssertReturn(!(fPipeState & NP_TYPE_MESSAGE), VERR_INVALID_HANDLE); 304 AssertReturn(!(fPipeState & NP_READMODE_MESSAGE), VERR_INVALID_HANDLE); 305 AssertReturn((fPipeState & 0xff) == 1, VERR_INVALID_HANDLE); 306 307 ULONG fFileState = 0; 308 orc = DosQueryFHState(hNative, &fFileState); 309 AssertMsgReturn(orc == NO_ERROR, ("%d\n", orc), VERR_INVALID_HANDLE); 310 AssertMsgReturn( (fFileState & 0x3) == (fFlags & RTPIPE_N_READ ? OPEN_ACCESS_READONLY : OPEN_ACCESS_WRITEONLY) 311 || (fFileState & 0x3) == OPEN_ACCESS_READWRITE 312 , ("%#x\n", fFileState), VERR_INVALID_HANDLE); 313 314 /* 315 * Looks kind of OK. Fix the inherit flag. 316 */ 317 orc = DosSetFHState(hNative, (fFileState & (OPEN_FLAGS_WRITE_THROUGH | OPEN_FLAGS_FAIL_ON_ERROR | OPEN_FLAGS_NO_CACHE)) 318 | (fFlags & RTPIPE_N_INHERIT ? 0 : OPEN_FLAGS_NOINHERIT)); 319 AssertMsgReturn(orc == NO_ERROR, ("%d\n", orc), RTErrConvertFromOS2(orc)); 320 321 322 /* 323 * Create a handle so we can try rtPipeQueryInfo on it 324 * and see if we need to duplicate it to make that call work. 325 */ 326 RTPIPEINTERNAL *pThis = (RTPIPEINTERNAL *)RTMemAllocZ(sizeof(RTPIPEINTERNAL)); 327 if (!pThis) 328 return VERR_NO_MEMORY; 329 int rc = RTCritSectInit(&pThis->CritSect); 330 if (RT_SUCCESS(rc)) 331 { 332 pThis->u32Magic = RTPIPE_MAGIC; 333 pThis->hPipe = hNative; 334 pThis->hev = NULLHANDLE; 335 pThis->fRead = !!(fFlags & RTPIPE_N_READ); 336 pThis->fBlocking = !(fPipeState & NP_NOWAIT); 337 //pThis->fBrokenPipe = false; 338 //pThis->cUsers = 0; 339 pThis->hPollSet = NIL_RTPOLLSET; 340 341 *phPipe = pThis; 342 return VINF_SUCCESS; 343 344 //RTCritSectDelete(&pThis->CritSect); 345 } 346 RTMemFree(pThis); 347 return rc; 348 } 55 349 56 350 RTDECL(RTHCINTPTR) RTPipeToNative(RTPIPE hPipe) 57 351 { 58 return -1; 352 RTPIPEINTERNAL *pThis = hPipe; 353 AssertPtrReturn(pThis, -1); 354 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, -1); 355 356 return (RTHCINTPTR)pThis->hPipe; 357 } 358 359 /** 360 * Prepare blocking mode. 361 * 362 * @returns IPRT status code. 363 * @retval VERR_WRONG_ORDER if simultaneous non-blocking and blocking access is 364 * attempted. 365 * 366 * @param pThis The pipe handle. 367 * 368 * @remarks Caller owns the critical section. 369 */ 370 static int rtPipeTryBlocking(RTPIPEINTERNAL *pThis) 371 { 372 if (!pThis->fBlocking) 373 { 374 if (pThis->cUsers != 0) 375 return VERR_WRONG_ORDER; 376 377 APIRET orc = DosSetNPHState(pThis->hPipe, NP_WAIT | NP_READMODE_BYTE); 378 if (orc != NO_ERROR) 379 { 380 if (orc != ERROR_BROKEN_PIPE && orc != ERROR_PIPE_NOT_CONNECTED) 381 return RTErrConvertFromOS2(orc); 382 pThis->fBrokenPipe = true; 383 } 384 pThis->fBlocking = true; 385 } 386 387 pThis->cUsers++; 388 return VINF_SUCCESS; 389 } 390 391 392 /** 393 * Prepare non-blocking mode. 394 * 395 * @returns IPRT status code. 396 * @retval VERR_WRONG_ORDER if simultaneous non-blocking and blocking access is 397 * attempted. 398 * 399 * @param pThis The pipe handle. 400 */ 401 static int rtPipeTryNonBlocking(RTPIPEINTERNAL *pThis) 402 { 403 if (pThis->fBlocking) 404 { 405 if (pThis->cUsers != 0) 406 return VERR_WRONG_ORDER; 407 408 APIRET orc = DosSetNPHState(pThis->hPipe, NP_NOWAIT | NP_READMODE_BYTE); 409 if (orc != NO_ERROR) 410 { 411 if (orc != ERROR_BROKEN_PIPE && orc != ERROR_PIPE_NOT_CONNECTED) 412 return RTErrConvertFromOS2(orc); 413 pThis->fBrokenPipe = true; 414 } 415 pThis->fBlocking = false; 416 } 417 418 pThis->cUsers++; 419 return VINF_SUCCESS; 420 } 421 422 423 /** 424 * Checks if the read pipe has been broken. 425 * 426 * @returns true if broken, false if no. 427 * @param pThis The pipe handle (read). 428 */ 429 static bool rtPipeOs2IsBroken(RTPIPEINTERNAL *pThis) 430 { 431 Assert(pThis->fRead); 432 433 #if 0 434 /* 435 * Query it via the semaphore. Not sure how fast this is... 436 */ 437 PIPESEMSTATE aStates[3]; RT_ZERO(aStates); 438 APIRET orc = DosQueryNPipeSemState(pThis->hev, &aStates[0], sizeof(aStates)); 439 if (orc == NO_ERROR) 440 { 441 if (aStates[0].fStatus == NPSS_CLOSE) 442 return true; 443 if (aStates[0].fStatus == NPSS_RDATA) 444 return false; 445 } 446 AssertMsgFailed(("%d / %d\n", orc, aStates[0].fStatus)); 447 448 /* 449 * Fall back / alternative method. 450 */ 451 #endif 452 ULONG cbActual = 0; 453 ULONG ulState = 0; 454 AVAILDATA Avail = { 0, 0 }; 455 APIRET orc = DosPeekNPipe(pThis->hPipe, NULL, 0, &cbActual, &Avail, &ulState); 456 if (orc != NO_ERROR) 457 { 458 if (orc != ERROR_PIPE_BUSY) 459 AssertMsgFailed(("%d\n", orc)); 460 return false; 461 } 462 463 return ulState != NP_STATE_CONNECTED; 59 464 } 60 465 … … 62 467 RTDECL(int) RTPipeRead(RTPIPE hPipe, void *pvBuf, size_t cbToRead, size_t *pcbRead) 63 468 { 64 return VERR_NOT_IMPLEMENTED; 469 RTPIPEINTERNAL *pThis = hPipe; 470 AssertPtrReturn(pThis, VERR_INVALID_HANDLE); 471 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE); 472 AssertReturn(pThis->fRead, VERR_ACCESS_DENIED); 473 AssertPtr(pcbRead); 474 AssertPtr(pvBuf); 475 476 int rc = RTCritSectEnter(&pThis->CritSect); 477 if (RT_SUCCESS(rc)) 478 { 479 rc = rtPipeTryNonBlocking(pThis); 480 if (RT_SUCCESS(rc)) 481 { 482 RTCritSectLeave(&pThis->CritSect); 483 484 ULONG cbActual = 0; 485 APIRET orc = DosRead(pThis->hPipe, pvBuf, cbToRead, &cbActual); 486 if (orc == NO_ERROR) 487 { 488 if (cbActual || !cbToRead || !rtPipeOs2IsBroken(pThis)) 489 *pcbRead = cbActual; 490 else 491 rc = VERR_BROKEN_PIPE; 492 } 493 else if (orc == ERROR_NO_DATA) 494 { 495 *pcbRead = 0; 496 rc = VINF_TRY_AGAIN; 497 } 498 else 499 rc = RTErrConvertFromOS2(orc); 500 501 RTCritSectEnter(&pThis->CritSect); 502 if (rc == VERR_BROKEN_PIPE) 503 pThis->fBrokenPipe = true; 504 pThis->cUsers--; 505 } 506 else 507 rc = VERR_WRONG_ORDER; 508 RTCritSectLeave(&pThis->CritSect); 509 } 510 return rc; 65 511 } 66 512 … … 68 514 RTDECL(int) RTPipeReadBlocking(RTPIPE hPipe, void *pvBuf, size_t cbToRead, size_t *pcbRead) 69 515 { 70 return VERR_NOT_IMPLEMENTED; 516 RTPIPEINTERNAL *pThis = hPipe; 517 AssertPtrReturn(pThis, VERR_INVALID_HANDLE); 518 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE); 519 AssertReturn(pThis->fRead, VERR_ACCESS_DENIED); 520 AssertPtr(pvBuf); 521 522 int rc = RTCritSectEnter(&pThis->CritSect); 523 if (RT_SUCCESS(rc)) 524 { 525 rc = rtPipeTryBlocking(pThis); 526 if (RT_SUCCESS(rc)) 527 { 528 RTCritSectLeave(&pThis->CritSect); 529 530 size_t cbTotalRead = 0; 531 while (cbToRead > 0) 532 { 533 ULONG cbActual = 0; 534 APIRET orc = DosRead(pThis->hPipe, pvBuf, cbToRead, &cbActual); 535 if (orc != NO_ERROR) 536 { 537 rc = RTErrConvertFromOS2(orc); 538 break; 539 } 540 if (!cbActual && rtPipeOs2IsBroken(pThis)) 541 { 542 rc = VERR_BROKEN_PIPE; 543 break; 544 } 545 546 /* advance */ 547 pvBuf = (char *)pvBuf + cbActual; 548 cbTotalRead += cbActual; 549 cbToRead -= cbActual; 550 } 551 552 if (pcbRead) 553 { 554 *pcbRead = cbTotalRead; 555 if ( RT_FAILURE(rc) 556 && cbTotalRead) 557 rc = VINF_SUCCESS; 558 } 559 560 RTCritSectEnter(&pThis->CritSect); 561 if (rc == VERR_BROKEN_PIPE) 562 pThis->fBrokenPipe = true; 563 pThis->cUsers--; 564 } 565 else 566 rc = VERR_WRONG_ORDER; 567 RTCritSectLeave(&pThis->CritSect); 568 } 569 return rc; 570 } 571 572 573 /** 574 * Gets the available write buffer size of the pipe. 575 * 576 * @returns Number of bytes, 1 on failure. 577 * @param pThis The pipe handle. 578 */ 579 static ULONG rtPipeOs2GetSpace(RTPIPEINTERNAL *pThis) 580 { 581 Assert(!pThis->fRead); 582 583 #if 0 /* Not sure which is more efficient, neither are really optimal, I fear. */ 584 /* 585 * Query via semaphore state. 586 * This will walk the list of active named pipes... 587 */ 588 /** @todo Check how hev and hpipe are associated, if complicated, use the 589 * alternative method below. */ 590 PIPESEMSTATE aStates[3]; RT_ZERO(aStates); 591 APIRET orc = DosQueryNPipeSemState((HSEM)pThis->hev, &aStates[0], sizeof(aStates)); 592 if (orc == NO_ERROR) 593 { 594 if (aStates[0].fStatus == NPSS_WSPACE) 595 return aStates[0].usAvail; 596 if (aStates[1].fStatus == NPSS_WSPACE) 597 return aStates[1].usAvail; 598 return 0; 599 } 600 AssertMsgFailed(("%d / %d\n", orc, aStates[0].fStatus)); 601 602 #else 603 /* 604 * Query via the pipe info. 605 * This will have to lookup and store the pipe name. 606 */ 607 union 608 { 609 PIPEINFO PipeInfo; 610 uint8_t abPadding[sizeof(PIPEINFO) + 127]; 611 } Buf; 612 APIRET orc = DosQueryNPipeInfo(pThis->hPipe, 1, &Buf, sizeof(Buf)); 613 if (orc == NO_ERROR) 614 return Buf.PipeInfo.cbOut; 615 AssertMsgFailed(("%d\n", orc)); 616 #endif 617 618 return 1; 71 619 } 72 620 … … 74 622 RTDECL(int) RTPipeWrite(RTPIPE hPipe, const void *pvBuf, size_t cbToWrite, size_t *pcbWritten) 75 623 { 76 return VERR_NOT_IMPLEMENTED; 624 RTPIPEINTERNAL *pThis = hPipe; 625 AssertPtrReturn(pThis, VERR_INVALID_HANDLE); 626 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE); 627 AssertReturn(!pThis->fRead, VERR_ACCESS_DENIED); 628 AssertPtr(pcbWritten); 629 AssertPtr(pvBuf); 630 631 int rc = RTCritSectEnter(&pThis->CritSect); 632 if (RT_SUCCESS(rc)) 633 { 634 rc = rtPipeTryNonBlocking(pThis); 635 if (RT_SUCCESS(rc)) 636 { 637 if (cbToWrite > 0) 638 { 639 ULONG cbActual = 0; 640 APIRET orc = DosWrite(pThis->hPipe, pvBuf, cbToWrite, &cbActual); 641 if (orc == NO_ERROR && cbActual == 0) 642 { 643 /* Retry with the request adjusted to the available buffer space. */ 644 ULONG cbAvail = rtPipeOs2GetSpace(pThis); 645 orc = DosWrite(pThis->hPipe, pvBuf, RT_MIN(cbAvail, cbToWrite), &cbActual); 646 } 647 648 if (orc == NO_ERROR) 649 { 650 *pcbWritten = cbActual; 651 if (cbActual == 0) 652 rc = VINF_TRY_AGAIN; 653 } 654 else 655 { 656 rc = RTErrConvertFromOS2(orc); 657 if (rc == VERR_PIPE_NOT_CONNECTED) 658 rc = VERR_BROKEN_PIPE; 659 } 660 } 661 else 662 *pcbWritten = 0; 663 664 if (rc == VERR_BROKEN_PIPE) 665 pThis->fBrokenPipe = true; 666 pThis->cUsers--; 667 } 668 else 669 rc = VERR_WRONG_ORDER; 670 RTCritSectLeave(&pThis->CritSect); 671 } 672 return rc; 77 673 } 78 674 … … 80 676 RTDECL(int) RTPipeWriteBlocking(RTPIPE hPipe, const void *pvBuf, size_t cbToWrite, size_t *pcbWritten) 81 677 { 82 return VERR_NOT_IMPLEMENTED; 678 RTPIPEINTERNAL *pThis = hPipe; 679 AssertPtrReturn(pThis, VERR_INVALID_HANDLE); 680 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE); 681 AssertReturn(!pThis->fRead, VERR_ACCESS_DENIED); 682 AssertPtr(pvBuf); 683 AssertPtrNull(pcbWritten); 684 685 int rc = RTCritSectEnter(&pThis->CritSect); 686 if (RT_SUCCESS(rc)) 687 { 688 rc = rtPipeTryBlocking(pThis); 689 if (RT_SUCCESS(rc)) 690 { 691 RTCritSectLeave(&pThis->CritSect); 692 693 size_t cbTotalWritten = 0; 694 while (cbToWrite > 0) 695 { 696 ULONG cbActual = 0; 697 APIRET orc = DosWrite(pThis->hPipe, pvBuf, cbToWrite, &cbActual); 698 if (orc != NO_ERROR) 699 { 700 rc = RTErrConvertFromOS2(orc); 701 if (rc == VERR_PIPE_NOT_CONNECTED) 702 rc = VERR_BROKEN_PIPE; 703 break; 704 } 705 pvBuf = (char const *)pvBuf + cbActual; 706 cbToWrite -= cbActual; 707 cbTotalWritten += cbActual; 708 } 709 710 if (pcbWritten) 711 { 712 *pcbWritten = cbTotalWritten; 713 if ( RT_FAILURE(rc) 714 && cbTotalWritten) 715 rc = VINF_SUCCESS; 716 } 717 718 RTCritSectEnter(&pThis->CritSect); 719 if (rc == VERR_BROKEN_PIPE) 720 pThis->fBrokenPipe = true; 721 pThis->cUsers--; 722 } 723 else 724 rc = VERR_WRONG_ORDER; 725 RTCritSectLeave(&pThis->CritSect); 726 } 727 return rc; 83 728 } 84 729 … … 86 731 RTDECL(int) RTPipeFlush(RTPIPE hPipe) 87 732 { 88 return VERR_NOT_IMPLEMENTED; 733 RTPIPEINTERNAL *pThis = hPipe; 734 AssertPtrReturn(pThis, VERR_INVALID_HANDLE); 735 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE); 736 AssertReturn(!pThis->fRead, VERR_ACCESS_DENIED); 737 738 APIRET orc = DosResetBuffer(pThis->hPipe); 739 if (orc != NO_ERROR) 740 { 741 int rc = RTErrConvertFromOS2(orc); 742 if (rc == VERR_BROKEN_PIPE) 743 { 744 RTCritSectEnter(&pThis->CritSect); 745 pThis->fBrokenPipe = true; 746 RTCritSectLeave(&pThis->CritSect); 747 } 748 return rc; 749 } 750 return VINF_SUCCESS; 89 751 } 90 752 … … 92 754 RTDECL(int) RTPipeSelectOne(RTPIPE hPipe, RTMSINTERVAL cMillies) 93 755 { 94 return VERR_NOT_IMPLEMENTED; 756 RTPIPEINTERNAL *pThis = hPipe; 757 AssertPtrReturn(pThis, VERR_INVALID_HANDLE); 758 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE); 759 760 uint64_t const StartMsTS = RTTimeMilliTS(); 761 762 int rc = RTCritSectEnter(&pThis->CritSect); 763 if (RT_FAILURE(rc)) 764 return rc; 765 rc = rtPipeOs2EnsureSem(pThis); 766 if (RT_SUCCESS(rc)) 767 { 768 for (unsigned iLoop = 0;; iLoop++) 769 { 770 /* 771 * Check the handle state. 772 */ 773 PIPESEMSTATE aStates[4]; RT_ZERO(aStates); 774 APIRET orc = DosQueryNPipeSemState((HSEM)pThis->hev, &aStates[0], sizeof(aStates)); 775 if (orc != NO_ERROR) 776 { 777 rc = RTErrConvertFromOS2(orc); 778 break; 779 } 780 int i = 0; 781 if (pThis->fRead) 782 while (aStates[i].fStatus == NPSS_WSPACE) 783 i++; 784 else 785 while (aStates[i].fStatus == NPSS_RDATA) 786 i++; 787 if (aStates[i].fStatus == NPSS_CLOSE) 788 break; 789 Assert(aStates[i].fStatus == NPSS_WSPACE || aStates[i].fStatus == NPSS_RDATA || aStates[i].fStatus == NPSS_EOI); 790 if ( aStates[i].fStatus != NPSS_EOI 791 && aStates[0].usAvail > 0) 792 break; 793 794 /* 795 * Check for timeout. 796 */ 797 ULONG cMsMaxWait = SEM_INDEFINITE_WAIT; 798 if (cMillies != RT_INDEFINITE_WAIT) 799 { 800 uint64_t cElapsed = RTTimeMilliTS() - StartMsTS; 801 if (cElapsed >= cMillies) 802 { 803 rc = VERR_TIMEOUT; 804 break; 805 } 806 cMsMaxWait = cMillies - (uint32_t)cElapsed; 807 } 808 809 /* 810 * Wait. 811 */ 812 RTCritSectLeave(&pThis->CritSect); 813 orc = DosWaitEventSem(pThis->hev, cMsMaxWait); 814 RTCritSectEnter(&pThis->CritSect); 815 if (orc != NO_ERROR && orc != ERROR_TIMEOUT && orc != ERROR_SEM_TIMEOUT ) 816 { 817 rc = RTErrConvertFromOS2(orc); 818 break; 819 } 820 } 821 822 if (rc == VERR_BROKEN_PIPE) 823 pThis->fBrokenPipe = true; 824 } 825 826 RTCritSectLeave(&pThis->CritSect); 827 return rc; 95 828 } 96 829 … … 98 831 RTDECL(int) RTPipeQueryReadable(RTPIPE hPipe, size_t *pcbReadable) 99 832 { 100 return VERR_NOT_IMPLEMENTED; 101 } 102 833 RTPIPEINTERNAL *pThis = hPipe; 834 AssertPtrReturn(pThis, VERR_INVALID_HANDLE); 835 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE); 836 AssertReturn(pThis->fRead, VERR_PIPE_NOT_READ); 837 AssertPtrReturn(pcbReadable, VERR_INVALID_POINTER); 838 839 int rc = RTCritSectEnter(&pThis->CritSect); 840 if (RT_FAILURE(rc)) 841 return rc; 842 843 ULONG cbActual = 0; 844 ULONG ulState = 0; 845 AVAILDATA Avail = { 0, 0 }; 846 APIRET orc = DosPeekNPipe(pThis->hPipe, NULL, 0, &cbActual, &Avail, &ulState); 847 if (orc == NO_ERROR) 848 { 849 if (Avail.cbpipe > 0 || ulState == NP_STATE_CONNECTED) 850 *pcbReadable = Avail.cbpipe; 851 else 852 rc = VERR_PIPE_NOT_CONNECTED; /*??*/ 853 } 854 else 855 rc = RTErrConvertFromOS2(orc); 856 857 RTCritSectLeave(&pThis->CritSect); 858 return rc; 859 } 860 861 862 #if 0 863 /** 864 * Internal RTPollSetAdd helper that returns the handle that should be added to 865 * the pollset. 866 * 867 * @returns Valid handle on success, INVALID_HANDLE_VALUE on failure. 868 * @param hPipe The pipe handle. 869 * @param fEvents The events we're polling for. 870 * @param ph where to put the primary handle. 871 */ 872 int rtPipePollGetHandle(RTPIPE hPipe, uint32_t fEvents, PHANDLE ph) 873 { 874 RTPIPEINTERNAL *pThis = hPipe; 875 AssertPtrReturn(pThis, VERR_INVALID_HANDLE); 876 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE); 877 878 AssertReturn(!(fEvents & RTPOLL_EVT_READ) || pThis->fRead, VERR_INVALID_PARAMETER); 879 AssertReturn(!(fEvents & RTPOLL_EVT_WRITE) || !pThis->fRead, VERR_INVALID_PARAMETER); 880 881 /* Later: Try register an event handle with the pipe like on OS/2, there is 882 a file control for doing this obviously intended for the OS/2 subsys. 883 The question is whether this still exists on Vista and W7. */ 884 *ph = pThis->Overlapped.hEvent; 885 return VINF_SUCCESS; 886 } 887 888 889 /** 890 * Checks for pending events. 891 * 892 * @returns Event mask or 0. 893 * @param pThis The pipe handle. 894 * @param fEvents The desired events. 895 */ 896 static uint32_t rtPipePollCheck(RTPIPEINTERNAL *pThis, uint32_t fEvents) 897 { 898 uint32_t fRetEvents = 0; 899 if (pThis->fBrokenPipe) 900 fRetEvents |= RTPOLL_EVT_ERROR; 901 else if (pThis->fRead) 902 { 903 if (!pThis->fIOPending) 904 { 905 DWORD cbAvailable; 906 if (PeekNamedPipe(pThis->hPipe, NULL, 0, NULL, &cbAvailable, NULL)) 907 { 908 if ( (fEvents & RTPOLL_EVT_READ) 909 && cbAvailable > 0) 910 fRetEvents |= RTPOLL_EVT_READ; 911 } 912 else 913 { 914 if (GetLastError() == ERROR_BROKEN_PIPE) 915 pThis->fBrokenPipe = true; 916 fRetEvents |= RTPOLL_EVT_ERROR; 917 } 918 } 919 } 920 else 921 { 922 if (pThis->fIOPending) 923 { 924 rtPipeWriteCheckCompletion(pThis); 925 if (pThis->fBrokenPipe) 926 fRetEvents |= RTPOLL_EVT_ERROR; 927 } 928 if ( !pThis->fIOPending 929 && !fRetEvents) 930 { 931 FILE_PIPE_LOCAL_INFORMATION Info; 932 if (rtPipeQueryInfo(pThis, &Info)) 933 { 934 /* Check for broken pipe. */ 935 if (Info.NamedPipeState == FILE_PIPE_CLOSING_STATE) 936 { 937 fRetEvents = RTPOLL_EVT_ERROR; 938 pThis->fBrokenPipe = true; 939 } 940 941 /* Check if there is available buffer space. */ 942 if ( !fRetEvents 943 && (fEvents & RTPOLL_EVT_WRITE) 944 && ( Info.WriteQuotaAvailable > 0 945 || Info.OutboundQuota == 0) 946 ) 947 fRetEvents |= RTPOLL_EVT_WRITE; 948 } 949 else if (fEvents & RTPOLL_EVT_WRITE) 950 fRetEvents |= RTPOLL_EVT_WRITE; 951 } 952 } 953 954 return fRetEvents; 955 } 956 957 958 /** 959 * Internal RTPoll helper that polls the pipe handle and, if @a fNoWait is 960 * clear, starts whatever actions we've got running during the poll call. 961 * 962 * @returns 0 if no pending events, actions initiated if @a fNoWait is clear. 963 * Event mask (in @a fEvents) and no actions if the handle is ready 964 * already. 965 * UINT32_MAX (asserted) if the pipe handle is busy in I/O or a 966 * different poll set. 967 * 968 * @param hPipe The pipe handle. 969 * @param hPollSet The poll set handle (for access checks). 970 * @param fEvents The events we're polling for. 971 * @param fFinalEntry Set if this is the final entry for this handle 972 * in this poll set. This can be used for dealing 973 * with duplicate entries. 974 * @param fNoWait Set if it's a zero-wait poll call. Clear if 975 * we'll wait for an event to occur. 976 */ 977 uint32_t rtPipePollStart(RTPIPE hPipe, RTPOLLSET hPollSet, uint32_t fEvents, bool fFinalEntry, bool fNoWait) 978 { 979 /** @todo All this polling code could be optimized to make fewer system 980 * calls; like for instance the ResetEvent calls. */ 981 RTPIPEINTERNAL *pThis = hPipe; 982 AssertPtrReturn(pThis, UINT32_MAX); 983 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, UINT32_MAX); 984 985 int rc = RTCritSectEnter(&pThis->CritSect); 986 AssertRCReturn(rc, UINT32_MAX); 987 988 /* Check that this is the only current use of this pipe. */ 989 uint32_t fRetEvents; 990 if ( pThis->cUsers == 0 991 || pThis->hPollSet == hPollSet) 992 { 993 /* Check what the current events are. */ 994 fRetEvents = rtPipePollCheck(pThis, fEvents); 995 if ( !fRetEvents 996 && !fNoWait) 997 { 998 /* Make sure the event semaphore has been reset. */ 999 if (!pThis->fIOPending) 1000 { 1001 rc = ResetEvent(pThis->Overlapped.hEvent); 1002 Assert(rc == TRUE); 1003 } 1004 1005 /* Kick off the zero byte read thing if applicable. */ 1006 if ( !pThis->fIOPending 1007 && pThis->fRead 1008 && (fEvents & RTPOLL_EVT_READ) 1009 ) 1010 { 1011 DWORD cbRead = 0; 1012 if (ReadFile(pThis->hPipe, pThis->abBuf, 0, &cbRead, &pThis->Overlapped)) 1013 fRetEvents = rtPipePollCheck(pThis, fEvents); 1014 else if (GetLastError() == ERROR_IO_PENDING) 1015 { 1016 pThis->fIOPending = true; 1017 pThis->fZeroByteRead = true; 1018 } 1019 else 1020 fRetEvents = RTPOLL_EVT_ERROR; 1021 } 1022 1023 /* If we're still set for the waiting, record the poll set and 1024 mark the pipe used. */ 1025 if (!fRetEvents) 1026 { 1027 pThis->cUsers++; 1028 pThis->hPollSet = hPollSet; 1029 } 1030 } 1031 } 1032 else 1033 { 1034 AssertFailed(); 1035 fRetEvents = UINT32_MAX; 1036 } 1037 1038 RTCritSectLeave(&pThis->CritSect); 1039 return fRetEvents; 1040 } 1041 1042 1043 /** 1044 * Called after a WaitForMultipleObjects returned in order to check for pending 1045 * events and stop whatever actions that rtPipePollStart() initiated. 1046 * 1047 * @returns Event mask or 0. 1048 * 1049 * @param hPipe The pipe handle. 1050 * @param fEvents The events we're polling for. 1051 * @param fFinalEntry Set if this is the final entry for this handle 1052 * in this poll set. This can be used for dealing 1053 * with duplicate entries. Only keep in mind that 1054 * this method is called in reverse order, so the 1055 * first call will have this set (when the entire 1056 * set was processed). 1057 * @param fHarvestEvents Set if we should check for pending events. 1058 */ 1059 uint32_t rtPipePollDone(RTPIPE hPipe, uint32_t fEvents, bool fFinalEntry, bool fHarvestEvents) 1060 { 1061 RTPIPEINTERNAL *pThis = hPipe; 1062 AssertPtrReturn(pThis, 0); 1063 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, 0); 1064 1065 int rc = RTCritSectEnter(&pThis->CritSect); 1066 AssertRCReturn(rc, 0); 1067 1068 Assert(pThis->cUsers > 0); 1069 1070 1071 /* Cancel the zero byte read. */ 1072 uint32_t fRetEvents = 0; 1073 if (pThis->fZeroByteRead) 1074 { 1075 CancelIo(pThis->hPipe); 1076 DWORD cbRead = 0; 1077 if ( !GetOverlappedResult(pThis->hPipe, &pThis->Overlapped, &cbRead, TRUE /*fWait*/) 1078 && GetLastError() != ERROR_OPERATION_ABORTED) 1079 fRetEvents = RTPOLL_EVT_ERROR; 1080 1081 pThis->fIOPending = false; 1082 pThis->fZeroByteRead = false; 1083 } 1084 1085 /* harvest events. */ 1086 fRetEvents |= rtPipePollCheck(pThis, fEvents); 1087 1088 /* update counters. */ 1089 pThis->cUsers--; 1090 if (!pThis->cUsers) 1091 pThis->hPollSet = NIL_RTPOLLSET; 1092 1093 RTCritSectLeave(&pThis->CritSect); 1094 return fRetEvents; 1095 } 1096 #endif
Note:
See TracChangeset
for help on using the changeset viewer.