VirtualBox

source: vbox/trunk/src/VBox/Runtime/r3/posix/localipc-posix.cpp@ 82878

Last change on this file since 82878 was 77258, checked in by vboxsync, 6 years ago

localipc-posix.cpp: Memory leak fix in error handling code path

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 35.1 KB
Line 
1/* $Id: localipc-posix.cpp 77258 2019-02-11 13:15:32Z vboxsync $ */
2/** @file
3 * IPRT - Local IPC Server & Client, Posix.
4 */
5
6/*
7 * Copyright (C) 2006-2019 Oracle Corporation
8 *
9 * This file is part of VirtualBox Open Source Edition (OSE), as
10 * available from http://www.virtualbox.org. This file is free software;
11 * you can redistribute it and/or modify it under the terms of the GNU
12 * General Public License (GPL) as published by the Free Software
13 * Foundation, in version 2 as it comes in the "COPYING" file of the
14 * VirtualBox OSE distribution. VirtualBox OSE is distributed in the
15 * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
16 *
17 * The contents of this file may alternatively be used under the terms
18 * of the Common Development and Distribution License Version 1.0
19 * (CDDL) only, as it comes in the "COPYING.CDDL" file of the
20 * VirtualBox OSE distribution, in which case the provisions of the
21 * CDDL are applicable instead of those of the GPL.
22 *
23 * You may elect to license modified versions of this file under the
24 * terms and conditions of either the GPL or the CDDL or both.
25 */
26
27
28/*********************************************************************************************************************************
29* Header Files *
30*********************************************************************************************************************************/
31#define LOG_GROUP RTLOGGROUP_LOCALIPC
32#include "internal/iprt.h"
33#include <iprt/localipc.h>
34
35#include <iprt/asm.h>
36#include <iprt/assert.h>
37#include <iprt/ctype.h>
38#include <iprt/critsect.h>
39#include <iprt/err.h>
40#include <iprt/mem.h>
41#include <iprt/log.h>
42#include <iprt/poll.h>
43#include <iprt/socket.h>
44#include <iprt/string.h>
45#include <iprt/time.h>
46
47#include <sys/types.h>
48#include <sys/socket.h>
49#include <sys/un.h>
50#ifndef RT_OS_OS2
51# include <sys/poll.h>
52#endif
53#include <errno.h>
54#include <fcntl.h>
55#include <signal.h>
56#include <unistd.h>
57
58#include "internal/magics.h"
59#include "internal/path.h"
60#include "internal/socket.h"
61
62
63/*********************************************************************************************************************************
64* Structures and Typedefs *
65*********************************************************************************************************************************/
66/**
67 * Local IPC service instance, POSIX.
68 */
69typedef struct RTLOCALIPCSERVERINT
70{
71 /** The magic (RTLOCALIPCSERVER_MAGIC). */
72 uint32_t u32Magic;
73 /** The creation flags. */
74 uint32_t fFlags;
75 /** Critical section protecting the structure. */
76 RTCRITSECT CritSect;
77 /** The number of references to the instance. */
78 uint32_t volatile cRefs;
79 /** Indicates that there is a pending cancel request. */
80 bool volatile fCancelled;
81 /** The server socket. */
82 RTSOCKET hSocket;
83 /** Thread currently listening for clients. */
84 RTTHREAD hListenThread;
85 /** The name we bound the server to (native charset encoding). */
86 struct sockaddr_un Name;
87} RTLOCALIPCSERVERINT;
88/** Pointer to a local IPC server instance (POSIX). */
89typedef RTLOCALIPCSERVERINT *PRTLOCALIPCSERVERINT;
90
91
92/**
93 * Local IPC session instance, POSIX.
94 */
95typedef struct RTLOCALIPCSESSIONINT
96{
97 /** The magic (RTLOCALIPCSESSION_MAGIC). */
98 uint32_t u32Magic;
99 /** Critical section protecting the structure. */
100 RTCRITSECT CritSect;
101 /** The number of references to the instance. */
102 uint32_t volatile cRefs;
103 /** Indicates that there is a pending cancel request. */
104 bool volatile fCancelled;
105 /** Set if this is the server side, clear if the client. */
106 bool fServerSide;
107 /** The client socket. */
108 RTSOCKET hSocket;
109 /** Thread currently doing read related activites. */
110 RTTHREAD hWriteThread;
111 /** Thread currently doing write related activies. */
112 RTTHREAD hReadThread;
113} RTLOCALIPCSESSIONINT;
114/** Pointer to a local IPC session instance (Windows). */
115typedef RTLOCALIPCSESSIONINT *PRTLOCALIPCSESSIONINT;
116
117
118/** Path prefix prepended to portable (non-native) names when constructing the unix domain socket address. */
119#define RTLOCALIPC_POSIX_NAME_PREFIX "/tmp/.iprt-localipc-"
120
121
122/**
123 * Validates the user specified name.
124 *
125 * @returns IPRT status code.
126 * @param pszName The name to validate.
127 * @param fNative Whether it's a native name or a portable name.
128 */
129static int rtLocalIpcPosixValidateName(const char *pszName, bool fNative)
130{
131 AssertPtrReturn(pszName, VERR_INVALID_POINTER);
132 AssertReturn(*pszName, VERR_INVALID_NAME);
133
134 if (!fNative)
135 {
136 for (;;)
137 {
138 char ch = *pszName++;
139 if (!ch)
140 break;
141 AssertReturn(!RT_C_IS_CNTRL(ch), VERR_INVALID_NAME);
142 AssertReturn((unsigned)ch < 0x80, VERR_INVALID_NAME);
143 AssertReturn(ch != '\\', VERR_INVALID_NAME);
144 AssertReturn(ch != '/', VERR_INVALID_NAME);
145 }
146 }
147 else
148 {
149 int rc = RTStrValidateEncoding(pszName);
150 AssertRCReturn(rc, rc);
151 }
152
153 return VINF_SUCCESS;
154}
155
156
157/**
158 * Constructs a local (unix) domain socket name.
159 *
160 * @returns IPRT status code.
161 * @param pAddr The address structure to construct the name in.
162 * @param pcbAddr Where to return the address size.
163 * @param pszName The user specified name (valid).
164 * @param fNative Whether it's a native name or a portable name.
165 */
166static int rtLocalIpcPosixConstructName(struct sockaddr_un *pAddr, uint8_t *pcbAddr, const char *pszName, bool fNative)
167{
168 const char *pszNativeName;
169 int rc = rtPathToNative(&pszNativeName, pszName, NULL /*pszBasePath not support*/);
170 if (RT_SUCCESS(rc))
171 {
172 size_t cchNativeName = strlen(pszNativeName);
173 size_t cbFull = !fNative ? cchNativeName + sizeof(RTLOCALIPC_POSIX_NAME_PREFIX) : cchNativeName + 1;
174 if (cbFull <= sizeof(pAddr->sun_path))
175 {
176 RT_ZERO(*pAddr);
177#ifdef RT_OS_OS2 /* Size must be exactly right on OS/2. */
178 *pcbAddr = sizeof(*pAddr);
179#else
180 *pcbAddr = RT_UOFFSETOF(struct sockaddr_un, sun_path) + (uint8_t)cbFull;
181#endif
182#ifdef HAVE_SUN_LEN_MEMBER
183 pAddr->sun_len = *pcbAddr;
184#endif
185 pAddr->sun_family = AF_LOCAL;
186
187 if (!fNative)
188 {
189 memcpy(pAddr->sun_path, RTLOCALIPC_POSIX_NAME_PREFIX, sizeof(RTLOCALIPC_POSIX_NAME_PREFIX) - 1);
190 memcpy(&pAddr->sun_path[sizeof(RTLOCALIPC_POSIX_NAME_PREFIX) - 1], pszNativeName, cchNativeName + 1);
191 }
192 else
193 memcpy(pAddr->sun_path, pszNativeName, cchNativeName + 1);
194 }
195 else
196 rc = VERR_FILENAME_TOO_LONG;
197 rtPathFreeNative(pszNativeName, pszName);
198 }
199 return rc;
200}
201
202
203
204RTDECL(int) RTLocalIpcServerCreate(PRTLOCALIPCSERVER phServer, const char *pszName, uint32_t fFlags)
205{
206 /*
207 * Parameter validation.
208 */
209 AssertPtrReturn(phServer, VERR_INVALID_POINTER);
210 *phServer = NIL_RTLOCALIPCSERVER;
211 AssertReturn(!(fFlags & ~RTLOCALIPC_FLAGS_VALID_MASK), VERR_INVALID_FLAGS);
212 int rc = rtLocalIpcPosixValidateName(pszName, RT_BOOL(fFlags & RTLOCALIPC_FLAGS_NATIVE_NAME));
213 if (RT_SUCCESS(rc))
214 {
215 /*
216 * Allocate memory for the instance and initialize it.
217 */
218 PRTLOCALIPCSERVERINT pThis = (PRTLOCALIPCSERVERINT)RTMemAllocZ(sizeof(*pThis));
219 if (pThis)
220 {
221 pThis->u32Magic = RTLOCALIPCSERVER_MAGIC;
222 pThis->fFlags = fFlags;
223 pThis->cRefs = 1;
224 pThis->fCancelled = false;
225 pThis->hListenThread = NIL_RTTHREAD;
226 rc = RTCritSectInit(&pThis->CritSect);
227 if (RT_SUCCESS(rc))
228 {
229 /*
230 * Create the local (unix) socket and bind to it.
231 */
232 rc = rtSocketCreate(&pThis->hSocket, AF_LOCAL, SOCK_STREAM, 0 /*iProtocol*/);
233 if (RT_SUCCESS(rc))
234 {
235 RTSocketSetInheritance(pThis->hSocket, false /*fInheritable*/);
236 signal(SIGPIPE, SIG_IGN); /* Required on solaris, at least. */
237
238 uint8_t cbAddr;
239 rc = rtLocalIpcPosixConstructName(&pThis->Name, &cbAddr, pszName,
240 RT_BOOL(fFlags & RTLOCALIPC_FLAGS_NATIVE_NAME));
241 if (RT_SUCCESS(rc))
242 {
243 rc = rtSocketBindRawAddr(pThis->hSocket, &pThis->Name, cbAddr);
244 if (rc == VERR_NET_ADDRESS_IN_USE)
245 {
246 unlink(pThis->Name.sun_path);
247 rc = rtSocketBindRawAddr(pThis->hSocket, &pThis->Name, cbAddr);
248 }
249 if (RT_SUCCESS(rc))
250 {
251 rc = rtSocketListen(pThis->hSocket, 16);
252 if (RT_SUCCESS(rc))
253 {
254 LogFlow(("RTLocalIpcServerCreate: Created %p (%s)\n", pThis, pThis->Name.sun_path));
255 *phServer = pThis;
256 return VINF_SUCCESS;
257 }
258 unlink(pThis->Name.sun_path);
259 }
260 }
261 RTSocketRelease(pThis->hSocket);
262 }
263 RTCritSectDelete(&pThis->CritSect);
264 }
265 RTMemFree(pThis);
266 }
267 else
268 rc = VERR_NO_MEMORY;
269 }
270 Log(("RTLocalIpcServerCreate: failed, rc=%Rrc\n", rc));
271 return rc;
272}
273
274
275/**
276 * Retains a reference to the server instance.
277 *
278 * @returns
279 * @param pThis The server instance.
280 */
281DECLINLINE(void) rtLocalIpcServerRetain(PRTLOCALIPCSERVERINT pThis)
282{
283 uint32_t cRefs = ASMAtomicIncU32(&pThis->cRefs);
284 Assert(cRefs < UINT32_MAX / 2 && cRefs); RT_NOREF_PV(cRefs);
285}
286
287
288/**
289 * Server instance destructor.
290 *
291 * @returns VINF_OBJECT_DESTROYED
292 * @param pThis The server instance.
293 */
294static int rtLocalIpcServerDtor(PRTLOCALIPCSERVERINT pThis)
295{
296 pThis->u32Magic = ~RTLOCALIPCSERVER_MAGIC;
297 if (RTSocketRelease(pThis->hSocket) == 0)
298 Log(("rtLocalIpcServerDtor: Released socket\n"));
299 else
300 Log(("rtLocalIpcServerDtor: Socket still has references (impossible?)\n"));
301 RTCritSectDelete(&pThis->CritSect);
302 unlink(pThis->Name.sun_path);
303 RTMemFree(pThis);
304 return VINF_OBJECT_DESTROYED;
305}
306
307
308/**
309 * Releases a reference to the server instance.
310 *
311 * @returns VINF_SUCCESS if only release, VINF_OBJECT_DESTROYED if destroyed.
312 * @param pThis The server instance.
313 */
314DECLINLINE(int) rtLocalIpcServerRelease(PRTLOCALIPCSERVERINT pThis)
315{
316 uint32_t cRefs = ASMAtomicDecU32(&pThis->cRefs);
317 Assert(cRefs < UINT32_MAX / 2);
318 if (!cRefs)
319 return rtLocalIpcServerDtor(pThis);
320 return VINF_SUCCESS;
321}
322
323
324/**
325 * The core of RTLocalIpcServerCancel, used by both the destroy and cancel APIs.
326 *
327 * @returns IPRT status code
328 * @param pThis The server instance.
329 */
330static int rtLocalIpcServerCancel(PRTLOCALIPCSERVERINT pThis)
331{
332 RTCritSectEnter(&pThis->CritSect);
333 pThis->fCancelled = true;
334 Log(("rtLocalIpcServerCancel:\n"));
335 if (pThis->hListenThread != NIL_RTTHREAD)
336 RTThreadPoke(pThis->hListenThread);
337 RTCritSectLeave(&pThis->CritSect);
338 return VINF_SUCCESS;
339}
340
341
342
343RTDECL(int) RTLocalIpcServerDestroy(RTLOCALIPCSERVER hServer)
344{
345 /*
346 * Validate input.
347 */
348 if (hServer == NIL_RTLOCALIPCSERVER)
349 return VINF_SUCCESS;
350 PRTLOCALIPCSERVERINT pThis = (PRTLOCALIPCSERVERINT)hServer;
351 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
352 AssertReturn(pThis->u32Magic == RTLOCALIPCSERVER_MAGIC, VERR_INVALID_HANDLE);
353
354 /*
355 * Invalidate the server, releasing the caller's reference to the instance
356 * data and making sure any other thread in the listen API will wake up.
357 */
358 AssertReturn(ASMAtomicCmpXchgU32(&pThis->u32Magic, ~RTLOCALIPCSERVER_MAGIC, RTLOCALIPCSERVER_MAGIC), VERR_WRONG_ORDER);
359
360 rtLocalIpcServerCancel(pThis);
361 return rtLocalIpcServerRelease(pThis);
362}
363
364
365RTDECL(int) RTLocalIpcServerCancel(RTLOCALIPCSERVER hServer)
366{
367 /*
368 * Validate input.
369 */
370 PRTLOCALIPCSERVERINT pThis = (PRTLOCALIPCSERVERINT)hServer;
371 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
372 AssertReturn(pThis->u32Magic == RTLOCALIPCSERVER_MAGIC, VERR_INVALID_HANDLE);
373
374 /*
375 * Do the job.
376 */
377 rtLocalIpcServerRetain(pThis);
378 rtLocalIpcServerCancel(pThis);
379 rtLocalIpcServerRelease(pThis);
380 return VINF_SUCCESS;
381}
382
383
384RTDECL(int) RTLocalIpcServerListen(RTLOCALIPCSERVER hServer, PRTLOCALIPCSESSION phClientSession)
385{
386 /*
387 * Validate input.
388 */
389 PRTLOCALIPCSERVERINT pThis = (PRTLOCALIPCSERVERINT)hServer;
390 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
391 AssertReturn(pThis->u32Magic == RTLOCALIPCSERVER_MAGIC, VERR_INVALID_HANDLE);
392
393 /*
394 * Begin listening.
395 */
396 rtLocalIpcServerRetain(pThis);
397 int rc = RTCritSectEnter(&pThis->CritSect);
398 if (RT_SUCCESS(rc))
399 {
400 if (pThis->hListenThread == NIL_RTTHREAD)
401 {
402 pThis->hListenThread = RTThreadSelf();
403
404 /*
405 * The listening retry loop.
406 */
407 for (;;)
408 {
409 if (!pThis->fCancelled)
410 {
411 rc = RTCritSectLeave(&pThis->CritSect);
412 AssertRCBreak(rc);
413
414 struct sockaddr_un Addr;
415 size_t cbAddr = sizeof(Addr);
416 RTSOCKET hClient;
417 Log(("RTLocalIpcServerListen: Calling rtSocketAccept...\n"));
418 rc = rtSocketAccept(pThis->hSocket, &hClient, (struct sockaddr *)&Addr, &cbAddr);
419 Log(("RTLocalIpcServerListen: rtSocketAccept returns %Rrc.\n", rc));
420
421 int rc2 = RTCritSectEnter(&pThis->CritSect);
422 AssertRCBreakStmt(rc2, rc = RT_SUCCESS(rc) ? rc2 : rc);
423
424 if (RT_SUCCESS(rc))
425 {
426 /*
427 * Create a client session.
428 */
429 PRTLOCALIPCSESSIONINT pSession = (PRTLOCALIPCSESSIONINT)RTMemAllocZ(sizeof(*pSession));
430 if (pSession)
431 {
432 pSession->u32Magic = RTLOCALIPCSESSION_MAGIC;
433 pSession->cRefs = 1;
434 pSession->fCancelled = false;
435 pSession->fServerSide = true;
436 pSession->hSocket = hClient;
437 pSession->hReadThread = NIL_RTTHREAD;
438 pSession->hWriteThread = NIL_RTTHREAD;
439 rc = RTCritSectInit(&pSession->CritSect);
440 if (RT_SUCCESS(rc))
441 {
442 Log(("RTLocalIpcServerListen: Returning new client session: %p\n", pSession));
443 *phClientSession = pSession;
444 break;
445 }
446
447 RTMemFree(pSession);
448 }
449 else
450 rc = VERR_NO_MEMORY;
451 }
452 else if ( rc != VERR_INTERRUPTED
453 && rc != VERR_TRY_AGAIN)
454 break;
455 }
456 else
457 {
458 rc = VERR_CANCELLED;
459 break;
460 }
461 }
462
463 pThis->hListenThread = NIL_RTTHREAD;
464 }
465 else
466 {
467 AssertFailed();
468 rc = VERR_RESOURCE_BUSY;
469 }
470 int rc2 = RTCritSectLeave(&pThis->CritSect);
471 AssertStmt(RT_SUCCESS(rc2), rc = RT_SUCCESS(rc) ? rc2 : rc);
472 }
473 rtLocalIpcServerRelease(pThis);
474
475 Log(("RTLocalIpcServerListen: returns %Rrc\n", rc));
476 return rc;
477}
478
479
480RTDECL(int) RTLocalIpcSessionConnect(PRTLOCALIPCSESSION phSession, const char *pszName, uint32_t fFlags)
481{
482 /*
483 * Parameter validation.
484 */
485 AssertPtrReturn(phSession, VERR_INVALID_POINTER);
486 *phSession = NIL_RTLOCALIPCSESSION;
487
488 AssertReturn(!(fFlags & ~RTLOCALIPC_C_FLAGS_VALID_MASK), VERR_INVALID_FLAGS);
489
490 int rc = rtLocalIpcPosixValidateName(pszName, RT_BOOL(fFlags & RTLOCALIPC_C_FLAGS_NATIVE_NAME));
491 if (RT_SUCCESS(rc))
492 {
493 /*
494 * Allocate memory for the instance and initialize it.
495 */
496 PRTLOCALIPCSESSIONINT pThis = (PRTLOCALIPCSESSIONINT)RTMemAllocZ(sizeof(*pThis));
497 if (pThis)
498 {
499 pThis->u32Magic = RTLOCALIPCSESSION_MAGIC;
500 pThis->cRefs = 1;
501 pThis->fCancelled = false;
502 pThis->fServerSide = false;
503 pThis->hSocket = NIL_RTSOCKET;
504 pThis->hReadThread = NIL_RTTHREAD;
505 pThis->hWriteThread = NIL_RTTHREAD;
506 rc = RTCritSectInit(&pThis->CritSect);
507 if (RT_SUCCESS(rc))
508 {
509 /*
510 * Create the local (unix) socket and try connect to the server.
511 */
512 rc = rtSocketCreate(&pThis->hSocket, AF_LOCAL, SOCK_STREAM, 0 /*iProtocol*/);
513 if (RT_SUCCESS(rc))
514 {
515 RTSocketSetInheritance(pThis->hSocket, false /*fInheritable*/);
516 signal(SIGPIPE, SIG_IGN); /* Required on solaris, at least. */
517
518 struct sockaddr_un Addr;
519 uint8_t cbAddr;
520 rc = rtLocalIpcPosixConstructName(&Addr, &cbAddr, pszName, RT_BOOL(fFlags & RTLOCALIPC_C_FLAGS_NATIVE_NAME));
521 if (RT_SUCCESS(rc))
522 {
523 rc = rtSocketConnectRaw(pThis->hSocket, &Addr, cbAddr);
524 if (RT_SUCCESS(rc))
525 {
526 *phSession = pThis;
527 Log(("RTLocalIpcSessionConnect: Returns new session %p\n", pThis));
528 return VINF_SUCCESS;
529 }
530 }
531 RTSocketRelease(pThis->hSocket);
532 }
533 RTCritSectDelete(&pThis->CritSect);
534 }
535 RTMemFree(pThis);
536 }
537 else
538 rc = VERR_NO_MEMORY;
539 }
540 Log(("RTLocalIpcSessionConnect: returns %Rrc\n", rc));
541 return rc;
542}
543
544
545/**
546 * Retains a reference to the session instance.
547 *
548 * @param pThis The server instance.
549 */
550DECLINLINE(void) rtLocalIpcSessionRetain(PRTLOCALIPCSESSIONINT pThis)
551{
552 uint32_t cRefs = ASMAtomicIncU32(&pThis->cRefs);
553 Assert(cRefs < UINT32_MAX / 2 && cRefs); RT_NOREF_PV(cRefs);
554}
555
556
557RTDECL(uint32_t) RTLocalIpcSessionRetain(RTLOCALIPCSESSION hSession)
558{
559 PRTLOCALIPCSESSIONINT pThis = (PRTLOCALIPCSESSIONINT)hSession;
560 AssertPtrReturn(pThis, UINT32_MAX);
561 AssertReturn(pThis->u32Magic == RTLOCALIPCSESSION_MAGIC, UINT32_MAX);
562
563 uint32_t cRefs = ASMAtomicIncU32(&pThis->cRefs);
564 Assert(cRefs < UINT32_MAX / 2 && cRefs);
565 return cRefs;
566}
567
568
569/**
570 * Session instance destructor.
571 *
572 * @returns VINF_OBJECT_DESTROYED
573 * @param pThis The server instance.
574 */
575static int rtLocalIpcSessionDtor(PRTLOCALIPCSESSIONINT pThis)
576{
577 pThis->u32Magic = ~RTLOCALIPCSESSION_MAGIC;
578 if (RTSocketRelease(pThis->hSocket) == 0)
579 Log(("rtLocalIpcSessionDtor: Released socket\n"));
580 else
581 Log(("rtLocalIpcSessionDtor: Socket still has references (impossible?)\n"));
582 RTCritSectDelete(&pThis->CritSect);
583 RTMemFree(pThis);
584 return VINF_OBJECT_DESTROYED;
585}
586
587
588/**
589 * Releases a reference to the session instance.
590 *
591 * @returns VINF_SUCCESS or VINF_OBJECT_DESTROYED as appropriate.
592 * @param pThis The session instance.
593 */
594DECLINLINE(int) rtLocalIpcSessionRelease(PRTLOCALIPCSESSIONINT pThis)
595{
596 uint32_t cRefs = ASMAtomicDecU32(&pThis->cRefs);
597 Assert(cRefs < UINT32_MAX / 2);
598 if (!cRefs)
599 return rtLocalIpcSessionDtor(pThis);
600 Log(("rtLocalIpcSessionRelease: %u refs left\n", cRefs));
601 return VINF_SUCCESS;
602}
603
604
605RTDECL(uint32_t) RTLocalIpcSessionRelease(RTLOCALIPCSESSION hSession)
606{
607 if (hSession == NIL_RTLOCALIPCSESSION)
608 return 0;
609
610 PRTLOCALIPCSESSIONINT pThis = (PRTLOCALIPCSESSIONINT)hSession;
611 AssertPtrReturn(pThis, UINT32_MAX);
612 AssertReturn(pThis->u32Magic == RTLOCALIPCSESSION_MAGIC, UINT32_MAX);
613
614 uint32_t cRefs = ASMAtomicDecU32(&pThis->cRefs);
615 Assert(cRefs < UINT32_MAX / 2);
616 if (cRefs)
617 Log(("RTLocalIpcSessionRelease: %u refs left\n", cRefs));
618 else
619 rtLocalIpcSessionDtor(pThis);
620 return cRefs;
621}
622
623
624/**
625 * The core of RTLocalIpcSessionCancel, used by both the destroy and cancel APIs.
626 *
627 * @returns IPRT status code
628 * @param pThis The session instance.
629 */
630static int rtLocalIpcSessionCancel(PRTLOCALIPCSESSIONINT pThis)
631{
632 RTCritSectEnter(&pThis->CritSect);
633 pThis->fCancelled = true;
634 Log(("rtLocalIpcSessionCancel:\n"));
635 if (pThis->hReadThread != NIL_RTTHREAD)
636 RTThreadPoke(pThis->hReadThread);
637 if (pThis->hWriteThread != NIL_RTTHREAD)
638 RTThreadPoke(pThis->hWriteThread);
639 RTCritSectLeave(&pThis->CritSect);
640 return VINF_SUCCESS;
641}
642
643
644RTDECL(int) RTLocalIpcSessionClose(RTLOCALIPCSESSION hSession)
645{
646 /*
647 * Validate input.
648 */
649 if (hSession == NIL_RTLOCALIPCSESSION)
650 return VINF_SUCCESS;
651 PRTLOCALIPCSESSIONINT pThis = hSession;
652 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
653 AssertReturn(pThis->u32Magic == RTLOCALIPCSESSION_MAGIC, VERR_INVALID_HANDLE);
654
655 /*
656 * Invalidate the session, releasing the caller's reference to the instance
657 * data and making sure any other thread in the listen API will wake up.
658 */
659 Log(("RTLocalIpcSessionClose:\n"));
660
661 rtLocalIpcSessionCancel(pThis);
662 return rtLocalIpcSessionRelease(pThis);
663}
664
665
666RTDECL(int) RTLocalIpcSessionCancel(RTLOCALIPCSESSION hSession)
667{
668 /*
669 * Validate input.
670 */
671 PRTLOCALIPCSESSIONINT pThis = hSession;
672 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
673 AssertReturn(pThis->u32Magic == RTLOCALIPCSESSION_MAGIC, VERR_INVALID_HANDLE);
674
675 /*
676 * Do the job.
677 */
678 rtLocalIpcSessionRetain(pThis);
679 rtLocalIpcSessionCancel(pThis);
680 rtLocalIpcSessionRelease(pThis);
681 return VINF_SUCCESS;
682}
683
684
685/**
686 * Checks if the socket has has a HUP condition after reading zero bytes.
687 *
688 * @returns true if HUP, false if no.
689 * @param pThis The IPC session handle.
690 */
691static bool rtLocalIpcPosixHasHup(PRTLOCALIPCSESSIONINT pThis)
692{
693 int fdNative = RTSocketToNative(pThis->hSocket);
694
695#if !defined(RT_OS_OS2) && !defined(RT_OS_SOLARIS)
696 struct pollfd PollFd;
697 RT_ZERO(PollFd);
698 PollFd.fd = fdNative;
699 PollFd.events = POLLHUP | POLLERR;
700 if (poll(&PollFd, 1, 0) <= 0)
701 return false;
702 if (!(PollFd.revents & (POLLHUP | POLLERR)))
703 return false;
704#else /* RT_OS_OS2 || RT_OS_SOLARIS */
705 /*
706 * OS/2: No native poll, do zero byte send to check for EPIPE.
707 * Solaris: We don't get POLLHUP.
708 */
709 uint8_t bDummy;
710 ssize_t rcSend = send(fdNative, &bDummy, 0, 0);
711 if (rcSend >= 0 || (errno != EPIPE && errno != ECONNRESET))
712 return false;
713#endif /* RT_OS_OS2 || RT_OS_SOLARIS */
714
715 /*
716 * We've established EPIPE. Now make sure there aren't any last bytes to
717 * read that came in between the recv made by the caller and the disconnect.
718 */
719 uint8_t bPeek;
720 ssize_t rcRecv = recv(fdNative, &bPeek, 1, MSG_DONTWAIT | MSG_PEEK);
721 return rcRecv <= 0;
722}
723
724
725RTDECL(int) RTLocalIpcSessionRead(RTLOCALIPCSESSION hSession, void *pvBuf, size_t cbToRead, size_t *pcbRead)
726{
727 /*
728 * Validate input.
729 */
730 PRTLOCALIPCSESSIONINT pThis = hSession;
731 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
732 AssertReturn(pThis->u32Magic == RTLOCALIPCSESSION_MAGIC, VERR_INVALID_HANDLE);
733
734 /*
735 * Do the job.
736 */
737 rtLocalIpcSessionRetain(pThis);
738
739 int rc = RTCritSectEnter(&pThis->CritSect);
740 if (RT_SUCCESS(rc))
741 {
742 if (pThis->hReadThread == NIL_RTTHREAD)
743 {
744 pThis->hReadThread = RTThreadSelf();
745
746 for (;;)
747 {
748 if (!pThis->fCancelled)
749 {
750 rc = RTCritSectLeave(&pThis->CritSect);
751 AssertRCBreak(rc);
752
753 rc = RTSocketRead(pThis->hSocket, pvBuf, cbToRead, pcbRead);
754
755 /* Detect broken pipe. */
756 if (rc == VINF_SUCCESS)
757 {
758 if (!pcbRead || *pcbRead)
759 { /* likely */ }
760 else if (rtLocalIpcPosixHasHup(pThis))
761 rc = VERR_BROKEN_PIPE;
762 }
763 else if (rc == VERR_NET_CONNECTION_RESET_BY_PEER || rc == VERR_NET_SHUTDOWN)
764 rc = VERR_BROKEN_PIPE;
765
766 int rc2 = RTCritSectEnter(&pThis->CritSect);
767 AssertRCBreakStmt(rc2, rc = RT_SUCCESS(rc) ? rc2 : rc);
768
769 if ( rc == VERR_INTERRUPTED
770 || rc == VERR_TRY_AGAIN)
771 continue;
772 }
773 else
774 rc = VERR_CANCELLED;
775 break;
776 }
777
778 pThis->hReadThread = NIL_RTTHREAD;
779 }
780 int rc2 = RTCritSectLeave(&pThis->CritSect);
781 AssertStmt(RT_SUCCESS(rc2), rc = RT_SUCCESS(rc) ? rc2 : rc);
782 }
783
784 rtLocalIpcSessionRelease(pThis);
785 return rc;
786}
787
788
789RTDECL(int) RTLocalIpcSessionReadNB(RTLOCALIPCSESSION hSession, void *pvBuf, size_t cbToRead, size_t *pcbRead)
790{
791 /*
792 * Validate input.
793 */
794 PRTLOCALIPCSESSIONINT pThis = hSession;
795 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
796 AssertReturn(pThis->u32Magic == RTLOCALIPCSESSION_MAGIC, VERR_INVALID_HANDLE);
797
798 /*
799 * Do the job.
800 */
801 rtLocalIpcSessionRetain(pThis);
802
803 int rc = RTCritSectEnter(&pThis->CritSect);
804 if (RT_SUCCESS(rc))
805 {
806 if (pThis->hReadThread == NIL_RTTHREAD)
807 {
808 pThis->hReadThread = RTThreadSelf(); /* not really required, but whatever. */
809
810 for (;;)
811 {
812 if (!pThis->fCancelled)
813 {
814 rc = RTSocketReadNB(pThis->hSocket, pvBuf, cbToRead, pcbRead);
815
816 /* Detect broken pipe. */
817 if (rc == VINF_SUCCESS)
818 {
819 if (!pcbRead || *pcbRead)
820 { /* likely */ }
821 else if (rtLocalIpcPosixHasHup(pThis))
822 rc = VERR_BROKEN_PIPE;
823 }
824 else if (rc == VERR_NET_CONNECTION_RESET_BY_PEER || rc == VERR_NET_SHUTDOWN)
825 rc = VERR_BROKEN_PIPE;
826
827 if (rc == VERR_INTERRUPTED)
828 continue;
829 }
830 else
831 rc = VERR_CANCELLED;
832 break;
833 }
834
835 pThis->hReadThread = NIL_RTTHREAD;
836 }
837 int rc2 = RTCritSectLeave(&pThis->CritSect);
838 AssertStmt(RT_SUCCESS(rc2), rc = RT_SUCCESS(rc) ? rc2 : rc);
839 }
840
841 rtLocalIpcSessionRelease(pThis);
842 return rc;
843}
844
845
846RTDECL(int) RTLocalIpcSessionWrite(RTLOCALIPCSESSION hSession, const void *pvBuf, size_t cbToWrite)
847{
848 /*
849 * Validate input.
850 */
851 PRTLOCALIPCSESSIONINT pThis = hSession;
852 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
853 AssertReturn(pThis->u32Magic == RTLOCALIPCSESSION_MAGIC, VERR_INVALID_HANDLE);
854
855 /*
856 * Do the job.
857 */
858 rtLocalIpcSessionRetain(pThis);
859
860 int rc = RTCritSectEnter(&pThis->CritSect);
861 if (RT_SUCCESS(rc))
862 {
863 if (pThis->hWriteThread == NIL_RTTHREAD)
864 {
865 pThis->hWriteThread = RTThreadSelf();
866
867 for (;;)
868 {
869 if (!pThis->fCancelled)
870 {
871 rc = RTCritSectLeave(&pThis->CritSect);
872 AssertRCBreak(rc);
873
874 rc = RTSocketWrite(pThis->hSocket, pvBuf, cbToWrite);
875
876 int rc2 = RTCritSectEnter(&pThis->CritSect);
877 AssertRCBreakStmt(rc2, rc = RT_SUCCESS(rc) ? rc2 : rc);
878
879 if ( rc == VERR_INTERRUPTED
880 || rc == VERR_TRY_AGAIN)
881 continue;
882 }
883 else
884 rc = VERR_CANCELLED;
885 break;
886 }
887
888 pThis->hWriteThread = NIL_RTTHREAD;
889 }
890 int rc2 = RTCritSectLeave(&pThis->CritSect);
891 AssertStmt(RT_SUCCESS(rc2), rc = RT_SUCCESS(rc) ? rc2 : rc);
892 }
893
894 rtLocalIpcSessionRelease(pThis);
895 return rc;
896}
897
898
899RTDECL(int) RTLocalIpcSessionFlush(RTLOCALIPCSESSION hSession)
900{
901 /*
902 * Validate input.
903 */
904 PRTLOCALIPCSESSIONINT pThis = hSession;
905 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
906 AssertReturn(pThis->u32Magic == RTLOCALIPCSESSION_MAGIC, VERR_INVALID_HANDLE);
907
908 /*
909 * This is a no-op because apparently write doesn't return until the
910 * result is read. At least that's what the reply to a 2003-04-08 LKML
911 * posting title "fsync() on unix domain sockets?" indicates.
912 *
913 * For conformity, make sure there isn't any active writes concurrent to this call.
914 */
915 rtLocalIpcSessionRetain(pThis);
916
917 int rc = RTCritSectEnter(&pThis->CritSect);
918 if (RT_SUCCESS(rc))
919 {
920 if (pThis->hWriteThread == NIL_RTTHREAD)
921 rc = RTCritSectLeave(&pThis->CritSect);
922 else
923 {
924 rc = RTCritSectLeave(&pThis->CritSect);
925 if (RT_SUCCESS(rc))
926 rc = VERR_RESOURCE_BUSY;
927 }
928 }
929
930 rtLocalIpcSessionRelease(pThis);
931 return rc;
932}
933
934
935RTDECL(int) RTLocalIpcSessionWaitForData(RTLOCALIPCSESSION hSession, uint32_t cMillies)
936{
937 /*
938 * Validate input.
939 */
940 PRTLOCALIPCSESSIONINT pThis = hSession;
941 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
942 AssertReturn(pThis->u32Magic == RTLOCALIPCSESSION_MAGIC, VERR_INVALID_HANDLE);
943
944 /*
945 * Do the job.
946 */
947 rtLocalIpcSessionRetain(pThis);
948
949 int rc = RTCritSectEnter(&pThis->CritSect);
950 if (RT_SUCCESS(rc))
951 {
952 if (pThis->hReadThread == NIL_RTTHREAD)
953 {
954 pThis->hReadThread = RTThreadSelf();
955 uint64_t const msStart = RTTimeMilliTS();
956 RTMSINTERVAL const cMsOriginalTimeout = cMillies;
957
958 for (;;)
959 {
960 if (!pThis->fCancelled)
961 {
962 rc = RTCritSectLeave(&pThis->CritSect);
963 AssertRCBreak(rc);
964
965 uint32_t fEvents = 0;
966#ifdef RT_OS_OS2
967 /* This doesn't give us any error condition on hangup, so use HUP check. */
968 Log(("RTLocalIpcSessionWaitForData: Calling RTSocketSelectOneEx...\n"));
969 rc = RTSocketSelectOneEx(pThis->hSocket, RTPOLL_EVT_READ | RTPOLL_EVT_ERROR, &fEvents, cMillies);
970 Log(("RTLocalIpcSessionWaitForData: RTSocketSelectOneEx returns %Rrc, fEvents=%#x\n", rc, fEvents));
971 if (RT_SUCCESS(rc) && fEvents == RTPOLL_EVT_READ && rtLocalIpcPosixHasHup(pThis))
972 rc = VERR_BROKEN_PIPE;
973#else
974/** @todo RTSocketPoll? */
975 /* POLLHUP will be set on hangup. */
976 struct pollfd PollFd;
977 RT_ZERO(PollFd);
978 PollFd.fd = RTSocketToNative(pThis->hSocket);
979 PollFd.events = POLLHUP | POLLERR | POLLIN;
980 Log(("RTLocalIpcSessionWaitForData: Calling poll...\n"));
981 int cFds = poll(&PollFd, 1, cMillies == RT_INDEFINITE_WAIT ? -1 : cMillies);
982 if (cFds >= 1)
983 {
984 /* Linux & Darwin sets both POLLIN and POLLHUP when the pipe is
985 broken and but no more data to read. Google hints at NetBSD
986 returning more sane values (POLLIN till no more data, then
987 POLLHUP). Solairs OTOH, doesn't ever seem to return POLLHUP. */
988 fEvents = RTPOLL_EVT_READ;
989 if ( (PollFd.revents & (POLLHUP | POLLERR))
990 && !(PollFd.revents & POLLIN))
991 fEvents = RTPOLL_EVT_ERROR;
992# if defined(RT_OS_SOLARIS)
993 else if (PollFd.revents & POLLIN)
994# else
995 else if ((PollFd.revents & (POLLIN | POLLHUP)) == (POLLIN | POLLHUP))
996# endif
997 {
998 /* Check if there is actually data available. */
999 uint8_t bPeek;
1000 ssize_t rcRecv = recv(PollFd.fd, &bPeek, 1, MSG_DONTWAIT | MSG_PEEK);
1001 if (rcRecv <= 0)
1002 fEvents = RTPOLL_EVT_ERROR;
1003 }
1004 rc = VINF_SUCCESS;
1005 }
1006 else if (rc == 0)
1007 rc = VERR_TIMEOUT;
1008 else
1009 rc = RTErrConvertFromErrno(errno);
1010 Log(("RTLocalIpcSessionWaitForData: poll returns %u (rc=%d), revents=%#x\n", cFds, rc, PollFd.revents));
1011#endif
1012
1013 int rc2 = RTCritSectEnter(&pThis->CritSect);
1014 AssertRCBreakStmt(rc2, rc = RT_SUCCESS(rc) ? rc2 : rc);
1015
1016 if (RT_SUCCESS(rc))
1017 {
1018 if (pThis->fCancelled)
1019 rc = VERR_CANCELLED;
1020 else if (fEvents & RTPOLL_EVT_ERROR)
1021 rc = VERR_BROKEN_PIPE;
1022 }
1023 else if ( rc == VERR_INTERRUPTED
1024 || rc == VERR_TRY_AGAIN)
1025 {
1026 /* Recalc cMillies. */
1027 if (cMsOriginalTimeout != RT_INDEFINITE_WAIT)
1028 {
1029 uint64_t cMsElapsed = RTTimeMilliTS() - msStart;
1030 cMillies = cMsElapsed >= cMsOriginalTimeout ? 0 : cMsOriginalTimeout - (RTMSINTERVAL)cMsElapsed;
1031 }
1032 continue;
1033 }
1034 }
1035 else
1036 rc = VERR_CANCELLED;
1037 break;
1038 }
1039
1040 pThis->hReadThread = NIL_RTTHREAD;
1041 }
1042 int rc2 = RTCritSectLeave(&pThis->CritSect);
1043 AssertStmt(RT_SUCCESS(rc2), rc = RT_SUCCESS(rc) ? rc2 : rc);
1044 }
1045
1046 rtLocalIpcSessionRelease(pThis);
1047 return rc;
1048}
1049
1050
1051RTDECL(int) RTLocalIpcSessionQueryProcess(RTLOCALIPCSESSION hSession, PRTPROCESS pProcess)
1052{
1053 RT_NOREF_PV(hSession); RT_NOREF_PV(pProcess);
1054 return VERR_NOT_SUPPORTED;
1055}
1056
1057
1058RTDECL(int) RTLocalIpcSessionQueryUserId(RTLOCALIPCSESSION hSession, PRTUID pUid)
1059{
1060 RT_NOREF_PV(hSession); RT_NOREF_PV(pUid);
1061 return VERR_NOT_SUPPORTED;
1062}
1063
1064
1065RTDECL(int) RTLocalIpcSessionQueryGroupId(RTLOCALIPCSESSION hSession, PRTGID pGid)
1066{
1067 RT_NOREF_PV(hSession); RT_NOREF_PV(pGid);
1068 return VERR_NOT_SUPPORTED;
1069}
1070
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette