VirtualBox

source: vbox/trunk/src/VBox/NetworkServices/NAT/proxy_pollmgr.c@ 51300

Last change on this file since 51300 was 49323, checked in by vboxsync, 11 years ago

Few more missed -1 -> INVALID_SOCKET replacements.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Id Revision
File size: 17.0 KB
Line 
1/* -*- indent-tabs-mode: nil; -*- */
2#include "winutils.h"
3
4#include "proxy_pollmgr.h"
5#include "proxy.h"
6
7#ifndef RT_OS_WINDOWS
8#include <sys/socket.h>
9#include <netinet/in.h>
10#include <err.h>
11#include <errno.h>
12#include <poll.h>
13#include <stdio.h>
14#include <stdlib.h>
15#include <string.h>
16#include <time.h>
17#include <unistd.h>
18#else
19#include <iprt/err.h>
20#include <stdlib.h>
21#include <string.h>
22#include "winpoll.h"
23#endif
24
25#define POLLMGR_GARBAGE (-1)
26
27struct pollmgr {
28 struct pollfd *fds;
29 struct pollmgr_handler **handlers;
30 nfds_t capacity; /* allocated size of the arrays */
31 nfds_t nfds; /* part of the arrays in use */
32
33 /* channels (socketpair) for static slots */
34 SOCKET chan[POLLMGR_SLOT_STATIC_COUNT][2];
35#define POLLMGR_CHFD_RD 0 /* - pollmgr side */
36#define POLLMGR_CHFD_WR 1 /* - client side */
37} pollmgr;
38
39
40static void pollmgr_loop(void);
41
42static void pollmgr_add_at(int, struct pollmgr_handler *, SOCKET, int);
43static void pollmgr_refptr_delete(struct pollmgr_refptr *);
44
45
46/*
47 * We cannot portably peek at the length of the incoming datagram and
48 * pre-allocate pbuf chain to recvmsg() directly to it. On Linux it's
49 * possible to recv with MSG_PEEK|MSG_TRUC, but extra syscall is
50 * probably more expensive (haven't measured) than doing an extra copy
51 * of data, since typical UDP datagrams are small enough to avoid
52 * fragmentation.
53 *
54 * We can use shared buffer here since we read from sockets
55 * sequentially in a loop over pollfd.
56 */
57u8_t pollmgr_udpbuf[64 * 1024];
58
59
60int
61pollmgr_init(void)
62{
63 struct pollfd *newfds;
64 struct pollmgr_handler **newhdls;
65 nfds_t newcap;
66 int status;
67 nfds_t i;
68
69 pollmgr.fds = NULL;
70 pollmgr.handlers = NULL;
71 pollmgr.capacity = 0;
72 pollmgr.nfds = 0;
73
74 for (i = 0; i < POLLMGR_SLOT_STATIC_COUNT; ++i) {
75 pollmgr.chan[i][POLLMGR_CHFD_RD] = -1;
76 pollmgr.chan[i][POLLMGR_CHFD_WR] = -1;
77 }
78
79 for (i = 0; i < POLLMGR_SLOT_STATIC_COUNT; ++i) {
80#ifndef RT_OS_WINDOWS
81 status = socketpair(PF_LOCAL, SOCK_DGRAM, 0, pollmgr.chan[i]);
82 if (status < 0) {
83 perror("socketpair");
84 goto cleanup_close;
85 }
86#else
87 status = RTWinSocketPair(PF_INET, SOCK_DGRAM, 0, pollmgr.chan[i]);
88 AssertRCReturn(status, -1);
89
90 if (RT_FAILURE(status)) {
91 perror("socketpair");
92 goto cleanup_close;
93 }
94#endif
95 }
96
97
98 newcap = 16; /* XXX: magic */
99 LWIP_ASSERT1(newcap >= POLLMGR_SLOT_STATIC_COUNT);
100
101 newfds = (struct pollfd *)
102 malloc(newcap * sizeof(*pollmgr.fds));
103 if (newfds == NULL) {
104 perror("calloc");
105 goto cleanup_close;
106 }
107
108 newhdls = (struct pollmgr_handler **)
109 malloc(newcap * sizeof(*pollmgr.handlers));
110 if (newhdls == NULL) {
111 perror("malloc");
112 free(newfds);
113 goto cleanup_close;
114 }
115
116 pollmgr.capacity = newcap;
117 pollmgr.fds = newfds;
118 pollmgr.handlers = newhdls;
119
120 pollmgr.nfds = POLLMGR_SLOT_STATIC_COUNT;
121
122 for (i = 0; i < pollmgr.capacity; ++i) {
123 pollmgr.fds[i].fd = INVALID_SOCKET;
124 pollmgr.fds[i].events = 0;
125 pollmgr.fds[i].revents = 0;
126 }
127
128 return 0;
129
130 cleanup_close:
131 for (i = 0; i < POLLMGR_SLOT_STATIC_COUNT; ++i) {
132 SOCKET *chan = pollmgr.chan[i];
133 if (chan[POLLMGR_CHFD_RD] >= 0) {
134 closesocket(chan[POLLMGR_CHFD_RD]);
135 closesocket(chan[POLLMGR_CHFD_WR]);
136 }
137 }
138
139 return -1;
140}
141
142
143/*
144 * Must be called before pollmgr loop is started, so no locking.
145 */
146SOCKET
147pollmgr_add_chan(int slot, struct pollmgr_handler *handler)
148{
149 if (slot >= POLLMGR_SLOT_FIRST_DYNAMIC) {
150 handler->slot = -1;
151 return -1;
152 }
153
154 pollmgr_add_at(slot, handler, pollmgr.chan[slot][POLLMGR_CHFD_RD], POLLIN);
155 return pollmgr.chan[slot][POLLMGR_CHFD_WR];
156}
157
158
159/*
160 * Must be called from pollmgr loop (via callbacks), so no locking.
161 */
162int
163pollmgr_add(struct pollmgr_handler *handler, SOCKET fd, int events)
164{
165 int slot;
166
167 DPRINTF2(("%s: new fd %d\n", __func__, fd));
168
169 if (pollmgr.nfds == pollmgr.capacity) {
170 struct pollfd *newfds;
171 struct pollmgr_handler **newhdls;
172 nfds_t newcap;
173 nfds_t i;
174
175 newcap = pollmgr.capacity * 2;
176
177 newfds = (struct pollfd *)
178 realloc(pollmgr.fds, newcap * sizeof(*pollmgr.fds));
179 if (newfds == NULL) {
180 perror("realloc");
181 handler->slot = -1;
182 return -1;
183 }
184
185 pollmgr.fds = newfds; /* don't crash/leak if realloc(handlers) fails */
186 /* but don't update capacity yet! */
187
188 newhdls = (struct pollmgr_handler **)
189 realloc(pollmgr.handlers, newcap * sizeof(*pollmgr.handlers));
190 if (newhdls == NULL) {
191 perror("realloc");
192 /* if we failed to realloc here, then fds points to the
193 * new array, but we pretend we still has old capacity */
194 handler->slot = -1;
195 return -1;
196 }
197
198 pollmgr.handlers = newhdls;
199 pollmgr.capacity = newcap;
200
201 for (i = pollmgr.nfds; i < newcap; ++i) {
202 newfds[i].fd = INVALID_SOCKET;
203 newfds[i].events = 0;
204 newfds[i].revents = 0;
205 newhdls[i] = NULL;
206 }
207 }
208
209 slot = pollmgr.nfds;
210 ++pollmgr.nfds;
211
212 pollmgr_add_at(slot, handler, fd, events);
213 return slot;
214}
215
216
217static void
218pollmgr_add_at(int slot, struct pollmgr_handler *handler, SOCKET fd, int events)
219{
220 pollmgr.fds[slot].fd = fd;
221 pollmgr.fds[slot].events = events;
222 pollmgr.fds[slot].revents = 0;
223 pollmgr.handlers[slot] = handler;
224
225 handler->slot = slot;
226}
227
228
229ssize_t
230pollmgr_chan_send(int slot, void *buf, size_t nbytes)
231{
232 SOCKET fd;
233 ssize_t nsent;
234
235 if (slot >= POLLMGR_SLOT_FIRST_DYNAMIC) {
236 return -1;
237 }
238
239 fd = pollmgr.chan[slot][POLLMGR_CHFD_WR];
240 nsent = send(fd, buf, (int)nbytes, 0);
241 if (nsent == SOCKET_ERROR) {
242 warn("send on chan %d", slot);
243 return -1;
244 }
245 else if ((size_t)nsent != nbytes) {
246 warnx("send on chan %d: datagram truncated to %u bytes",
247 slot, (unsigned int)nsent);
248 return -1;
249 }
250
251 return nsent;
252}
253
254
255/**
256 * Receive a pointer sent over poll manager channel.
257 */
258void *
259pollmgr_chan_recv_ptr(struct pollmgr_handler *handler, SOCKET fd, int revents)
260{
261 void *ptr;
262 ssize_t nread;
263
264 if (revents & POLLNVAL) {
265 errx(EXIT_FAILURE, "chan %d: fd invalid", (int)handler->slot);
266 /* NOTREACHED */
267 }
268
269 if (revents & (POLLERR | POLLHUP)) {
270 errx(EXIT_FAILURE, "chan %d: fd error", (int)handler->slot);
271 /* NOTREACHED */
272 }
273
274 LWIP_ASSERT1(revents & POLLIN);
275 nread = recv(fd, (char *)&ptr, sizeof(ptr), 0);
276
277 if (nread == SOCKET_ERROR) {
278 err(EXIT_FAILURE, "chan %d: recv", (int)handler->slot);
279 /* NOTREACHED */
280 }
281 if (nread != sizeof(ptr)) {
282 errx(EXIT_FAILURE, "chan %d: recv: read %d bytes",
283 (int)handler->slot, (int)nread);
284 /* NOTREACHED */
285 }
286
287 return ptr;
288}
289
290
291void
292pollmgr_update_events(int slot, int events)
293{
294 LWIP_ASSERT1(slot >= POLLMGR_SLOT_FIRST_DYNAMIC);
295 LWIP_ASSERT1((nfds_t)slot < pollmgr.nfds);
296
297 pollmgr.fds[slot].events = events;
298}
299
300
301void
302pollmgr_del_slot(int slot)
303{
304 LWIP_ASSERT1(slot >= POLLMGR_SLOT_FIRST_DYNAMIC);
305
306 DPRINTF2(("%s(%d): fd %d ! DELETED\n",
307 __func__, slot, pollmgr.fds[slot].fd));
308
309 pollmgr.fds[slot].fd = INVALID_SOCKET; /* see poll loop */
310}
311
312
/**
 * Thread entry point of the poll manager: runs the poll loop.
 *
 * @param ignored  Unused thread argument.
 */
void
pollmgr_thread(void *ignored)
{
    LWIP_UNUSED_ARG(ignored);

    pollmgr_loop();
}
319
320
321static void
322pollmgr_loop(void)
323{
324 int nready;
325 SOCKET delfirst;
326 SOCKET *pdelprev;
327 int i;
328
329 for (;;) {
330#ifndef RT_OS_WINDOWS
331 nready = poll(pollmgr.fds, pollmgr.nfds, -1);
332#else
333 int rc = RTWinPoll(pollmgr.fds, pollmgr.nfds,RT_INDEFINITE_WAIT, &nready);
334 if (RT_FAILURE(rc)) {
335 err(EXIT_FAILURE, "poll"); /* XXX: what to do on error? */
336 /* NOTREACHED*/
337 }
338#endif
339
340 DPRINTF2(("%s: ready %d fd%s\n",
341 __func__, nready, (nready == 1 ? "" : "s")));
342
343 if (nready < 0) {
344 if (errno == EINTR) {
345 continue;
346 }
347
348 err(EXIT_FAILURE, "poll"); /* XXX: what to do on error? */
349 /* NOTREACHED*/
350 }
351 else if (nready == 0) { /* cannot happen, we wait forever (-1) */
352 continue; /* - but be defensive */
353 }
354
355
356 delfirst = INVALID_SOCKET;
357 pdelprev = &delfirst;
358
359 for (i = 0; (nfds_t)i < pollmgr.nfds && nready > 0; ++i) {
360 struct pollmgr_handler *handler;
361 SOCKET fd;
362 int revents, nevents;
363
364 fd = pollmgr.fds[i].fd;
365 revents = pollmgr.fds[i].revents;
366
367 /*
368 * Channel handlers can request deletion of dynamic slots
369 * by calling pollmgr_del_slot() that clobbers slot's fd.
370 */
371 if (fd == INVALID_SOCKET && i >= POLLMGR_SLOT_FIRST_DYNAMIC) {
372 /* adjust count if events were pending for that slot */
373 if (revents != 0) {
374 --nready;
375 }
376
377 /* pretend that slot handler requested deletion */
378 nevents = -1;
379 goto update_events;
380 }
381
382 if (revents == 0) {
383 continue; /* next fd */
384 }
385 --nready;
386
387 handler = pollmgr.handlers[i];
388
389 if (handler != NULL && handler->callback != NULL) {
390#if LWIP_PROXY_DEBUG /* DEBUG */
391 if (i < POLLMGR_SLOT_FIRST_DYNAMIC) {
392 if (revents == POLLIN) {
393 DPRINTF2(("%s: ch %d\n", __func__, i));
394 }
395 else {
396 DPRINTF2(("%s: ch %d @ revents 0x%x!\n",
397 __func__, i, revents));
398 }
399 }
400 else {
401 DPRINTF2(("%s: fd %d @ revents 0x%x\n",
402 __func__, fd, revents));
403 }
404#endif /* DEBUG */
405 nevents = (*handler->callback)(handler, fd, revents);
406 }
407 else {
408 DPRINTF0(("%s: invalid handler for fd %d: ", __func__, fd));
409 if (handler == NULL) {
410 DPRINTF0(("NULL\n"));
411 }
412 else {
413 DPRINTF0(("%p (callback = NULL)\n", (void *)handler));
414 }
415 nevents = -1; /* delete it */
416 }
417
418 update_events:
419 if (nevents >= 0) {
420 if (nevents != pollmgr.fds[i].events) {
421 DPRINTF2(("%s: fd %d ! nevents 0x%x\n",
422 __func__, fd, nevents));
423 }
424 pollmgr.fds[i].events = nevents;
425 }
426 else if (i < POLLMGR_SLOT_FIRST_DYNAMIC) {
427 /* Don't garbage-collect channels. */
428 DPRINTF2(("%s: fd %d ! DELETED (channel %d)\n",
429 __func__, fd, i));
430 pollmgr.fds[i].fd = INVALID_SOCKET;
431 pollmgr.fds[i].events = 0;
432 pollmgr.fds[i].revents = 0;
433 pollmgr.handlers[i] = NULL;
434 }
435 else {
436 DPRINTF2(("%s: fd %d ! DELETED\n", __func__, fd));
437
438 /* schedule for deletion (see g/c loop for details) */
439 *pdelprev = i; /* make previous entry point to us */
440 pdelprev = &pollmgr.fds[i].fd;
441
442 pollmgr.fds[i].fd = INVALID_SOCKET; /* end of list (for now) */
443 pollmgr.fds[i].events = POLLMGR_GARBAGE;
444 pollmgr.fds[i].revents = 0;
445 pollmgr.handlers[i] = NULL;
446 }
447 } /* processing loop */
448
449
450 /*
451 * Garbage collect and compact the array.
452 *
453 * We overload pollfd::fd of garbage entries to store the
454 * index of the next garbage entry. The garbage list is
455 * co-directional with the fds array. The index of the first
456 * entry is in "delfirst", the last entry "points to"
457 * INVALID_SOCKET.
458 *
459 * See update_events code for nevents < 0 at the end of the
460 * processing loop above.
461 */
462 while (delfirst != INVALID_SOCKET) {
463 const int last = pollmgr.nfds - 1;
464
465 /*
466 * We want a live entry in the last slot to swap into the
467 * freed slot, so make sure we have one.
468 */
469 if (pollmgr.fds[last].events == POLLMGR_GARBAGE /* garbage */
470 || pollmgr.fds[last].fd == INVALID_SOCKET) /* or killed */
471 {
472 /* drop garbage entry at the end of the array */
473 --pollmgr.nfds;
474
475 if (delfirst == last) {
476 /* congruent to delnext >= pollmgr.nfds test below */
477 delfirst = INVALID_SOCKET; /* done */
478 }
479 }
480 else {
481 const SOCKET delnext = pollmgr.fds[delfirst].fd;
482
483 /* copy live entry at the end to the first slot being freed */
484 pollmgr.fds[delfirst] = pollmgr.fds[last]; /* struct copy */
485 pollmgr.handlers[delfirst] = pollmgr.handlers[last];
486 pollmgr.handlers[delfirst]->slot = (int)delfirst;
487 --pollmgr.nfds;
488
489 if ((nfds_t)delnext >= pollmgr.nfds) {
490 delfirst = INVALID_SOCKET; /* done */
491 }
492 else {
493 delfirst = delnext;
494 }
495 }
496
497 pollmgr.fds[last].fd = INVALID_SOCKET;
498 pollmgr.fds[last].events = 0;
499 pollmgr.fds[last].revents = 0;
500 pollmgr.handlers[last] = NULL;
501 }
502 } /* poll loop */
503}
504
505
506/**
507 * Create strongly held refptr.
508 */
509struct pollmgr_refptr *
510pollmgr_refptr_create(struct pollmgr_handler *ptr)
511{
512 struct pollmgr_refptr *rp;
513
514 LWIP_ASSERT1(ptr != NULL);
515
516 rp = (struct pollmgr_refptr *)malloc(sizeof (*rp));
517 if (rp == NULL) {
518 return NULL;
519 }
520
521 sys_mutex_new(&rp->lock);
522 rp->ptr = ptr;
523 rp->strong = 1;
524 rp->weak = 0;
525
526 return rp;
527}
528
529
530static void
531pollmgr_refptr_delete(struct pollmgr_refptr *rp)
532{
533 if (rp == NULL) {
534 return;
535 }
536
537 LWIP_ASSERT1(rp->strong == 0);
538 LWIP_ASSERT1(rp->weak == 0);
539
540 sys_mutex_free(&rp->lock);
541 free(rp);
542}
543
544
545/**
546 * Add weak reference before "rp" is sent over a poll manager channel.
547 */
548void
549pollmgr_refptr_weak_ref(struct pollmgr_refptr *rp)
550{
551 sys_mutex_lock(&rp->lock);
552
553 LWIP_ASSERT1(rp->ptr != NULL);
554 LWIP_ASSERT1(rp->strong > 0);
555
556 ++rp->weak;
557
558 sys_mutex_unlock(&rp->lock);
559}
560
561
562/**
563 * Try to get the pointer from implicitely weak reference we've got
564 * from a channel.
565 *
566 * If we detect that the object is still strongly referenced, but no
567 * longer registered with the poll manager we abort strengthening
568 * conversion here b/c lwip thread callback is already scheduled to
569 * destruct the object.
570 */
571struct pollmgr_handler *
572pollmgr_refptr_get(struct pollmgr_refptr *rp)
573{
574 struct pollmgr_handler *handler;
575 size_t weak;
576
577 sys_mutex_lock(&rp->lock);
578
579 LWIP_ASSERT1(rp->weak > 0);
580 weak = --rp->weak;
581
582 handler = rp->ptr;
583 if (handler == NULL) {
584 LWIP_ASSERT1(rp->strong == 0);
585 sys_mutex_unlock(&rp->lock);
586 if (weak == 0) {
587 pollmgr_refptr_delete(rp);
588 }
589 return NULL;
590 }
591
592 LWIP_ASSERT1(rp->strong == 1);
593
594 /*
595 * Here we woild do:
596 *
597 * ++rp->strong;
598 *
599 * and then, after channel handler is done, we would decrement it
600 * back.
601 *
602 * Instead we check that the object is still registered with poll
603 * manager. If it is, there's no race with lwip thread trying to
604 * drop its strong reference, as lwip thread callback to destruct
605 * the object is always scheduled by its poll manager callback.
606 *
607 * Conversly, if we detect that the object is no longer registered
608 * with poll manager, we immediately abort. Since channel handler
609 * can't do anything useful anyway and would have to return
610 * immediately.
611 *
612 * Since channel handler would always find rp->strong as it had
613 * left it, just elide extra strong reference creation to avoid
614 * the whole back-and-forth.
615 */
616
617 if (handler->slot < 0) { /* no longer polling */
618 sys_mutex_unlock(&rp->lock);
619 return NULL;
620 }
621
622 sys_mutex_unlock(&rp->lock);
623 return handler;
624}
625
626
627/**
628 * Remove (the only) strong reference.
629 *
630 * If it were real strong/weak pointers, we should also call
631 * destructor for the referenced object, but
632 */
633void
634pollmgr_refptr_unref(struct pollmgr_refptr *rp)
635{
636 sys_mutex_lock(&rp->lock);
637
638 LWIP_ASSERT1(rp->strong == 1);
639 --rp->strong;
640
641 if (rp->strong > 0) {
642 sys_mutex_unlock(&rp->lock);
643 }
644 else {
645 size_t weak;
646
647 /* void *ptr = rp->ptr; */
648 rp->ptr = NULL;
649
650 /* delete ptr; // see doc comment */
651
652 weak = rp->weak;
653 sys_mutex_unlock(&rp->lock);
654 if (weak == 0) {
655 pollmgr_refptr_delete(rp);
656 }
657 }
658}
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette