/* -*- indent-tabs-mode: nil; -*- */ #include "winutils.h" #include "proxy_pollmgr.h" #include "proxy.h" #ifndef RT_OS_WINDOWS #include #include #include #include #include #include #include #include #include #include #else #include #include #include #include "winpoll.h" #endif #define POLLMGR_GARBAGE (-1) struct pollmgr { struct pollfd *fds; struct pollmgr_handler **handlers; nfds_t capacity; /* allocated size of the arrays */ nfds_t nfds; /* part of the arrays in use */ /* channels (socketpair) for static slots */ SOCKET chan[POLLMGR_SLOT_STATIC_COUNT][2]; #define POLLMGR_CHFD_RD 0 /* - pollmgr side */ #define POLLMGR_CHFD_WR 1 /* - client side */ } pollmgr; static void pollmgr_loop(void); static void pollmgr_add_at(int, struct pollmgr_handler *, SOCKET, int); static void pollmgr_refptr_delete(struct pollmgr_refptr *); /* * We cannot portably peek at the length of the incoming datagram and * pre-allocate pbuf chain to recvmsg() directly to it. On Linux it's * possible to recv with MSG_PEEK|MSG_TRUC, but extra syscall is * probably more expensive (haven't measured) than doing an extra copy * of data, since typical UDP datagrams are small enough to avoid * fragmentation. * * We can use shared buffer here since we read from sockets * sequentially in a loop over pollfd. */ u8_t pollmgr_udpbuf[64 * 1024]; int pollmgr_init(void) { struct pollfd *newfds; struct pollmgr_handler **newhdls; nfds_t newcap; int status; nfds_t i; pollmgr.fds = NULL; pollmgr.handlers = NULL; pollmgr.capacity = 0; pollmgr.nfds = 0; for (i = 0; i < POLLMGR_SLOT_STATIC_COUNT; ++i) { pollmgr.chan[i][POLLMGR_CHFD_RD] = -1; pollmgr.chan[i][POLLMGR_CHFD_WR] = -1; } for (i = 0; i < POLLMGR_SLOT_STATIC_COUNT; ++i) { #ifndef RT_OS_WINDOWS status = socketpair(PF_LOCAL, SOCK_DGRAM, 0, pollmgr.chan[i]); if (status < 0) { perror("socketpair"); goto cleanup_close; } #else status = RTWinSocketPair(PF_INET, SOCK_DGRAM, 0, pollmgr.chan[i]); AssertRCReturn(status, -1); if (RT_FAILURE(status)) { perror("socketpair"); goto cleanup_close; } #endif } newcap = 16; /* XXX: magic */ LWIP_ASSERT1(newcap >= POLLMGR_SLOT_STATIC_COUNT); newfds = (struct pollfd *) malloc(newcap * sizeof(*pollmgr.fds)); if (newfds == NULL) { perror("calloc"); goto cleanup_close; } newhdls = (struct pollmgr_handler **) malloc(newcap * sizeof(*pollmgr.handlers)); if (newhdls == NULL) { perror("malloc"); free(newfds); goto cleanup_close; } pollmgr.capacity = newcap; pollmgr.fds = newfds; pollmgr.handlers = newhdls; pollmgr.nfds = POLLMGR_SLOT_STATIC_COUNT; for (i = 0; i < pollmgr.capacity; ++i) { pollmgr.fds[i].fd = INVALID_SOCKET; pollmgr.fds[i].events = 0; pollmgr.fds[i].revents = 0; } return 0; cleanup_close: for (i = 0; i < POLLMGR_SLOT_STATIC_COUNT; ++i) { SOCKET *chan = pollmgr.chan[i]; if (chan[POLLMGR_CHFD_RD] >= 0) { closesocket(chan[POLLMGR_CHFD_RD]); closesocket(chan[POLLMGR_CHFD_WR]); } } return -1; } /* * Must be called before pollmgr loop is started, so no locking. */ SOCKET pollmgr_add_chan(int slot, struct pollmgr_handler *handler) { if (slot >= POLLMGR_SLOT_FIRST_DYNAMIC) { handler->slot = -1; return -1; } pollmgr_add_at(slot, handler, pollmgr.chan[slot][POLLMGR_CHFD_RD], POLLIN); return pollmgr.chan[slot][POLLMGR_CHFD_WR]; } /* * Must be called from pollmgr loop (via callbacks), so no locking. */ int pollmgr_add(struct pollmgr_handler *handler, SOCKET fd, int events) { int slot; DPRINTF2(("%s: new fd %d\n", __func__, fd)); if (pollmgr.nfds == pollmgr.capacity) { struct pollfd *newfds; struct pollmgr_handler **newhdls; nfds_t newcap; nfds_t i; newcap = pollmgr.capacity * 2; newfds = (struct pollfd *) realloc(pollmgr.fds, newcap * sizeof(*pollmgr.fds)); if (newfds == NULL) { perror("realloc"); handler->slot = -1; return -1; } pollmgr.fds = newfds; /* don't crash/leak if realloc(handlers) fails */ /* but don't update capacity yet! */ newhdls = (struct pollmgr_handler **) realloc(pollmgr.handlers, newcap * sizeof(*pollmgr.handlers)); if (newhdls == NULL) { perror("realloc"); /* if we failed to realloc here, then fds points to the * new array, but we pretend we still has old capacity */ handler->slot = -1; return -1; } pollmgr.handlers = newhdls; pollmgr.capacity = newcap; for (i = pollmgr.nfds; i < newcap; ++i) { newfds[i].fd = INVALID_SOCKET; newfds[i].events = 0; newfds[i].revents = 0; newhdls[i] = NULL; } } slot = pollmgr.nfds; ++pollmgr.nfds; pollmgr_add_at(slot, handler, fd, events); return slot; } static void pollmgr_add_at(int slot, struct pollmgr_handler *handler, SOCKET fd, int events) { pollmgr.fds[slot].fd = fd; pollmgr.fds[slot].events = events; pollmgr.fds[slot].revents = 0; pollmgr.handlers[slot] = handler; handler->slot = slot; } ssize_t pollmgr_chan_send(int slot, void *buf, size_t nbytes) { SOCKET fd; ssize_t nsent; if (slot >= POLLMGR_SLOT_FIRST_DYNAMIC) { return -1; } fd = pollmgr.chan[slot][POLLMGR_CHFD_WR]; nsent = send(fd, buf, (int)nbytes, 0); if (nsent == SOCKET_ERROR) { warn("send on chan %d", slot); return -1; } else if ((size_t)nsent != nbytes) { warnx("send on chan %d: datagram truncated to %u bytes", slot, (unsigned int)nsent); return -1; } return nsent; } /** * Receive a pointer sent over poll manager channel. */ void * pollmgr_chan_recv_ptr(struct pollmgr_handler *handler, SOCKET fd, int revents) { void *ptr; ssize_t nread; if (revents & POLLNVAL) { errx(EXIT_FAILURE, "chan %d: fd invalid", (int)handler->slot); /* NOTREACHED */ } if (revents & (POLLERR | POLLHUP)) { errx(EXIT_FAILURE, "chan %d: fd error", (int)handler->slot); /* NOTREACHED */ } LWIP_ASSERT1(revents & POLLIN); nread = recv(fd, (char *)&ptr, sizeof(ptr), 0); if (nread == SOCKET_ERROR) { err(EXIT_FAILURE, "chan %d: recv", (int)handler->slot); /* NOTREACHED */ } if (nread != sizeof(ptr)) { errx(EXIT_FAILURE, "chan %d: recv: read %d bytes", (int)handler->slot, (int)nread); /* NOTREACHED */ } return ptr; } void pollmgr_update_events(int slot, int events) { LWIP_ASSERT1(slot >= POLLMGR_SLOT_FIRST_DYNAMIC); LWIP_ASSERT1((nfds_t)slot < pollmgr.nfds); pollmgr.fds[slot].events = events; } void pollmgr_del_slot(int slot) { LWIP_ASSERT1(slot >= POLLMGR_SLOT_FIRST_DYNAMIC); DPRINTF2(("%s(%d): fd %d ! DELETED\n", __func__, slot, pollmgr.fds[slot].fd)); pollmgr.fds[slot].fd = INVALID_SOCKET; /* see poll loop */ } void pollmgr_thread(void *ignored) { LWIP_UNUSED_ARG(ignored); pollmgr_loop(); } static void pollmgr_loop(void) { int nready; SOCKET delfirst; SOCKET *pdelprev; int i; for (;;) { #ifndef RT_OS_WINDOWS nready = poll(pollmgr.fds, pollmgr.nfds, -1); #else int rc = RTWinPoll(pollmgr.fds, pollmgr.nfds,RT_INDEFINITE_WAIT, &nready); if (RT_FAILURE(rc)) { err(EXIT_FAILURE, "poll"); /* XXX: what to do on error? */ /* NOTREACHED*/ } #endif DPRINTF2(("%s: ready %d fd%s\n", __func__, nready, (nready == 1 ? "" : "s"))); if (nready < 0) { if (errno == EINTR) { continue; } err(EXIT_FAILURE, "poll"); /* XXX: what to do on error? */ /* NOTREACHED*/ } else if (nready == 0) { /* cannot happen, we wait forever (-1) */ continue; /* - but be defensive */ } delfirst = INVALID_SOCKET; pdelprev = &delfirst; for (i = 0; (nfds_t)i < pollmgr.nfds && nready > 0; ++i) { struct pollmgr_handler *handler; SOCKET fd; int revents, nevents; fd = pollmgr.fds[i].fd; revents = pollmgr.fds[i].revents; /* * Channel handlers can request deletion of dynamic slots * by calling pollmgr_del_slot() that clobbers slot's fd. */ if (fd == INVALID_SOCKET && i >= POLLMGR_SLOT_FIRST_DYNAMIC) { /* adjust count if events were pending for that slot */ if (revents != 0) { --nready; } /* pretend that slot handler requested deletion */ nevents = -1; goto update_events; } if (revents == 0) { continue; /* next fd */ } --nready; handler = pollmgr.handlers[i]; if (handler != NULL && handler->callback != NULL) { #if LWIP_PROXY_DEBUG /* DEBUG */ if (i < POLLMGR_SLOT_FIRST_DYNAMIC) { if (revents == POLLIN) { DPRINTF2(("%s: ch %d\n", __func__, i)); } else { DPRINTF2(("%s: ch %d @ revents 0x%x!\n", __func__, i, revents)); } } else { DPRINTF2(("%s: fd %d @ revents 0x%x\n", __func__, fd, revents)); } #endif /* DEBUG */ nevents = (*handler->callback)(handler, fd, revents); } else { DPRINTF0(("%s: invalid handler for fd %d: ", __func__, fd)); if (handler == NULL) { DPRINTF0(("NULL\n")); } else { DPRINTF0(("%p (callback = NULL)\n", (void *)handler)); } nevents = -1; /* delete it */ } update_events: if (nevents >= 0) { if (nevents != pollmgr.fds[i].events) { DPRINTF2(("%s: fd %d ! nevents 0x%x\n", __func__, fd, nevents)); } pollmgr.fds[i].events = nevents; } else if (i < POLLMGR_SLOT_FIRST_DYNAMIC) { /* Don't garbage-collect channels. */ DPRINTF2(("%s: fd %d ! DELETED (channel %d)\n", __func__, fd, i)); pollmgr.fds[i].fd = INVALID_SOCKET; pollmgr.fds[i].events = 0; pollmgr.fds[i].revents = 0; pollmgr.handlers[i] = NULL; } else { DPRINTF2(("%s: fd %d ! DELETED\n", __func__, fd)); /* schedule for deletion (see g/c loop for details) */ *pdelprev = i; /* make previous entry point to us */ pdelprev = &pollmgr.fds[i].fd; pollmgr.fds[i].fd = INVALID_SOCKET; /* end of list (for now) */ pollmgr.fds[i].events = POLLMGR_GARBAGE; pollmgr.fds[i].revents = 0; pollmgr.handlers[i] = NULL; } } /* processing loop */ /* * Garbage collect and compact the array. * * We overload pollfd::fd of garbage entries to store the * index of the next garbage entry. The garbage list is * co-directional with the fds array. The index of the first * entry is in "delfirst", the last entry "points to" * INVALID_SOCKET. * * See update_events code for nevents < 0 at the end of the * processing loop above. */ while (delfirst != INVALID_SOCKET) { const int last = pollmgr.nfds - 1; /* * We want a live entry in the last slot to swap into the * freed slot, so make sure we have one. */ if (pollmgr.fds[last].events == POLLMGR_GARBAGE /* garbage */ || pollmgr.fds[last].fd == INVALID_SOCKET) /* or killed */ { /* drop garbage entry at the end of the array */ --pollmgr.nfds; if (delfirst == last) { /* congruent to delnext >= pollmgr.nfds test below */ delfirst = INVALID_SOCKET; /* done */ } } else { const SOCKET delnext = pollmgr.fds[delfirst].fd; /* copy live entry at the end to the first slot being freed */ pollmgr.fds[delfirst] = pollmgr.fds[last]; /* struct copy */ pollmgr.handlers[delfirst] = pollmgr.handlers[last]; pollmgr.handlers[delfirst]->slot = (int)delfirst; --pollmgr.nfds; if ((nfds_t)delnext >= pollmgr.nfds) { delfirst = INVALID_SOCKET; /* done */ } else { delfirst = delnext; } } pollmgr.fds[last].fd = INVALID_SOCKET; pollmgr.fds[last].events = 0; pollmgr.fds[last].revents = 0; pollmgr.handlers[last] = NULL; } } /* poll loop */ } /** * Create strongly held refptr. */ struct pollmgr_refptr * pollmgr_refptr_create(struct pollmgr_handler *ptr) { struct pollmgr_refptr *rp; LWIP_ASSERT1(ptr != NULL); rp = (struct pollmgr_refptr *)malloc(sizeof (*rp)); if (rp == NULL) { return NULL; } sys_mutex_new(&rp->lock); rp->ptr = ptr; rp->strong = 1; rp->weak = 0; return rp; } static void pollmgr_refptr_delete(struct pollmgr_refptr *rp) { if (rp == NULL) { return; } LWIP_ASSERT1(rp->strong == 0); LWIP_ASSERT1(rp->weak == 0); sys_mutex_free(&rp->lock); free(rp); } /** * Add weak reference before "rp" is sent over a poll manager channel. */ void pollmgr_refptr_weak_ref(struct pollmgr_refptr *rp) { sys_mutex_lock(&rp->lock); LWIP_ASSERT1(rp->ptr != NULL); LWIP_ASSERT1(rp->strong > 0); ++rp->weak; sys_mutex_unlock(&rp->lock); } /** * Try to get the pointer from implicitely weak reference we've got * from a channel. * * If we detect that the object is still strongly referenced, but no * longer registered with the poll manager we abort strengthening * conversion here b/c lwip thread callback is already scheduled to * destruct the object. */ struct pollmgr_handler * pollmgr_refptr_get(struct pollmgr_refptr *rp) { struct pollmgr_handler *handler; size_t weak; sys_mutex_lock(&rp->lock); LWIP_ASSERT1(rp->weak > 0); weak = --rp->weak; handler = rp->ptr; if (handler == NULL) { LWIP_ASSERT1(rp->strong == 0); sys_mutex_unlock(&rp->lock); if (weak == 0) { pollmgr_refptr_delete(rp); } return NULL; } LWIP_ASSERT1(rp->strong == 1); /* * Here we woild do: * * ++rp->strong; * * and then, after channel handler is done, we would decrement it * back. * * Instead we check that the object is still registered with poll * manager. If it is, there's no race with lwip thread trying to * drop its strong reference, as lwip thread callback to destruct * the object is always scheduled by its poll manager callback. * * Conversly, if we detect that the object is no longer registered * with poll manager, we immediately abort. Since channel handler * can't do anything useful anyway and would have to return * immediately. * * Since channel handler would always find rp->strong as it had * left it, just elide extra strong reference creation to avoid * the whole back-and-forth. */ if (handler->slot < 0) { /* no longer polling */ sys_mutex_unlock(&rp->lock); return NULL; } sys_mutex_unlock(&rp->lock); return handler; } /** * Remove (the only) strong reference. * * If it were real strong/weak pointers, we should also call * destructor for the referenced object, but */ void pollmgr_refptr_unref(struct pollmgr_refptr *rp) { sys_mutex_lock(&rp->lock); LWIP_ASSERT1(rp->strong == 1); --rp->strong; if (rp->strong > 0) { sys_mutex_unlock(&rp->lock); } else { size_t weak; /* void *ptr = rp->ptr; */ rp->ptr = NULL; /* delete ptr; // see doc comment */ weak = rp->weak; sys_mutex_unlock(&rp->lock); if (weak == 0) { pollmgr_refptr_delete(rp); } } }