VirtualBox

Changeset 26396 in vbox


Ignore:
Timestamp:
Feb 9, 2010 6:32:26 PM (15 years ago)
Author:
vboxsync
Message:

Web service: make no. of threads a cmd line param, encapsulate queue code

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/VBox/Main/webservice/vboxweb.cpp

    r26395 r26396  
    142142        { "--timeout",          't', RTGETOPT_REQ_UINT32 },
    143143        { "--check-interval",   'i', RTGETOPT_REQ_UINT32 },
     144        { "--threads",          'T', RTGETOPT_REQ_UINT32 },
    144145        { "--verbose",          'v', RTGETOPT_REQ_NOTHING },
    145146        { "--logfile",          'F', RTGETOPT_REQ_STRING },
     
    182183            case 't':
    183184                pcszDescr = "Session timeout in seconds; 0 = disable timeouts (" DEFAULT_TIMEOUT_SECS_STRING ").";
     185            break;
     186
     187            case 'T':
     188                pcszDescr = "Number of worker threads to run in parallel (5).";
    184189            break;
    185190
     
    295300    }
    296301
     302    /**
     303     * Adds the given socket to the SOAP queue and posts the
     304     * member event sem to wake up the workers.
     305     * @param s Socket from soap_accept() which has work to do.
     306     */
     307    uint32_t add(int s)
     308    {
     309        uint32_t cItems;
     310        // enqueue the socket of this connection and post eventsem so
     311        // that one of our threads can pick it up
     312        util::AutoWriteLock qlock(m_mutex COMMA_LOCKVAL_SRC_POS);
     313        m_llSocketsQ.push_back(s);
     314        cItems = m_llSocketsQ.size();
     315        qlock.release();
     316
     317        // unblock one of the worker threads
     318        RTSemEventMultiSignal(m_event);
     319
     320        return cItems;
     321    }
     322
     323    /**
     324     * Blocks the current thread until work comes in; then returns
     325     * the SOAP socket which has work to do. This reduces m_cIdleThreads
     326     * by one, and the caller MUST call done() when it's done processing.
     327     * @return
     328     */
     329    int get()
     330    {
     331        while (1)
     332        {
     333            // wait for something to happen
     334            RTSemEventMultiWait(m_event, RT_INDEFINITE_WAIT);
     335
     336            util::AutoWriteLock qlock(m_mutex COMMA_LOCKVAL_SRC_POS);
     337            if (m_llSocketsQ.size())
     338            {
     339                int socket = m_llSocketsQ.front();
     340                m_llSocketsQ.pop_front();
     341                --m_cIdleThreads;
     342
     343                // reset the multi event only if the queue is now empty; otherwise
     344                // another thread will also wake up when we release the mutex and
     345                // process another one
     346                if (m_llSocketsQ.size() == 0)
     347                    RTSemEventMultiReset(m_event);
     348
     349                qlock.release();
     350
     351                return socket;
     352            }
     353
     354            // nothing to do: keep looping
     355        }
     356    }
     357
     358    /**
     359     * To be called by a thread after fetching an item from the
     360     * queue via get() and having finished its lengthy processing.
     361     */
     362    void done()
     363    {
     364        util::AutoWriteLock qlock(m_mutex COMMA_LOCKVAL_SRC_POS);
     365        ++m_cIdleThreads;
     366    }
     367
    297368    util::WriteLockHandle   m_mutex;
    298369    RTSEMEVENTMULTI         m_event;
     
    305376};
    306377
     378/**
     379 * Thread function for each of the SOAP queue worker threads. This keeps
     380 * running, blocks on the event semaphore in SoapThread.SoapQ and picks
     381 * up a socket from the queue therein, which has been put there by
     382 * beginProcessing().
     383 *
     384 * @param pThread
     385 * @param pvThread
     386 * @return
     387 */
    307388int fntSoapQueue(RTTHREAD pThread, void *pvThread)
    308389{
     
    313394    while (1)
    314395    {
    315         // wait for something to happen
    316         RTSemEventMultiWait(pst->pQ->m_event, RT_INDEFINITE_WAIT);
    317 
    318         util::AutoWriteLock qlock(pst->pQ->m_mutex COMMA_LOCKVAL_SRC_POS);
    319         if (pst->pQ->m_llSocketsQ.size())
    320         {
    321             pst->soap->socket = pst->pQ->m_llSocketsQ.front();
    322             pst->pQ->m_llSocketsQ.pop_front();
    323             --pst->pQ->m_cIdleThreads;
    324 
    325             // reset the multi event only if the queue is now empty; otherwise
    326             // another thread will also wake up when we release the mutex and
    327             // process another one
    328             if (pst->pQ->m_llSocketsQ.size() == 0)
    329                 RTSemEventMultiReset(pst->pQ->m_event);
    330 
    331             qlock.release();
    332 
    333             WebLog("Thread %d is handling connection from IP=%lu.%lu.%lu.%lu socket=%d (%d threads idle)",
    334                    pst->u,
    335                    (pst->soap->ip>>24)&0xFF,
    336                    (pst->soap->ip>>16)&0xFF,
    337                    (pst->soap->ip>>8)&0xFF,
    338                    pst->soap->ip&0xFF,
    339                    pst->soap->socket,
    340                    pst->pQ->m_cIdleThreads);
    341 
    342             // process the request; this goes into the COM code in methodmaps.cpp
    343             soap_serve(pst->soap);
    344 
    345             soap_destroy(pst->soap); // clean up class instances
    346             soap_end(pst->soap); // clean up everything and close socket
    347 
    348             qlock.acquire();
    349             ++pst->pQ->m_cIdleThreads;
    350         }
     396        // wait for a socket to arrive on the queue
     397        pst->soap->socket = pst->pQ->get();
     398
     399        WebLog("T%d handles connection from IP=%lu.%lu.%lu.%lu socket=%d (%d thr idle)\n",
     400                pst->u,
     401                (pst->soap->ip>>24)&0xFF,
     402                (pst->soap->ip>>16)&0xFF,
     403                (pst->soap->ip>>8)&0xFF,
     404                pst->soap->ip&0xFF,
     405                pst->soap->socket,
     406                pst->pQ->m_cIdleThreads);
     407
     408        // process the request; this goes into the COM code in methodmaps.cpp
     409        soap_serve(pst->soap);
     410
     411        soap_destroy(pst->soap); // clean up class instances
     412        soap_end(pst->soap); // clean up everything and close socket
     413
     414        // tell the queue we're idle again
     415        pst->pQ->done();
    351416    }
    352417
     
    356421/**
    357422 * Called from main(). This implements the loop that takes SOAP calls
    358  * from HTTP and serves them, calling the COM method implementations
    359  * in the generated methodmaps.cpp code.
    360  */
    361 void beginProcessing()
     423 * from HTTP and serves them by handing sockets to the SOAP queue
     424 * worker threads.
     425 */
     426void beginProcessing(size_t cThreads)
    362427{
    363428    // set up gSOAP
     
    382447               m);
    383448
    384         // initialize thread queue, mutex and eventsem, create 10 threads
    385         SoapQ soapq(10, &soap);
     449        // initialize thread queue, mutex and eventsem, create worker threads
     450        SoapQ soapq(cThreads, &soap);
    386451
    387452        for (uint64_t i = 1;
     
    397462            }
    398463
    399             // enqueue the socket of this connection and post eventsem so
    400             // that one of our threads can pick it up
    401             util::AutoWriteLock qlock(soapq.m_mutex COMMA_LOCKVAL_SRC_POS);
    402             soapq.m_llSocketsQ.push_back(s);
    403             qlock.release();
    404 
    405             WebLog("Request %llu on socket %d queued for processing\n", i, s);
    406             // unblock one of the worker threads
    407             RTSemEventMultiSignal(soapq.m_event);
     464            // add the socket to the queue and tell worker threads to
     465            // pick up the jobn
     466            size_t cItemsOnQ = soapq.add(s);
     467            WebLog("Request %llu on socket %d queued for processing (%d items on Q)\n", i, s, cItemsOnQ);
    408468
    409469            // we have to process main event queue
     
    427487{
    428488    int rc;
     489
     490    uint32_t cWorkerThreads = 5;
    429491
    430492    // intialize runtime
     
    471533                       "Opened log file \"%s\"\n", VBOX_VERSION_STRING, ValueUnion.psz);
    472534            }
     535            break;
     536
     537            case 'T':
     538                cWorkerThreads = ValueUnion.u32;
    473539            break;
    474540
     
    577643    }
    578644
    579     beginProcessing();
     645    beginProcessing(cWorkerThreads);
    580646
    581647    com::Shutdown();
Note: See TracChangeset for help on using the changeset viewer.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette