VirtualBox

Ignore:
Timestamp:
Sep 14, 2017 2:15:39 AM (7 years ago)
Author:
vboxsync
Message:

Rewrite "channels". Use RTREQQUEUE to schedule requests on the
pollmgr thread and just one socket to wake up poll(2). To minimize
churn keep the old API, so no other files are affected.

Location:
trunk/src/VBox/NetworkServices/NAT
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/VBox/NetworkServices/NAT/proxy_pollmgr.c

    r63285 r68753  
    2828#include <err.h>
    2929#include <errno.h>
     30#include <fcntl.h>
    3031#include <poll.h>
    3132#include <stdio.h>
     
    4142#endif
    4243
     44#include <iprt/req.h>
     45#include <iprt/err.h>
     46
     47
    4348#define POLLMGR_GARBAGE (-1)
     49
     50
     51enum {
     52    POLLMGR_QUEUE = 0,
     53
     54    POLLMGR_SLOT_STATIC_COUNT,
     55    POLLMGR_SLOT_FIRST_DYNAMIC = POLLMGR_SLOT_STATIC_COUNT
     56};
     57
     58
     59struct pollmgr_chan {
     60    struct pollmgr_handler *handler;
     61    void *arg;
     62    bool arg_valid;
     63};
    4464
    4565struct pollmgr {
     
    5373#define POLLMGR_CHFD_RD 0       /* - pollmgr side */
    5474#define POLLMGR_CHFD_WR 1       /* - client side */
     75
     76
     77    /* emulate channels with request queue */
     78    RTREQQUEUE queue;
     79    struct pollmgr_handler queue_handler;
     80    struct pollmgr_chan chan_handlers[POLLMGR_CHAN_COUNT];
    5581} pollmgr;
    5682
     83
     84static int pollmgr_queue_callback(struct pollmgr_handler *, SOCKET, int);
     85static void pollmgr_chan_call_handler(int, void *);
    5786
    5887static void pollmgr_loop(void);
     
    82111    struct pollmgr_handler **newhdls;
    83112    nfds_t newcap;
    84     int status;
     113    int rc, status;
    85114    nfds_t i;
     115
     116    rc = RTReqQueueCreate(&pollmgr.queue);
     117    if (RT_FAILURE(rc))
     118        return -1;
    86119
    87120    pollmgr.fds = NULL;
     
    97130    for (i = 0; i < POLLMGR_SLOT_STATIC_COUNT; ++i) {
    98131#ifndef RT_OS_WINDOWS
     132        int j;
     133
    99134        status = socketpair(PF_LOCAL, SOCK_DGRAM, 0, pollmgr.chan[i]);
    100135        if (status < 0) {
    101136            DPRINTF(("socketpair: %R[sockerr]\n", SOCKERRNO()));
    102137            goto cleanup_close;
     138        }
     139
     140        /* now manually make them O_NONBLOCK */
     141        for (j = 0; j < 2; ++j) {
     142            int s = pollmgr.chan[i][j];
     143            int sflags;
     144
     145            sflags = fcntl(s, F_GETFL, 0);
     146            if (sflags < 0) {
     147                DPRINTF0(("F_GETFL: %R[sockerr]\n", errno));
     148                goto cleanup_close;
     149            }
     150
     151            status = fcntl(s, F_SETFL, sflags | O_NONBLOCK);
     152            if (status < 0) {
     153                DPRINTF0(("O_NONBLOCK: %R[sockerr]\n", errno));
     154                goto cleanup_close;
     155            }
    103156        }
    104157#else
     
    141194    }
    142195
     196    /* add request queue notification */
     197    pollmgr.queue_handler.callback = pollmgr_queue_callback;
     198    pollmgr.queue_handler.data = NULL;
     199    pollmgr.queue_handler.slot = -1;
     200
     201    pollmgr_add_at(POLLMGR_QUEUE, &pollmgr.queue_handler,
     202                   pollmgr.chan[POLLMGR_QUEUE][POLLMGR_CHFD_RD],
     203                   POLLIN);
     204
    143205    return 0;
    144206
     
    157219
    158220/*
     221 * Add new channel.  We now implement channels with request queue, so
     222 * all channels get the same socket that triggers queue processing.
     223 *
    159224 * Must be called before pollmgr loop is started, so no locking.
    160225 */
     
    162227pollmgr_add_chan(int slot, struct pollmgr_handler *handler)
    163228{
    164     if (slot >= POLLMGR_SLOT_FIRST_DYNAMIC) {
    165         handler->slot = -1;
    166         return INVALID_SOCKET;
    167     }
    168 
    169     pollmgr_add_at(slot, handler, pollmgr.chan[slot][POLLMGR_CHFD_RD], POLLIN);
    170     return pollmgr.chan[slot][POLLMGR_CHFD_WR];
     229    AssertReturn(0 <= slot && slot < POLLMGR_CHAN_COUNT, INVALID_SOCKET);
     230    AssertReturn(handler != NULL && handler->callback != NULL, INVALID_SOCKET);
     231
     232    handler->slot = slot;
     233    pollmgr.chan_handlers[slot].handler = handler;
     234    return pollmgr.chan[POLLMGR_QUEUE][POLLMGR_CHFD_WR];
     235}
     236
     237
     238/*
     239 * This used to actually send data over the channel's socket.  Now we
     240 * queue a request and send single byte notification over shared
     241 * POLLMGR_QUEUE socket.
     242 */
     243ssize_t
     244pollmgr_chan_send(int slot, void *buf, size_t nbytes)
     245{
     246    static const char notification = 0x5a;
     247
     248    void *ptr;
     249    SOCKET fd;
     250    ssize_t nsent;
     251    int rc;
     252
     253    AssertReturn(0 <= slot && slot < POLLMGR_CHAN_COUNT, -1);
     254
     255    /*
     256     * XXX: Hack alert.  We only ever "sent" single pointer which was
     257     * simultaneously both the wakeup event for the poll and the
     258     * argument for the channel handler that it read from the channel.
     259     * So now we pass this pointer to the request and arrange for the
     260     * handler to "read" it when it asks for it.
     261     */
     262    if (nbytes != sizeof(void *)) {
     263        return -1;
     264    }
     265
     266    ptr = *(void **)buf;
     267
     268    rc = RTReqQueueCallEx(pollmgr.queue, NULL, 0,
     269                          RTREQFLAGS_VOID | RTREQFLAGS_NO_WAIT,
     270                          (PFNRT)pollmgr_chan_call_handler, 2,
     271                          slot, ptr);
     272
     273    fd = pollmgr.chan[POLLMGR_QUEUE][POLLMGR_CHFD_WR];
     274    nsent = send(fd, &notification, 1, 0);
     275    if (nsent == SOCKET_ERROR) {
     276        DPRINTF(("send on chan %d: %R[sockerr]\n", slot, SOCKERRNO()));
     277        return -1;
     278    }
     279    else if ((size_t)nsent != 1) {
     280        DPRINTF(("send on chan %d: datagram truncated to %u bytes",
     281                 slot, (unsigned int)nsent));
     282        return -1;
     283    }
     284
     285    /* caller thinks it's sending the pointer */
     286    return sizeof(void *);
     287}
     288
     289
     290/*
     291 * pollmgr_chan_send() sent us a notification, process the queue.
     292 */
     293static int
     294pollmgr_queue_callback(struct pollmgr_handler *handler, SOCKET fd, int revents)
     295{
     296    ssize_t nread;
     297    int sockerr;
     298    int rc;
     299
     300    RT_NOREF(handler, revents);
     301    Assert(pollmgr.queue != NIL_RTREQQUEUE);
     302
     303    nread = recv(fd, (char *)pollmgr_udpbuf, sizeof(pollmgr_udpbuf), 0);
     304    sockerr = SOCKERRNO();      /* save now, may be clobbered */
     305
     306    if (nread == SOCKET_ERROR) {
     307        DPRINTF0(("%s: recv: %R[sockerr]\n", __func__, sockerr));
     308        return POLLIN;
     309    }
     310
     311    DPRINTF2(("%s: read %zd\n", __func__, nread));
     312    if (nread == 0) {
     313        return POLLIN;
     314    }
     315
     316    rc = RTReqQueueProcess(pollmgr.queue, 0);
     317    if (RT_UNLIKELY(rc != VERR_TIMEOUT && RT_FAILURE_NP(rc))) {
     318        DPRINTF0(("%s: RTReqQueueProcess: %Rrc\n", __func__, rc));
     319    }
     320
     321    return POLLIN;
     322}
     323
     324
     325/*
     326 * Queued requests use this function to emulate the call to the
     327 * handler's callback.
     328 */
     329static void
     330pollmgr_chan_call_handler(int slot, void *arg)
     331{
     332    struct pollmgr_handler *handler;
     333    int nevents;
     334
     335    AssertReturnVoid(0 <= slot && slot < POLLMGR_CHAN_COUNT);
     336
     337    handler = pollmgr.chan_handlers[slot].handler;
     338    AssertReturnVoid(handler != NULL && handler->callback != NULL);
     339
     340    /* arrange for pollmgr_chan_recv_ptr() to "receive" the arg */
     341    pollmgr.chan_handlers[slot].arg = arg;
     342    pollmgr.chan_handlers[slot].arg_valid = true;
     343
     344    nevents = handler->callback(handler, -1, POLLIN);
     345    if (nevents != POLLIN) {
     346        DPRINTF2(("%s: nevents=0x%x!\n", nevents));
     347    }
     348}
     349
     350
     351/*
     352 * "Receive" a pointer "sent" over poll manager channel.
     353 */
     354void *
     355pollmgr_chan_recv_ptr(struct pollmgr_handler *handler, SOCKET fd, int revents)
     356{
     357    int slot;
     358    void *ptr;
     359
     360    RT_NOREF(fd);
     361
     362    slot = handler->slot;
     363    Assert(0 <= slot && slot < POLLMGR_CHAN_COUNT);
     364
     365    if (revents & POLLNVAL) {
     366        errx(EXIT_FAILURE, "chan %d: fd invalid", (int)handler->slot);
     367        /* NOTREACHED */
     368    }
     369
     370    if (revents & (POLLERR | POLLHUP)) {
     371        errx(EXIT_FAILURE, "chan %d: fd error", (int)handler->slot);
     372        /* NOTREACHED */
     373    }
     374
     375    LWIP_ASSERT1(revents & POLLIN);
     376
     377    if (!pollmgr.chan_handlers[slot].arg_valid) {
     378        err(EXIT_FAILURE, "chan %d: recv", (int)handler->slot);
     379        /* NOTREACHED */
     380    }
     381
     382    ptr = pollmgr.chan_handlers[slot].arg;
     383    pollmgr.chan_handlers[slot].arg_valid = false;
     384
     385    return ptr;
    171386}
    172387
     
    239454
    240455    handler->slot = slot;
    241 }
    242 
    243 
    244 ssize_t
    245 pollmgr_chan_send(int slot, void *buf, size_t nbytes)
    246 {
    247     SOCKET fd;
    248     ssize_t nsent;
    249 
    250     if (slot >= POLLMGR_SLOT_FIRST_DYNAMIC) {
    251         return -1;
    252     }
    253 
    254     fd = pollmgr.chan[slot][POLLMGR_CHFD_WR];
    255     nsent = send(fd, buf, (int)nbytes, 0);
    256     if (nsent == SOCKET_ERROR) {
    257         DPRINTF(("send on chan %d: %R[sockerr]\n", slot, SOCKERRNO()));
    258         return -1;
    259     }
    260     else if ((size_t)nsent != nbytes) {
    261         DPRINTF(("send on chan %d: datagram truncated to %u bytes",
    262                  slot, (unsigned int)nsent));
    263         return -1;
    264     }
    265 
    266     return nsent;
    267 }
    268 
    269 
    270 /**
    271  * Receive a pointer sent over poll manager channel.
    272  */
    273 void *
    274 pollmgr_chan_recv_ptr(struct pollmgr_handler *handler, SOCKET fd, int revents)
    275 {
    276     void *ptr;
    277     ssize_t nread;
    278     NOREF(handler);
    279 
    280     if (revents & POLLNVAL) {
    281         errx(EXIT_FAILURE, "chan %d: fd invalid", (int)handler->slot);
    282         /* NOTREACHED */
    283     }
    284 
    285     if (revents & (POLLERR | POLLHUP)) {
    286         errx(EXIT_FAILURE, "chan %d: fd error", (int)handler->slot);
    287         /* NOTREACHED */
    288     }
    289 
    290     LWIP_ASSERT1(revents & POLLIN);
    291     nread = recv(fd, (char *)&ptr, sizeof(ptr), 0);
    292 
    293     if (nread == SOCKET_ERROR) {
    294         err(EXIT_FAILURE, "chan %d: recv", (int)handler->slot);
    295         /* NOTREACHED */
    296     }
    297     if (nread != sizeof(ptr)) {
    298         errx(EXIT_FAILURE, "chan %d: recv: read %d bytes",
    299              (int)handler->slot, (int)nread);
    300         /* NOTREACHED */
    301     }
    302 
    303     return ptr;
    304456}
    305457
  • trunk/src/VBox/NetworkServices/NAT/proxy_pollmgr.h

    r62481 r68753  
    3636    POLLMGR_CHAN_PORTFWD,       /* add/remove port forwarding rules */
    3737
    38     POLLMGR_SLOT_STATIC_COUNT,
    39     POLLMGR_SLOT_FIRST_DYNAMIC = POLLMGR_SLOT_STATIC_COUNT
     38    POLLMGR_CHAN_COUNT
    4039};
    4140
Note: See TracChangeset for help on using the changeset viewer.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette