VirtualBox

source: vbox/trunk/src/VBox/Runtime/r3/posix/pipe-posix.cpp@ 27667

Last change on this file since 27667 was 27614, checked in by vboxsync, 15 years ago

iprt: Implemented RTPipeFromNative on posix.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 16.6 KB
Line 
1/* $Id: pipe-posix.cpp 27614 2010-03-23 01:29:07Z vboxsync $ */
2/** @file
3 * IPRT - Anonymous Pipes, POSIX Implementation.
4 */
5
6/*
7 * Copyright (C) 2010 Sun Microsystems, Inc.
8 *
9 * This file is part of VirtualBox Open Source Edition (OSE), as
10 * available from http://www.virtualbox.org. This file is free software;
11 * you can redistribute it and/or modify it under the terms of the GNU
12 * General Public License (GPL) as published by the Free Software
13 * Foundation, in version 2 as it comes in the "COPYING" file of the
14 * VirtualBox OSE distribution. VirtualBox OSE is distributed in the
15 * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
16 *
17 * The contents of this file may alternatively be used under the terms
18 * of the Common Development and Distribution License Version 1.0
19 * (CDDL) only, as it comes in the "COPYING.CDDL" file of the
20 * VirtualBox OSE distribution, in which case the provisions of the
21 * CDDL are applicable instead of those of the GPL.
22 *
23 * You may elect to license modified versions of this file under the
24 * terms and conditions of either the GPL or the CDDL or both.
25 *
26 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa
27 * Clara, CA 95054 USA or visit http://www.sun.com if you need
28 * additional information or have any questions.
29 */
30
31
32/*******************************************************************************
33* Header Files *
34*******************************************************************************/
35#include <iprt/pipe.h>
36#include "internal/iprt.h"
37
38#include <iprt/asm.h>
39#include <iprt/assert.h>
40#include <iprt/err.h>
41#include <iprt/mem.h>
42#include <iprt/string.h>
43#include <iprt/thread.h>
44#include "internal/magics.h"
45
46#include <errno.h>
47#include <fcntl.h>
48#include <limits.h>
49#include <unistd.h>
50#include <sys/poll.h>
51#include <sys/stat.h>
52#include <signal.h>
53
54
55/*******************************************************************************
56* Structures and Typedefs *
57*******************************************************************************/
58typedef struct RTPIPEINTERNAL
59{
60 /** Magic value (RTPIPE_MAGIC). */
61 uint32_t u32Magic;
62 /** The file descriptor. */
63 int fd;
64 /** Set if this is the read end, clear if it's the write end. */
65 bool fRead;
66 /** Atomically operated state variable.
67 *
68 * - Bits 0 thru 29 - Users of the new mode.
69 * - Bit 30 - The pipe mode, set indicates blocking.
70 * - Bit 31 - Set when we're switching the mode.
71 */
72 uint32_t volatile u32State;
73} RTPIPEINTERNAL;
74
75
76/*******************************************************************************
77* Defined Constants And Macros *
78*******************************************************************************/
79/** @name RTPIPEINTERNAL::u32State defines
80 * @{ */
81#define RTPIPE_POSIX_BLOCKING UINT32_C(0x40000000)
82#define RTPIPE_POSIX_SWITCHING UINT32_C(0x80000000)
83#define RTPIPE_POSIX_SWITCHING_BIT 31
84#define RTPIPE_POSIX_USERS_MASK UINT32_C(0x3fffffff)
85/** @} */
86
87
88RTDECL(int) RTPipeCreate(PRTPIPE phPipeRead, PRTPIPE phPipeWrite, uint32_t fFlags)
89{
90 AssertPtrReturn(phPipeRead, VERR_INVALID_POINTER);
91 AssertPtrReturn(phPipeWrite, VERR_INVALID_POINTER);
92 AssertReturn(!(fFlags & ~RTPIPE_C_VALID_MASK), VERR_INVALID_PARAMETER);
93
94 /*
95 * Create the pipe and set the close-on-exec flag if requested.
96 */
97 int aFds[2] = {-1, -1};
98 if (pipe(aFds))
99 return RTErrConvertFromErrno(errno);
100
101 int rc = VINF_SUCCESS;
102 if (!(fFlags & RTPIPE_C_INHERIT_READ))
103 {
104 if (fcntl(aFds[0], F_SETFD, FD_CLOEXEC))
105 rc = RTErrConvertFromErrno(errno);
106 }
107
108 if (!(fFlags & RTPIPE_C_INHERIT_WRITE))
109 {
110 if (fcntl(aFds[1], F_SETFD, FD_CLOEXEC))
111 rc = RTErrConvertFromErrno(errno);
112 }
113
114 if (RT_SUCCESS(rc))
115 {
116 /*
117 * Create the two handles.
118 */
119 RTPIPEINTERNAL *pThisR = (RTPIPEINTERNAL *)RTMemAlloc(sizeof(RTPIPEINTERNAL));
120 if (pThisR)
121 {
122 RTPIPEINTERNAL *pThisW = (RTPIPEINTERNAL *)RTMemAlloc(sizeof(RTPIPEINTERNAL));
123 if (pThisW)
124 {
125 pThisR->u32Magic = RTPIPE_MAGIC;
126 pThisW->u32Magic = RTPIPE_MAGIC;
127 pThisR->fd = aFds[0];
128 pThisW->fd = aFds[1];
129 pThisR->fRead = true;
130 pThisW->fRead = false;
131 pThisR->u32State = RTPIPE_POSIX_BLOCKING;
132 pThisW->u32State = RTPIPE_POSIX_BLOCKING;
133
134 *phPipeRead = pThisR;
135 *phPipeWrite = pThisW;
136
137 /*
138 * Before we leave, make sure to shut up SIGPIPE.
139 */
140 signal(SIGPIPE, SIG_IGN);
141 return VINF_SUCCESS;
142 }
143
144 RTMemFree(pThisR);
145 rc = VERR_NO_MEMORY;
146 }
147 else
148 rc = VERR_NO_MEMORY;
149 }
150
151 close(aFds[0]);
152 close(aFds[1]);
153 return rc;
154}
155
156
157RTDECL(int) RTPipeClose(RTPIPE hPipe)
158{
159 RTPIPEINTERNAL *pThis = hPipe;
160 if (pThis == NIL_RTPIPE)
161 return VINF_SUCCESS;
162 AssertPtrReturn(pThis, VERR_INVALID_PARAMETER);
163 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
164
165 /*
166 * Do the cleanup.
167 */
168 AssertReturn(ASMAtomicCmpXchgU32(&pThis->u32Magic, ~RTPIPE_MAGIC, RTPIPE_MAGIC), VERR_INVALID_HANDLE);
169
170 int fd = pThis->fd;
171 pThis->fd = -1;
172 close(fd);
173
174 if (ASMAtomicReadU32(&pThis->u32State) & RTPIPE_POSIX_USERS_MASK)
175 {
176 AssertFailed();
177 RTThreadSleep(1);
178 }
179
180 RTMemFree(pThis);
181
182 return VINF_SUCCESS;
183}
184
185RTDECL(int) RTPipeFromNative(PRTPIPE phPipe, RTHCINTPTR hNativePipe, uint32_t fFlags)
186{
187 AssertPtrReturn(phPipe, VERR_INVALID_POINTER);
188 AssertReturn(!(fFlags & ~RTPIPE_N_VALID_MASK), VERR_INVALID_PARAMETER);
189 AssertReturn(!!(fFlags & RTPIPE_N_READ) != !!(fFlags & RTPIPE_N_WRITE), VERR_INVALID_PARAMETER);
190
191 /*
192 * Get and validate the pipe handle info.
193 */
194 int hNative = (int)hNativePipe;
195 struct stat st;
196 AssertReturn(fstat(hNative, &st) == 0, RTErrConvertFromErrno(errno));
197 AssertMsgReturn(S_ISFIFO(st.st_mode) || S_ISSOCK(st.st_mode), ("%#x (%o)\n", st.st_mode, st.st_mode), VERR_INVALID_HANDLE);
198
199 int fFd = fcntl(hNative, F_GETFL, 0);
200 AssertReturn(fFd != -1, VERR_INVALID_HANDLE);
201 AssertMsgReturn((fFd & O_ACCMODE) == (fFlags & RTPIPE_N_READ ? O_RDONLY : O_WRONLY), ("%#x\n", fFd), VERR_INVALID_HANDLE);
202
203 /*
204 * Create the handle.
205 */
206 RTPIPEINTERNAL *pThis = (RTPIPEINTERNAL *)RTMemAlloc(sizeof(RTPIPEINTERNAL));
207 if (!pThis)
208 return VERR_NO_MEMORY;
209
210 pThis->u32Magic = RTPIPE_MAGIC;
211 pThis->fd = hNative;
212 pThis->fRead = !!(fFlags & RTPIPE_N_READ);
213 pThis->u32State = fFd & O_NONBLOCK ? 0 : RTPIPE_POSIX_BLOCKING;
214
215 /*
216 * Fix up inheritability and shut up SIGPIPE and we're done.
217 */
218 if (fcntl(hNative, F_SETFD, fFlags & RTPIPE_N_INHERIT ? 0 : FD_CLOEXEC) == 0)
219 {
220 signal(SIGPIPE, SIG_IGN);
221 *phPipe = pThis;
222 return VINF_SUCCESS;
223 }
224
225 int rc = RTErrConvertFromErrno(errno);
226 RTMemFree(pThis);
227 return rc;
228}
229
230
231RTDECL(RTHCINTPTR) RTPipeToNative(RTPIPE hPipe)
232{
233 RTPIPEINTERNAL *pThis = hPipe;
234 AssertPtrReturn(pThis, -1);
235 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, -1);
236
237 return pThis->fd;
238}
239
240
241/**
242 * Prepare blocking mode.
243 *
244 * @returns VINF_SUCCESS
245 * @retval VERR_WRONG_ORDER
246 * @retval VERR_INTERNAL_ERROR_4
247 *
248 * @param pThis The pipe handle.
249 */
250static int rtPipeTryBlocking(RTPIPEINTERNAL *pThis)
251{
252 /*
253 * Update the state.
254 */
255 for (;;)
256 {
257 uint32_t u32State = ASMAtomicReadU32(&pThis->u32State);
258 uint32_t const u32StateOld = u32State;
259 uint32_t const cUsers = (u32State & RTPIPE_POSIX_USERS_MASK);
260
261 if (u32State & RTPIPE_POSIX_BLOCKING)
262 {
263 AssertReturn(cUsers < RTPIPE_POSIX_USERS_MASK / 2, VERR_INTERNAL_ERROR_4);
264 u32State &= ~RTPIPE_POSIX_USERS_MASK;
265 u32State |= cUsers + 1;
266 if (ASMAtomicCmpXchgU32(&pThis->u32State, u32State, u32StateOld))
267 {
268 if (u32State & RTPIPE_POSIX_SWITCHING)
269 break;
270 return VINF_SUCCESS;
271 }
272 }
273 else if (cUsers == 0)
274 {
275 u32State = 1 | RTPIPE_POSIX_SWITCHING | RTPIPE_POSIX_BLOCKING;
276 if (ASMAtomicCmpXchgU32(&pThis->u32State, u32State, u32StateOld))
277 break;
278 }
279 else
280 return VERR_WRONG_ORDER;
281 ASMNopPause();
282 }
283
284 /*
285 * Do the switching.
286 */
287 int fFlags = fcntl(pThis->fd, F_GETFL, 0);
288 if (fFlags != -1)
289 {
290 if ( !(fFlags & O_NONBLOCK)
291 || fcntl(pThis->fd, F_SETFL, fFlags & ~O_NONBLOCK) != -1)
292 {
293 ASMAtomicBitClear(&pThis->u32State, RTPIPE_POSIX_SWITCHING_BIT);
294 return VINF_SUCCESS;
295 }
296 }
297
298 ASMAtomicDecU32(&pThis->u32State);
299 return RTErrConvertFromErrno(errno);
300}
301
302
303/**
304 * Prepare non-blocking mode.
305 *
306 * @returns VINF_SUCCESS
307 * @retval VERR_WRONG_ORDER
308 * @retval VERR_INTERNAL_ERROR_4
309 *
310 * @param pThis The pipe handle.
311 */
312static int rtPipeTryNonBlocking(RTPIPEINTERNAL *pThis)
313{
314 /*
315 * Update the state.
316 */
317 for (;;)
318 {
319 uint32_t u32State = ASMAtomicReadU32(&pThis->u32State);
320 uint32_t const u32StateOld = u32State;
321 uint32_t const cUsers = (u32State & RTPIPE_POSIX_USERS_MASK);
322
323 if (!(u32State & RTPIPE_POSIX_BLOCKING))
324 {
325 AssertReturn(cUsers < RTPIPE_POSIX_USERS_MASK / 2, VERR_INTERNAL_ERROR_4);
326 u32State &= ~RTPIPE_POSIX_USERS_MASK;
327 u32State |= cUsers + 1;
328 if (ASMAtomicCmpXchgU32(&pThis->u32State, u32State, u32StateOld))
329 {
330 if (u32State & RTPIPE_POSIX_SWITCHING)
331 break;
332 return VINF_SUCCESS;
333 }
334 }
335 else if (cUsers == 0)
336 {
337 u32State = 1 | RTPIPE_POSIX_SWITCHING;
338 if (ASMAtomicCmpXchgU32(&pThis->u32State, u32State, u32StateOld))
339 break;
340 }
341 else
342 return VERR_WRONG_ORDER;
343 ASMNopPause();
344 }
345
346 /*
347 * Do the switching.
348 */
349 int fFlags = fcntl(pThis->fd, F_GETFL, 0);
350 if (fFlags != -1)
351 {
352 if ( (fFlags & O_NONBLOCK)
353 || fcntl(pThis->fd, F_SETFL, fFlags | O_NONBLOCK) != -1)
354 {
355 ASMAtomicBitClear(&pThis->u32State, RTPIPE_POSIX_SWITCHING_BIT);
356 return VINF_SUCCESS;
357 }
358 }
359
360 ASMAtomicDecU32(&pThis->u32State);
361 return RTErrConvertFromErrno(errno);
362}
363
364
365/**
366 * Checks if the read pipe has a HUP condition.
367 *
368 * @returns true if HUP, false if no.
369 * @param pThis The pipe handle (read).
370 */
371static bool rtPipePosixHasHup(RTPIPEINTERNAL *pThis)
372{
373 Assert(pThis->fRead);
374
375 struct pollfd PollFd;
376 RT_ZERO(PollFd);
377 PollFd.fd = pThis->fd;
378 PollFd.events = POLLHUP;
379 return poll(&PollFd, 1, 0) >= 1
380 && (PollFd.revents & POLLHUP);
381}
382
383
384RTDECL(int) RTPipeRead(RTPIPE hPipe, void *pvBuf, size_t cbToRead, size_t *pcbRead)
385{
386 RTPIPEINTERNAL *pThis = hPipe;
387 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
388 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
389 AssertReturn(pThis->fRead, VERR_ACCESS_DENIED);
390 AssertPtr(pcbRead);
391 AssertPtr(pvBuf);
392
393 int rc = rtPipeTryNonBlocking(pThis);
394 if (RT_SUCCESS(rc))
395 {
396 ssize_t cbRead = read(pThis->fd, pvBuf, RT_MIN(cbToRead, SSIZE_MAX));
397 if (cbRead >= 0)
398 {
399 if (cbRead || !cbToRead || !rtPipePosixHasHup(pThis))
400 *pcbRead = cbRead;
401 else
402 rc = VERR_BROKEN_PIPE;
403 }
404 else if (errno == EAGAIN)
405 {
406 *pcbRead = 0;
407 rc = VINF_TRY_AGAIN;
408 }
409 else
410 rc = RTErrConvertFromErrno(errno);
411
412 ASMAtomicDecU32(&pThis->u32State);
413 }
414 return rc;
415}
416
417
418RTDECL(int) RTPipeReadBlocking(RTPIPE hPipe, void *pvBuf, size_t cbToRead, size_t *pcbRead)
419{
420 RTPIPEINTERNAL *pThis = hPipe;
421 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
422 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
423 AssertReturn(pThis->fRead, VERR_ACCESS_DENIED);
424 AssertPtr(pvBuf);
425
426 int rc = rtPipeTryBlocking(pThis);
427 if (RT_SUCCESS(rc))
428 {
429 size_t cbTotalRead = 0;;
430 while (cbToRead > 0)
431 {
432 ssize_t cbRead = read(pThis->fd, pvBuf, RT_MIN(cbToRead, SSIZE_MAX));
433 if (cbRead < 0)
434 {
435 rc = RTErrConvertFromErrno(errno);
436 break;
437 }
438 if (!cbRead && cbToRead > 0 && rtPipePosixHasHup(pThis))
439 {
440 rc = VERR_BROKEN_PIPE;
441 break;
442 }
443
444 /* advance */
445 pvBuf = (char *)pvBuf + cbRead;
446 cbTotalRead += cbRead;
447 cbToRead -= cbRead;
448 }
449
450 if (pcbRead)
451 {
452 *pcbRead = cbTotalRead;
453 if ( RT_FAILURE(rc)
454 && cbTotalRead
455 && rc != VERR_INVALID_POINTER)
456 rc = VINF_SUCCESS;
457 }
458
459 ASMAtomicDecU32(&pThis->u32State);
460 }
461 return rc;
462}
463
464
465RTDECL(int) RTPipeWrite(RTPIPE hPipe, const void *pvBuf, size_t cbToWrite, size_t *pcbWritten)
466{
467 RTPIPEINTERNAL *pThis = hPipe;
468 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
469 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
470 AssertReturn(!pThis->fRead, VERR_ACCESS_DENIED);
471 AssertPtr(pcbWritten);
472 AssertPtr(pvBuf);
473
474 int rc = rtPipeTryNonBlocking(pThis);
475 if (RT_SUCCESS(rc))
476 {
477 if (cbToWrite)
478 {
479 ssize_t cbWritten = write(pThis->fd, pvBuf, RT_MIN(cbToWrite, SSIZE_MAX));
480 if (cbWritten >= 0)
481 *pcbWritten = cbWritten;
482 else if (errno == EAGAIN)
483 {
484 *pcbWritten = 0;
485 rc = VINF_TRY_AGAIN;
486 }
487 else
488 rc = RTErrConvertFromErrno(errno);
489 }
490 else
491 *pcbWritten = 0;
492
493 ASMAtomicDecU32(&pThis->u32State);
494 }
495 return rc;
496}
497
498
499RTDECL(int) RTPipeWriteBlocking(RTPIPE hPipe, const void *pvBuf, size_t cbToWrite, size_t *pcbWritten)
500{
501 RTPIPEINTERNAL *pThis = hPipe;
502 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
503 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
504 AssertReturn(!pThis->fRead, VERR_ACCESS_DENIED);
505 AssertPtr(pvBuf);
506 AssertPtrNull(pcbWritten);
507
508 int rc = rtPipeTryBlocking(pThis);
509 if (RT_SUCCESS(rc))
510 {
511 size_t cbTotalWritten = 0;
512 while (cbToWrite > 0)
513 {
514 ssize_t cbWritten = write(pThis->fd, pvBuf, RT_MIN(cbToWrite, SSIZE_MAX));
515 if (cbWritten < 0)
516 {
517 rc = RTErrConvertFromErrno(errno);
518 break;
519 }
520
521 /* advance */
522 pvBuf = (char const *)pvBuf + cbWritten;
523 cbTotalWritten += cbWritten;
524 cbToWrite -= cbWritten;
525 }
526
527 if (pcbWritten)
528 {
529 *pcbWritten = cbTotalWritten;
530 if ( RT_FAILURE(rc)
531 && cbTotalWritten
532 && rc != VERR_INVALID_POINTER)
533 rc = VINF_SUCCESS;
534 }
535
536 ASMAtomicDecU32(&pThis->u32State);
537 }
538 return rc;
539}
540
541
542RTDECL(int) RTPipeFlush(RTPIPE hPipe)
543{
544 RTPIPEINTERNAL *pThis = hPipe;
545 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
546 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
547 AssertReturn(!pThis->fRead, VERR_ACCESS_DENIED);
548
549 if (fsync(pThis->fd))
550 {
551 if (errno == EINVAL || errno == ENOTSUP)
552 return VERR_NOT_SUPPORTED;
553 return RTErrConvertFromErrno(errno);
554 }
555 return VINF_SUCCESS;
556}
557
558
559RTDECL(int) RTPipeSelectOne(RTPIPE hPipe, RTMSINTERVAL cMillies)
560{
561 RTPIPEINTERNAL *pThis = hPipe;
562 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
563 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
564
565 struct pollfd PollFd;
566 RT_ZERO(PollFd);
567 PollFd.fd = pThis->fd;
568 PollFd.events = POLLHUP | POLLERR;
569 if (pThis->fRead)
570 PollFd.events |= POLLIN | POLLPRI;
571 else
572 PollFd.events |= POLLOUT;
573
574 int timeout;
575 if ( cMillies == RT_INDEFINITE_WAIT
576 || cMillies >= INT_MAX /* lazy bird */)
577 timeout = -1;
578 else
579 timeout = cMillies;
580
581 int rc = poll(&PollFd, 1, 0);
582 if (rc == -1)
583 return RTErrConvertFromErrno(errno);
584 return rc > 0 ? VINF_SUCCESS : VERR_TIMEOUT;
585}
586
Note: See TracBrowser for help on using the repository browser.

© 2025 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette