VirtualBox

source: vbox/trunk/src/libs/xpcom18a4/nsprpub/pr/tests/cltsrv.c@ 1

Last change on this file since 1 was 1, checked in by vboxsync, 55 years ago

import

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 39.6 KB
Line 
1/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2/* ***** BEGIN LICENSE BLOCK *****
3 * Version: MPL 1.1/GPL 2.0/LGPL 2.1
4 *
5 * The contents of this file are subject to the Mozilla Public License Version
6 * 1.1 (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 * http://www.mozilla.org/MPL/
9 *
10 * Software distributed under the License is distributed on an "AS IS" basis,
11 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
12 * for the specific language governing rights and limitations under the
13 * License.
14 *
15 * The Original Code is the Netscape Portable Runtime (NSPR).
16 *
17 * The Initial Developer of the Original Code is
18 * Netscape Communications Corporation.
19 * Portions created by the Initial Developer are Copyright (C) 1998-2000
20 * the Initial Developer. All Rights Reserved.
21 *
22 * Contributor(s):
23 *
24 * Alternatively, the contents of this file may be used under the terms of
25 * either the GNU General Public License Version 2 or later (the "GPL"), or
26 * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
27 * in which case the provisions of the GPL or the LGPL are applicable instead
28 * of those above. If you wish to allow use of your version of this file only
29 * under the terms of either the GPL or the LGPL, and not to allow others to
30 * use your version of this file under the terms of the MPL, indicate your
31 * decision by deleting the provisions above and replace them with the notice
32 * and other provisions required by the GPL or the LGPL. If you do not delete
33 * the provisions above, a recipient may use your version of this file under
34 * the terms of any one of the MPL, the GPL or the LGPL.
35 *
36 * ***** END LICENSE BLOCK ***** */
37
38/*
39 *
40 * Notes:
41 * [1] lth. The call to Sleep() is a hack to get the test case to run
42 * on Windows 95. Without it, the test case fails with an error
43 * WSAECONNRESET following a recv() call. The error is caused by the
44 * server side thread termination without a shutdown() or closesocket()
45 * call. Windows docmunentation suggests that this is predicted
46 * behavior; that other platforms get away with it is ... serindipity.
47 * The test case should shutdown() or closesocket() before
48 * thread termination. I didn't have time to figure out where or how
49 * to do it. The Sleep() call inserts enough delay to allow the
50 * client side to recv() all his data before the server side thread
51 * terminates. Whew! ...
52 *
53 ** Modification History:
54 * 14-May-97 AGarcia- Converted the test to accomodate the debug_mode flag.
55 * The debug mode will print all of the printfs associated with this test.
56 * The regress mode will be the default mode. Since the regress tool limits
57 * the output to a one line status:PASS or FAIL,all of the printf statements
58 * have been handled with an if (debug_mode) statement.
59 */
60
61#include "prclist.h"
62#include "prcvar.h"
63#include "prerror.h"
64#include "prinit.h"
65#include "prinrval.h"
66#include "prio.h"
67#include "prlock.h"
68#include "prlog.h"
69#include "prtime.h"
70#include "prmem.h"
71#include "prnetdb.h"
72#include "prprf.h"
73#include "prthread.h"
74
75#include "pprio.h"
76#include "primpl.h"
77
78#include "plstr.h"
79#include "plerror.h"
80#include "plgetopt.h"
81
82#include <stdlib.h>
83#include <string.h>
84
85
86#if defined(XP_UNIX)
87#include <math.h>
88#endif
89
90#ifdef XP_MAC
91#include "prlog.h"
92#define printf PR_LogPrint
93#endif
94
95/*
96** This is the beginning of the test
97*/
98
99#define RECV_FLAGS 0
100#define SEND_FLAGS 0
101#define DEFAULT_LOW 0
102#define DEFAULT_HIGH 0
103#define BUFFER_SIZE 1024
104#define DEFAULT_BACKLOG 5
105#define DEFAULT_PORT 12849
106#define DEFAULT_CLIENTS 1
107#define ALLOWED_IN_ACCEPT 1
108#define DEFAULT_CLIPPING 1000
109#define DEFAULT_WORKERS_MIN 1
110#define DEFAULT_WORKERS_MAX 1
111#define DEFAULT_SERVER "localhost"
112#define DEFAULT_EXECUTION_TIME 10
113#define DEFAULT_CLIENT_TIMEOUT 4000
114#define DEFAULT_SERVER_TIMEOUT 4000
115#define DEFAULT_SERVER_PRIORITY PR_PRIORITY_HIGH
116
117typedef enum CSState_e {cs_init, cs_run, cs_stop, cs_exit} CSState_t;
118
119static void PR_CALLBACK Worker(void *arg);
120typedef struct CSPool_s CSPool_t;
121typedef struct CSWorker_s CSWorker_t;
122typedef struct CSServer_s CSServer_t;
123typedef enum Verbosity
124{
125 TEST_LOG_ALWAYS,
126 TEST_LOG_ERROR,
127 TEST_LOG_WARNING,
128 TEST_LOG_NOTICE,
129 TEST_LOG_INFO,
130 TEST_LOG_STATUS,
131 TEST_LOG_VERBOSE
132} Verbosity;
133
134static PRInt32 domain = AF_INET;
135static PRInt32 protocol = 6; /* TCP */
136static PRFileDesc *debug_out = NULL;
137static PRBool debug_mode = PR_FALSE;
138static PRBool pthread_stats = PR_FALSE;
139static Verbosity verbosity = TEST_LOG_ALWAYS;
140static PRThreadScope thread_scope = PR_LOCAL_THREAD;
141
142struct CSWorker_s
143{
144 PRCList element; /* list of the server's workers */
145
146 PRThread *thread; /* this worker objects thread */
147 CSServer_t *server; /* back pointer to server structure */
148};
149
150struct CSPool_s
151{
152 PRCondVar *exiting;
153 PRCondVar *acceptComplete;
154 PRUint32 accepting, active, workers;
155};
156
157struct CSServer_s
158{
159 PRCList list; /* head of worker list */
160
161 PRLock *ml;
162 PRThread *thread; /* the main server thread */
163 PRCondVar *stateChange;
164
165 PRUint16 port; /* port we're listening on */
166 PRUint32 backlog; /* size of our listener backlog */
167 PRFileDesc *listener; /* the fd accepting connections */
168
169 CSPool_t pool; /* statistics on worker threads */
170 CSState_t state; /* the server's state */
171 struct /* controlling worker counts */
172 {
173 PRUint32 minimum, maximum, accepting;
174 } workers;
175
176 /* statistics */
177 PRIntervalTime started, stopped;
178 PRUint32 operations, bytesTransferred;
179};
180
181typedef struct CSDescriptor_s
182{
183 PRInt32 size; /* size of transfer */
184 char filename[60]; /* filename, null padded */
185} CSDescriptor_t;
186
187typedef struct CSClient_s
188{
189 PRLock *ml;
190 PRThread *thread;
191 PRCondVar *stateChange;
192 PRNetAddr serverAddress;
193
194 CSState_t state;
195
196 /* statistics */
197 PRIntervalTime started, stopped;
198 PRUint32 operations, bytesTransferred;
199} CSClient_t;
200
201#define TEST_LOG(l, p, a) \
202 do { \
203 if (debug_mode || (p <= verbosity)) printf a; \
204 } while (0)
205
206PRLogModuleInfo *cltsrv_log_file = NULL;
207
208#define MY_ASSERT(_expr) \
209 ((_expr)?((void)0):_MY_Assert(# _expr,__FILE__,__LINE__))
210
211#define TEST_ASSERT(_expr) \
212 ((_expr)?((void)0):_MY_Assert(# _expr,__FILE__,__LINE__))
213
214static void _MY_Assert(const char *s, const char *file, PRIntn ln)
215{
216 PL_PrintError(NULL);
217#if DEBUG
218 PR_Assert(s, file, ln);
219#endif
220} /* _MW_Assert */
221
222static PRBool Aborted(PRStatus rv)
223{
224 return ((PR_FAILURE == rv) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) ?
225 PR_TRUE : PR_FALSE;
226}
227
228static void TimeOfDayMessage(const char *msg, PRThread* me)
229{
230 char buffer[100];
231 PRExplodedTime tod;
232 PR_ExplodeTime(PR_Now(), PR_LocalTimeParameters, &tod);
233 (void)PR_FormatTime(buffer, sizeof(buffer), "%T", &tod);
234
235 TEST_LOG(
236 cltsrv_log_file, TEST_LOG_ALWAYS,
237 ("%s(0x%p): %s\n", msg, me, buffer));
238} /* TimeOfDayMessage */
239
240
241static void PR_CALLBACK Client(void *arg)
242{
243 PRStatus rv;
244 PRIntn index;
245 char buffer[1024];
246 PRFileDesc *fd = NULL;
247 PRUintn clipping = DEFAULT_CLIPPING;
248 PRThread *me = PR_CurrentThread();
249 CSClient_t *client = (CSClient_t*)arg;
250 CSDescriptor_t *descriptor = PR_NEW(CSDescriptor_t);
251 PRIntervalTime timeout = PR_MillisecondsToInterval(DEFAULT_CLIENT_TIMEOUT);
252
253
254 for (index = 0; index < sizeof(buffer); ++index)
255 buffer[index] = (char)index;
256
257 client->started = PR_IntervalNow();
258
259 PR_Lock(client->ml);
260 client->state = cs_run;
261 PR_NotifyCondVar(client->stateChange);
262 PR_Unlock(client->ml);
263
264 TimeOfDayMessage("Client started at", me);
265
266 while (cs_run == client->state)
267 {
268 PRInt32 bytes, descbytes, filebytes, netbytes;
269
270 (void)PR_NetAddrToString(&client->serverAddress, buffer, sizeof(buffer));
271 TEST_LOG(cltsrv_log_file, TEST_LOG_INFO,
272 ("\tClient(0x%p): connecting to server at %s\n", me, buffer));
273
274 fd = PR_Socket(domain, SOCK_STREAM, protocol);
275 TEST_ASSERT(NULL != fd);
276 rv = PR_Connect(fd, &client->serverAddress, timeout);
277 if (PR_FAILURE == rv)
278 {
279 TEST_LOG(
280 cltsrv_log_file, TEST_LOG_ERROR,
281 ("\tClient(0x%p): conection failed (%d, %d)\n",
282 me, PR_GetError(), PR_GetOSError()));
283 goto aborted;
284 }
285
286 memset(descriptor, 0, sizeof(*descriptor));
287 descriptor->size = PR_htonl(descbytes = rand() % clipping);
288 PR_snprintf(
289 descriptor->filename, sizeof(descriptor->filename),
290 "CS%p%p-%p.dat", client->started, me, client->operations);
291 TEST_LOG(
292 cltsrv_log_file, TEST_LOG_VERBOSE,
293 ("\tClient(0x%p): sending descriptor for %u bytes\n", me, descbytes));
294 bytes = PR_Send(
295 fd, descriptor, sizeof(*descriptor), SEND_FLAGS, timeout);
296 if (sizeof(CSDescriptor_t) != bytes)
297 {
298 if (Aborted(PR_FAILURE)) goto aborted;
299 if (PR_IO_TIMEOUT_ERROR == PR_GetError())
300 {
301 TEST_LOG(
302 cltsrv_log_file, TEST_LOG_ERROR,
303 ("\tClient(0x%p): send descriptor timeout\n", me));
304 goto retry;
305 }
306 }
307 TEST_ASSERT(sizeof(*descriptor) == bytes);
308
309 netbytes = 0;
310 while (netbytes < descbytes)
311 {
312 filebytes = sizeof(buffer);
313 if ((descbytes - netbytes) < filebytes)
314 filebytes = descbytes - netbytes;
315 TEST_LOG(
316 cltsrv_log_file, TEST_LOG_VERBOSE,
317 ("\tClient(0x%p): sending %d bytes\n", me, filebytes));
318 bytes = PR_Send(fd, buffer, filebytes, SEND_FLAGS, timeout);
319 if (filebytes != bytes)
320 {
321 if (Aborted(PR_FAILURE)) goto aborted;
322 if (PR_IO_TIMEOUT_ERROR == PR_GetError())
323 {
324 TEST_LOG(
325 cltsrv_log_file, TEST_LOG_ERROR,
326 ("\tClient(0x%p): send data timeout\n", me));
327 goto retry;
328 }
329 }
330 TEST_ASSERT(bytes == filebytes);
331 netbytes += bytes;
332 }
333 filebytes = 0;
334 while (filebytes < descbytes)
335 {
336 netbytes = sizeof(buffer);
337 if ((descbytes - filebytes) < netbytes)
338 netbytes = descbytes - filebytes;
339 TEST_LOG(
340 cltsrv_log_file, TEST_LOG_VERBOSE,
341 ("\tClient(0x%p): receiving %d bytes\n", me, netbytes));
342 bytes = PR_Recv(fd, buffer, netbytes, RECV_FLAGS, timeout);
343 if (-1 == bytes)
344 {
345 if (Aborted(PR_FAILURE))
346 {
347 TEST_LOG(
348 cltsrv_log_file, TEST_LOG_ERROR,
349 ("\tClient(0x%p): receive data aborted\n", me));
350 goto aborted;
351 }
352 else if (PR_IO_TIMEOUT_ERROR == PR_GetError())
353 TEST_LOG(
354 cltsrv_log_file, TEST_LOG_ERROR,
355 ("\tClient(0x%p): receive data timeout\n", me));
356 else
357 TEST_LOG(
358 cltsrv_log_file, TEST_LOG_ERROR,
359 ("\tClient(0x%p): receive error (%d, %d)\n",
360 me, PR_GetError(), PR_GetOSError()));
361 goto retry;
362 }
363 if (0 == bytes)
364 {
365 TEST_LOG(
366 cltsrv_log_file, TEST_LOG_ERROR,
367 ("\t\tClient(0x%p): unexpected end of stream\n",
368 PR_CurrentThread()));
369 break;
370 }
371 filebytes += bytes;
372 }
373
374 rv = PR_Shutdown(fd, PR_SHUTDOWN_BOTH);
375 if (Aborted(rv)) goto aborted;
376 TEST_ASSERT(PR_SUCCESS == rv);
377retry:
378 (void)PR_Close(fd); fd = NULL;
379 TEST_LOG(
380 cltsrv_log_file, TEST_LOG_INFO,
381 ("\tClient(0x%p): disconnected from server\n", me));
382
383 PR_Lock(client->ml);
384 client->operations += 1;
385 client->bytesTransferred += 2 * descbytes;
386 rv = PR_WaitCondVar(client->stateChange, rand() % clipping);
387 PR_Unlock(client->ml);
388 if (Aborted(rv)) break;
389 }
390
391aborted:
392 client->stopped = PR_IntervalNow();
393
394 PR_ClearInterrupt();
395 if (NULL != fd) rv = PR_Close(fd);
396
397 PR_Lock(client->ml);
398 client->state = cs_exit;
399 PR_NotifyCondVar(client->stateChange);
400 PR_Unlock(client->ml);
401 PR_DELETE(descriptor);
402 TEST_LOG(
403 cltsrv_log_file, TEST_LOG_ALWAYS,
404 ("\tClient(0x%p): stopped after %u operations and %u bytes\n",
405 PR_CurrentThread(), client->operations, client->bytesTransferred));
406
407} /* Client */
408
409static PRStatus ProcessRequest(PRFileDesc *fd, CSServer_t *server)
410{
411 PRStatus drv, rv;
412 char buffer[1024];
413 PRFileDesc *file = NULL;
414 PRThread * me = PR_CurrentThread();
415 PRInt32 bytes, descbytes, netbytes, filebytes = 0;
416 CSDescriptor_t *descriptor = PR_NEW(CSDescriptor_t);
417 PRIntervalTime timeout = PR_MillisecondsToInterval(DEFAULT_SERVER_TIMEOUT);
418
419 TEST_LOG(
420 cltsrv_log_file, TEST_LOG_VERBOSE,
421 ("\tProcessRequest(0x%p): receiving desciptor\n", me));
422 bytes = PR_Recv(
423 fd, descriptor, sizeof(*descriptor), RECV_FLAGS, timeout);
424 if (-1 == bytes)
425 {
426 rv = PR_FAILURE;
427 if (Aborted(rv)) goto exit;
428 if (PR_IO_TIMEOUT_ERROR == PR_GetError())
429 {
430 TEST_LOG(
431 cltsrv_log_file, TEST_LOG_ERROR,
432 ("\tProcessRequest(0x%p): receive timeout\n", me));
433 }
434 goto exit;
435 }
436 if (0 == bytes)
437 {
438 rv = PR_FAILURE;
439 TEST_LOG(
440 cltsrv_log_file, TEST_LOG_ERROR,
441 ("\tProcessRequest(0x%p): unexpected end of file\n", me));
442 goto exit;
443 }
444 descbytes = PR_ntohl(descriptor->size);
445 TEST_ASSERT(sizeof(*descriptor) == bytes);
446
447 TEST_LOG(
448 cltsrv_log_file, TEST_LOG_VERBOSE,
449 ("\t\tProcessRequest(0x%p): read descriptor {%d, %s}\n",
450 me, descbytes, descriptor->filename));
451
452 file = PR_Open(
453 descriptor->filename, (PR_CREATE_FILE | PR_WRONLY), 0666);
454 if (NULL == file)
455 {
456 rv = PR_FAILURE;
457 if (Aborted(rv)) goto aborted;
458 if (PR_IO_TIMEOUT_ERROR == PR_GetError())
459 {
460 TEST_LOG(
461 cltsrv_log_file, TEST_LOG_ERROR,
462 ("\tProcessRequest(0x%p): open file timeout\n", me));
463 goto aborted;
464 }
465 }
466 TEST_ASSERT(NULL != file);
467
468 filebytes = 0;
469 while (filebytes < descbytes)
470 {
471 netbytes = sizeof(buffer);
472 if ((descbytes - filebytes) < netbytes)
473 netbytes = descbytes - filebytes;
474 TEST_LOG(
475 cltsrv_log_file, TEST_LOG_VERBOSE,
476 ("\tProcessRequest(0x%p): receive %d bytes\n", me, netbytes));
477 bytes = PR_Recv(fd, buffer, netbytes, RECV_FLAGS, timeout);
478 if (-1 == bytes)
479 {
480 rv = PR_FAILURE;
481 if (Aborted(rv)) goto aborted;
482 if (PR_IO_TIMEOUT_ERROR == PR_GetError())
483 {
484 TEST_LOG(
485 cltsrv_log_file, TEST_LOG_ERROR,
486 ("\t\tProcessRequest(0x%p): receive data timeout\n", me));
487 goto aborted;
488 }
489 /*
490 * XXX: I got (PR_CONNECT_RESET_ERROR, ERROR_NETNAME_DELETED)
491 * on NT here. This is equivalent to ECONNRESET on Unix.
492 * -wtc
493 */
494 TEST_LOG(
495 cltsrv_log_file, TEST_LOG_WARNING,
496 ("\t\tProcessRequest(0x%p): unexpected error (%d, %d)\n",
497 me, PR_GetError(), PR_GetOSError()));
498 goto aborted;
499 }
500 if(0 == bytes)
501 {
502 TEST_LOG(
503 cltsrv_log_file, TEST_LOG_WARNING,
504 ("\t\tProcessRequest(0x%p): unexpected end of stream\n", me));
505 rv = PR_FAILURE;
506 goto aborted;
507 }
508 filebytes += bytes;
509 netbytes = bytes;
510 /* The byte count for PR_Write should be positive */
511 MY_ASSERT(netbytes > 0);
512 TEST_LOG(
513 cltsrv_log_file, TEST_LOG_VERBOSE,
514 ("\tProcessRequest(0x%p): write %d bytes to file\n", me, netbytes));
515 bytes = PR_Write(file, buffer, netbytes);
516 if (netbytes != bytes)
517 {
518 rv = PR_FAILURE;
519 if (Aborted(rv)) goto aborted;
520 if (PR_IO_TIMEOUT_ERROR == PR_GetError())
521 {
522 TEST_LOG(
523 cltsrv_log_file, TEST_LOG_ERROR,
524 ("\t\tProcessRequest(0x%p): write file timeout\n", me));
525 goto aborted;
526 }
527 }
528 TEST_ASSERT(bytes > 0);
529 }
530
531 PR_Lock(server->ml);
532 server->operations += 1;
533 server->bytesTransferred += filebytes;
534 PR_Unlock(server->ml);
535
536 rv = PR_Close(file);
537 if (Aborted(rv)) goto aborted;
538 TEST_ASSERT(PR_SUCCESS == rv);
539 file = NULL;
540
541 TEST_LOG(
542 cltsrv_log_file, TEST_LOG_VERBOSE,
543 ("\t\tProcessRequest(0x%p): opening %s\n", me, descriptor->filename));
544 file = PR_Open(descriptor->filename, PR_RDONLY, 0);
545 if (NULL == file)
546 {
547 rv = PR_FAILURE;
548 if (Aborted(rv)) goto aborted;
549 if (PR_IO_TIMEOUT_ERROR == PR_GetError())
550 {
551 TEST_LOG(
552 cltsrv_log_file, TEST_LOG_ERROR,
553 ("\t\tProcessRequest(0x%p): open file timeout\n",
554 PR_CurrentThread()));
555 goto aborted;
556 }
557 TEST_LOG(
558 cltsrv_log_file, TEST_LOG_ERROR,
559 ("\t\tProcessRequest(0x%p): other file open error (%u, %u)\n",
560 me, PR_GetError(), PR_GetOSError()));
561 goto aborted;
562 }
563 TEST_ASSERT(NULL != file);
564
565 netbytes = 0;
566 while (netbytes < descbytes)
567 {
568 filebytes = sizeof(buffer);
569 if ((descbytes - netbytes) < filebytes)
570 filebytes = descbytes - netbytes;
571 TEST_LOG(
572 cltsrv_log_file, TEST_LOG_VERBOSE,
573 ("\tProcessRequest(0x%p): read %d bytes from file\n", me, filebytes));
574 bytes = PR_Read(file, buffer, filebytes);
575 if (filebytes != bytes)
576 {
577 rv = PR_FAILURE;
578 if (Aborted(rv)) goto aborted;
579 if (PR_IO_TIMEOUT_ERROR == PR_GetError())
580 TEST_LOG(
581 cltsrv_log_file, TEST_LOG_ERROR,
582 ("\t\tProcessRequest(0x%p): read file timeout\n", me));
583 else
584 TEST_LOG(
585 cltsrv_log_file, TEST_LOG_ERROR,
586 ("\t\tProcessRequest(0x%p): other file error (%d, %d)\n",
587 me, PR_GetError(), PR_GetOSError()));
588 goto aborted;
589 }
590 TEST_ASSERT(bytes > 0);
591 netbytes += bytes;
592 filebytes = bytes;
593 TEST_LOG(
594 cltsrv_log_file, TEST_LOG_VERBOSE,
595 ("\t\tProcessRequest(0x%p): sending %d bytes\n", me, filebytes));
596 bytes = PR_Send(fd, buffer, filebytes, SEND_FLAGS, timeout);
597 if (filebytes != bytes)
598 {
599 rv = PR_FAILURE;
600 if (Aborted(rv)) goto aborted;
601 if (PR_IO_TIMEOUT_ERROR == PR_GetError())
602 {
603 TEST_LOG(
604 cltsrv_log_file, TEST_LOG_ERROR,
605 ("\t\tProcessRequest(0x%p): send data timeout\n", me));
606 goto aborted;
607 }
608 break;
609 }
610 TEST_ASSERT(bytes > 0);
611 }
612
613 PR_Lock(server->ml);
614 server->bytesTransferred += filebytes;
615 PR_Unlock(server->ml);
616
617 rv = PR_Shutdown(fd, PR_SHUTDOWN_BOTH);
618 if (Aborted(rv)) goto aborted;
619
620 rv = PR_Close(file);
621 if (Aborted(rv)) goto aborted;
622 TEST_ASSERT(PR_SUCCESS == rv);
623 file = NULL;
624
625aborted:
626 PR_ClearInterrupt();
627 if (NULL != file) PR_Close(file);
628 drv = PR_Delete(descriptor->filename);
629 TEST_ASSERT(PR_SUCCESS == drv);
630exit:
631 TEST_LOG(
632 cltsrv_log_file, TEST_LOG_VERBOSE,
633 ("\t\tProcessRequest(0x%p): Finished\n", me));
634
635 PR_DELETE(descriptor);
636
637#if defined(WIN95)
638 PR_Sleep(PR_MillisecondsToInterval(200)); /* lth. see note [1] */
639#endif
640 return rv;
641} /* ProcessRequest */
642
643static PRStatus CreateWorker(CSServer_t *server, CSPool_t *pool)
644{
645 CSWorker_t *worker = PR_NEWZAP(CSWorker_t);
646 worker->server = server;
647 PR_INIT_CLIST(&worker->element);
648 worker->thread = PR_CreateThread(
649 PR_USER_THREAD, Worker, worker,
650 DEFAULT_SERVER_PRIORITY, thread_scope,
651 PR_UNJOINABLE_THREAD, 0);
652 if (NULL == worker->thread)
653 {
654 PR_DELETE(worker);
655 return PR_FAILURE;
656 }
657
658 TEST_LOG(cltsrv_log_file, TEST_LOG_STATUS,
659 ("\tCreateWorker(0x%p): create new worker (0x%p)\n",
660 PR_CurrentThread(), worker->thread));
661
662 return PR_SUCCESS;
663} /* CreateWorker */
664
665static void PR_CALLBACK Worker(void *arg)
666{
667 PRStatus rv;
668 PRNetAddr from;
669 PRFileDesc *fd = NULL;
670 PRThread *me = PR_CurrentThread();
671 CSWorker_t *worker = (CSWorker_t*)arg;
672 CSServer_t *server = worker->server;
673 CSPool_t *pool = &server->pool;
674
675 TEST_LOG(
676 cltsrv_log_file, TEST_LOG_NOTICE,
677 ("\t\tWorker(0x%p): started [%u]\n", me, pool->workers + 1));
678
679 PR_Lock(server->ml);
680 PR_APPEND_LINK(&worker->element, &server->list);
681 pool->workers += 1; /* define our existance */
682
683 while (cs_run == server->state)
684 {
685 while (pool->accepting >= server->workers.accepting)
686 {
687 TEST_LOG(
688 cltsrv_log_file, TEST_LOG_VERBOSE,
689 ("\t\tWorker(0x%p): waiting for accept slot[%d]\n",
690 me, pool->accepting));
691 rv = PR_WaitCondVar(pool->acceptComplete, PR_INTERVAL_NO_TIMEOUT);
692 if (Aborted(rv) || (cs_run != server->state))
693 {
694 TEST_LOG(
695 cltsrv_log_file, TEST_LOG_NOTICE,
696 ("\tWorker(0x%p): has been %s\n",
697 me, (Aborted(rv) ? "interrupted" : "stopped")));
698 goto exit;
699 }
700 }
701 pool->accepting += 1; /* how many are really in accept */
702 PR_Unlock(server->ml);
703
704 TEST_LOG(
705 cltsrv_log_file, TEST_LOG_VERBOSE,
706 ("\t\tWorker(0x%p): calling accept\n", me));
707 fd = PR_Accept(server->listener, &from, PR_INTERVAL_NO_TIMEOUT);
708
709 PR_Lock(server->ml);
710 pool->accepting -= 1;
711 PR_NotifyCondVar(pool->acceptComplete);
712
713 if ((NULL == fd) && Aborted(PR_FAILURE))
714 {
715 if (NULL != server->listener)
716 {
717 PR_Close(server->listener);
718 server->listener = NULL;
719 }
720 goto exit;
721 }
722
723 if (NULL != fd)
724 {
725 /*
726 ** Create another worker of the total number of workers is
727 ** less than the minimum specified or we have none left in
728 ** accept() AND we're not over the maximum.
729 ** This sort of presumes that the number allowed in accept
730 ** is at least as many as the minimum. Otherwise we'll keep
731 ** creating new threads and deleting them soon after.
732 */
733 PRBool another =
734 ((pool->workers < server->workers.minimum) ||
735 ((0 == pool->accepting)
736 && (pool->workers < server->workers.maximum))) ?
737 PR_TRUE : PR_FALSE;
738 pool->active += 1;
739 PR_Unlock(server->ml);
740
741 if (another) (void)CreateWorker(server, pool);
742
743 rv = ProcessRequest(fd, server);
744 if (PR_SUCCESS != rv)
745 TEST_LOG(
746 cltsrv_log_file, TEST_LOG_ERROR,
747 ("\t\tWorker(0x%p): server process ended abnormally\n", me));
748 (void)PR_Close(fd); fd = NULL;
749
750 PR_Lock(server->ml);
751 pool->active -= 1;
752 }
753 }
754
755exit:
756 PR_ClearInterrupt();
757 PR_Unlock(server->ml);
758
759 if (NULL != fd)
760 {
761 (void)PR_Shutdown(fd, PR_SHUTDOWN_BOTH);
762 (void)PR_Close(fd);
763 }
764
765 TEST_LOG(
766 cltsrv_log_file, TEST_LOG_NOTICE,
767 ("\t\tWorker(0x%p): exiting [%u]\n", PR_CurrentThread(), pool->workers));
768
769 PR_Lock(server->ml);
770 pool->workers -= 1; /* undefine our existance */
771 PR_REMOVE_AND_INIT_LINK(&worker->element);
772 PR_NotifyCondVar(pool->exiting);
773 PR_Unlock(server->ml);
774
775 PR_DELETE(worker); /* destruction of the "worker" object */
776
777} /* Worker */
778
779static void PR_CALLBACK Server(void *arg)
780{
781 PRStatus rv;
782 PRNetAddr serverAddress;
783 PRThread *me = PR_CurrentThread();
784 CSServer_t *server = (CSServer_t*)arg;
785 PRSocketOptionData sockOpt;
786
787 server->listener = PR_Socket(domain, SOCK_STREAM, protocol);
788
789 sockOpt.option = PR_SockOpt_Reuseaddr;
790 sockOpt.value.reuse_addr = PR_TRUE;
791 rv = PR_SetSocketOption(server->listener, &sockOpt);
792 TEST_ASSERT(PR_SUCCESS == rv);
793
794 memset(&serverAddress, 0, sizeof(serverAddress));
795 if (PR_AF_INET6 != domain)
796 rv = PR_InitializeNetAddr(PR_IpAddrAny, DEFAULT_PORT, &serverAddress);
797 else
798 rv = PR_SetNetAddr(PR_IpAddrAny, PR_AF_INET6, DEFAULT_PORT,
799 &serverAddress);
800 rv = PR_Bind(server->listener, &serverAddress);
801 TEST_ASSERT(PR_SUCCESS == rv);
802
803 rv = PR_Listen(server->listener, server->backlog);
804 TEST_ASSERT(PR_SUCCESS == rv);
805
806 server->started = PR_IntervalNow();
807 TimeOfDayMessage("Server started at", me);
808
809 PR_Lock(server->ml);
810 server->state = cs_run;
811 PR_NotifyCondVar(server->stateChange);
812 PR_Unlock(server->ml);
813
814 /*
815 ** Create the first worker (actually, a thread that accepts
816 ** connections and then processes the work load as needed).
817 ** From this point on, additional worker threads are created
818 ** as they are needed by existing worker threads.
819 */
820 rv = CreateWorker(server, &server->pool);
821 TEST_ASSERT(PR_SUCCESS == rv);
822
823 /*
824 ** From here on this thread is merely hanging around as the contact
825 ** point for the main test driver. It's just waiting for the driver
826 ** to declare the test complete.
827 */
828 TEST_LOG(
829 cltsrv_log_file, TEST_LOG_VERBOSE,
830 ("\tServer(0x%p): waiting for state change\n", me));
831
832 PR_Lock(server->ml);
833 while ((cs_run == server->state) && !Aborted(rv))
834 {
835 rv = PR_WaitCondVar(server->stateChange, PR_INTERVAL_NO_TIMEOUT);
836 }
837 PR_Unlock(server->ml);
838 PR_ClearInterrupt();
839
840 TEST_LOG(
841 cltsrv_log_file, TEST_LOG_INFO,
842 ("\tServer(0x%p): shutting down workers\n", me));
843
844 /*
845 ** Get all the worker threads to exit. They know how to
846 ** clean up after themselves, so this is just a matter of
847 ** waiting for clorine in the pool to take effect. During
848 ** this stage we're ignoring interrupts.
849 */
850 server->workers.minimum = server->workers.maximum = 0;
851
852 PR_Lock(server->ml);
853 while (!PR_CLIST_IS_EMPTY(&server->list))
854 {
855 PRCList *head = PR_LIST_HEAD(&server->list);
856 CSWorker_t *worker = (CSWorker_t*)head;
857 TEST_LOG(
858 cltsrv_log_file, TEST_LOG_VERBOSE,
859 ("\tServer(0x%p): interrupting worker(0x%p)\n", me, worker));
860 rv = PR_Interrupt(worker->thread);
861 TEST_ASSERT(PR_SUCCESS == rv);
862 PR_REMOVE_AND_INIT_LINK(head);
863 }
864
865 while (server->pool.workers > 0)
866 {
867 TEST_LOG(
868 cltsrv_log_file, TEST_LOG_NOTICE,
869 ("\tServer(0x%p): waiting for %u workers to exit\n",
870 me, server->pool.workers));
871 (void)PR_WaitCondVar(server->pool.exiting, PR_INTERVAL_NO_TIMEOUT);
872 }
873
874 server->state = cs_exit;
875 PR_NotifyCondVar(server->stateChange);
876 PR_Unlock(server->ml);
877
878 TEST_LOG(
879 cltsrv_log_file, TEST_LOG_ALWAYS,
880 ("\tServer(0x%p): stopped after %u operations and %u bytes\n",
881 me, server->operations, server->bytesTransferred));
882
883 if (NULL != server->listener) PR_Close(server->listener);
884 server->stopped = PR_IntervalNow();
885
886} /* Server */
887
888static void WaitForCompletion(PRIntn execution)
889{
890 while (execution > 0)
891 {
892 PRIntn dally = (execution > 30) ? 30 : execution;
893 PR_Sleep(PR_SecondsToInterval(dally));
894 if (pthread_stats) PT_FPrintStats(debug_out, "\nPThread Statistics\n");
895 execution -= dally;
896 }
897} /* WaitForCompletion */
898
899static void Help(void)
900{
901 PR_fprintf(debug_out, "cltsrv test program usage:\n");
902 PR_fprintf(debug_out, "\t-a <n> threads allowed in accept (5)\n");
903 PR_fprintf(debug_out, "\t-b <n> backlock for listen (5)\n");
904 PR_fprintf(debug_out, "\t-c <threads> number of clients to create (1)\n");
905 PR_fprintf(debug_out, "\t-f <low> low water mark for fd caching (0)\n");
906 PR_fprintf(debug_out, "\t-F <high> high water mark for fd caching (0)\n");
907 PR_fprintf(debug_out, "\t-w <threads> minimal number of server threads (1)\n");
908 PR_fprintf(debug_out, "\t-W <threads> maximum number of server threads (1)\n");
909 PR_fprintf(debug_out, "\t-e <seconds> duration of the test in seconds (10)\n");
910 PR_fprintf(debug_out, "\t-s <string> dsn name of server (localhost)\n");
911 PR_fprintf(debug_out, "\t-G use GLOBAL threads (LOCAL)\n");
912 PR_fprintf(debug_out, "\t-X use XTP as transport (TCP)\n");
913 PR_fprintf(debug_out, "\t-6 Use IPv6 (IPv4)\n");
914 PR_fprintf(debug_out, "\t-v verbosity (accumulative) (0)\n");
915 PR_fprintf(debug_out, "\t-p pthread statistics (FALSE)\n");
916 PR_fprintf(debug_out, "\t-d debug mode (FALSE)\n");
917 PR_fprintf(debug_out, "\t-h this message\n");
918} /* Help */
919
920static Verbosity IncrementVerbosity(void)
921{
922 PRIntn verboge = (PRIntn)verbosity + 1;
923 return (Verbosity)verboge;
924} /* IncrementVerbosity */
925
926PRIntn main(PRIntn argc, char** argv)
927{
928 PRUintn index;
929 PRBool boolean;
930 CSClient_t *client;
931 PRStatus rv, joinStatus;
932 CSServer_t *server = NULL;
933
934 PRUintn backlog = DEFAULT_BACKLOG;
935 PRUintn clients = DEFAULT_CLIENTS;
936 const char *serverName = DEFAULT_SERVER;
937 PRBool serverIsLocal = PR_TRUE;
938 PRUintn accepting = ALLOWED_IN_ACCEPT;
939 PRUintn workersMin = DEFAULT_WORKERS_MIN;
940 PRUintn workersMax = DEFAULT_WORKERS_MAX;
941 PRIntn execution = DEFAULT_EXECUTION_TIME;
942 PRIntn low = DEFAULT_LOW, high = DEFAULT_HIGH;
943
944 /*
945 * -G use global threads
946 * -a <n> threads allowed in accept
947 * -b <n> backlock for listen
948 * -c <threads> number of clients to create
949 * -f <low> low water mark for caching FDs
950 * -F <high> high water mark for caching FDs
951 * -w <threads> minimal number of server threads
952 * -W <threads> maximum number of server threads
953 * -e <seconds> duration of the test in seconds
954 * -s <string> dsn name of server (implies no server here)
955 * -v verbosity
956 */
957
958 PLOptStatus os;
959 PLOptState *opt = PL_CreateOptState(argc, argv, "GX6b:a:c:f:F:w:W:e:s:vdhp");
960
961 debug_out = PR_GetSpecialFD(PR_StandardError);
962
963 while (PL_OPT_EOL != (os = PL_GetNextOpt(opt)))
964 {
965 if (PL_OPT_BAD == os) continue;
966 switch (opt->option)
967 {
968 case 'G': /* use global threads */
969 thread_scope = PR_GLOBAL_THREAD;
970 break;
971 case 'X': /* use XTP as transport */
972 protocol = 36;
973 break;
974 case '6': /* Use IPv6 */
975 domain = PR_AF_INET6;
976 break;
977 case 'a': /* the value for accepting */
978 accepting = atoi(opt->value);
979 break;
980 case 'b': /* the value for backlock */
981 backlog = atoi(opt->value);
982 break;
983 case 'c': /* number of client threads */
984 clients = atoi(opt->value);
985 break;
986 case 'f': /* low water fd cache */
987 low = atoi(opt->value);
988 break;
989 case 'F': /* low water fd cache */
990 high = atoi(opt->value);
991 break;
992 case 'w': /* minimum server worker threads */
993 workersMin = atoi(opt->value);
994 break;
995 case 'W': /* maximum server worker threads */
996 workersMax = atoi(opt->value);
997 break;
998 case 'e': /* program execution time in seconds */
999 execution = atoi(opt->value);
1000 break;
1001 case 's': /* server's address */
1002 serverName = opt->value;
1003 break;
1004 case 'v': /* verbosity */
1005 verbosity = IncrementVerbosity();
1006 break;
1007 case 'd': /* debug mode */
1008 debug_mode = PR_TRUE;
1009 break;
1010 case 'p': /* pthread mode */
1011 pthread_stats = PR_TRUE;
1012 break;
1013 case 'h':
1014 default:
1015 Help();
1016 return 2;
1017 }
1018 }
1019 PL_DestroyOptState(opt);
1020
1021 if (0 != PL_strcmp(serverName, DEFAULT_SERVER)) serverIsLocal = PR_FALSE;
1022 if (0 == execution) execution = DEFAULT_EXECUTION_TIME;
1023 if (0 == workersMax) workersMax = DEFAULT_WORKERS_MAX;
1024 if (0 == workersMin) workersMin = DEFAULT_WORKERS_MIN;
1025 if (0 == accepting) accepting = ALLOWED_IN_ACCEPT;
1026 if (0 == backlog) backlog = DEFAULT_BACKLOG;
1027
1028 if (workersMin > accepting) accepting = workersMin;
1029
1030 PR_STDIO_INIT();
1031 TimeOfDayMessage("Client/Server started at", PR_CurrentThread());
1032
1033 cltsrv_log_file = PR_NewLogModule("cltsrv_log");
1034 MY_ASSERT(NULL != cltsrv_log_file);
1035 boolean = PR_SetLogFile("cltsrv.log");
1036 MY_ASSERT(boolean);
1037
1038#ifdef XP_MAC
1039 debug_mode = PR_TRUE;
1040#endif
1041
1042 rv = PR_SetFDCacheSize(low, high);
1043 PR_ASSERT(PR_SUCCESS == rv);
1044
1045 if (serverIsLocal)
1046 {
1047 /* Establish the server */
1048 TEST_LOG(
1049 cltsrv_log_file, TEST_LOG_INFO,
1050 ("main(0x%p): starting server\n", PR_CurrentThread()));
1051
1052 server = PR_NEWZAP(CSServer_t);
1053 PR_INIT_CLIST(&server->list);
1054 server->state = cs_init;
1055 server->ml = PR_NewLock();
1056 server->backlog = backlog;
1057 server->port = DEFAULT_PORT;
1058 server->workers.minimum = workersMin;
1059 server->workers.maximum = workersMax;
1060 server->workers.accepting = accepting;
1061 server->stateChange = PR_NewCondVar(server->ml);
1062 server->pool.exiting = PR_NewCondVar(server->ml);
1063 server->pool.acceptComplete = PR_NewCondVar(server->ml);
1064
1065 TEST_LOG(
1066 cltsrv_log_file, TEST_LOG_NOTICE,
1067 ("main(0x%p): creating server thread\n", PR_CurrentThread()));
1068
1069 server->thread = PR_CreateThread(
1070 PR_USER_THREAD, Server, server, PR_PRIORITY_HIGH,
1071 thread_scope, PR_JOINABLE_THREAD, 0);
1072 TEST_ASSERT(NULL != server->thread);
1073
1074 TEST_LOG(
1075 cltsrv_log_file, TEST_LOG_VERBOSE,
1076 ("main(0x%p): waiting for server init\n", PR_CurrentThread()));
1077
1078 PR_Lock(server->ml);
1079 while (server->state == cs_init)
1080 PR_WaitCondVar(server->stateChange, PR_INTERVAL_NO_TIMEOUT);
1081 PR_Unlock(server->ml);
1082
1083 TEST_LOG(
1084 cltsrv_log_file, TEST_LOG_VERBOSE,
1085 ("main(0x%p): server init complete (port #%d)\n",
1086 PR_CurrentThread(), server->port));
1087 }
1088
1089 if (clients != 0)
1090 {
1091 /* Create all of the clients */
1092 PRHostEnt host;
1093 char buffer[BUFFER_SIZE];
1094 client = (CSClient_t*)PR_CALLOC(clients * sizeof(CSClient_t));
1095
1096 TEST_LOG(
1097 cltsrv_log_file, TEST_LOG_VERBOSE,
1098 ("main(0x%p): creating %d client threads\n",
1099 PR_CurrentThread(), clients));
1100
1101 if (!serverIsLocal)
1102 {
1103 rv = PR_GetHostByName(serverName, buffer, BUFFER_SIZE, &host);
1104 if (PR_SUCCESS != rv)
1105 {
1106 PL_FPrintError(PR_STDERR, "PR_GetHostByName");
1107 return 2;
1108 }
1109 }
1110
1111 for (index = 0; index < clients; ++index)
1112 {
1113 client[index].state = cs_init;
1114 client[index].ml = PR_NewLock();
1115 if (serverIsLocal)
1116 {
1117 if (PR_AF_INET6 != domain)
1118 (void)PR_InitializeNetAddr(
1119 PR_IpAddrLoopback, DEFAULT_PORT,
1120 &client[index].serverAddress);
1121 else
1122 rv = PR_SetNetAddr(PR_IpAddrLoopback, PR_AF_INET6,
1123 DEFAULT_PORT, &client[index].serverAddress);
1124 }
1125 else
1126 {
1127 (void)PR_EnumerateHostEnt(
1128 0, &host, DEFAULT_PORT, &client[index].serverAddress);
1129 }
1130 client[index].stateChange = PR_NewCondVar(client[index].ml);
1131 TEST_LOG(
1132 cltsrv_log_file, TEST_LOG_INFO,
1133 ("main(0x%p): creating client threads\n", PR_CurrentThread()));
1134 client[index].thread = PR_CreateThread(
1135 PR_USER_THREAD, Client, &client[index], PR_PRIORITY_NORMAL,
1136 thread_scope, PR_JOINABLE_THREAD, 0);
1137 TEST_ASSERT(NULL != client[index].thread);
1138 PR_Lock(client[index].ml);
1139 while (cs_init == client[index].state)
1140 PR_WaitCondVar(client[index].stateChange, PR_INTERVAL_NO_TIMEOUT);
1141 PR_Unlock(client[index].ml);
1142 }
1143 }
1144
1145 /* Then just let them go at it for a bit */
1146 TEST_LOG(
1147 cltsrv_log_file, TEST_LOG_ALWAYS,
1148 ("main(0x%p): waiting for execution interval (%d seconds)\n",
1149 PR_CurrentThread(), execution));
1150
1151 WaitForCompletion(execution);
1152
1153 TimeOfDayMessage("Shutting down", PR_CurrentThread());
1154
1155 if (clients != 0)
1156 {
1157 for (index = 0; index < clients; ++index)
1158 {
1159 TEST_LOG(cltsrv_log_file, TEST_LOG_STATUS,
1160 ("main(0x%p): notifying client(0x%p) to stop\n",
1161 PR_CurrentThread(), client[index].thread));
1162
1163 PR_Lock(client[index].ml);
1164 if (cs_run == client[index].state)
1165 {
1166 client[index].state = cs_stop;
1167 PR_Interrupt(client[index].thread);
1168 while (cs_stop == client[index].state)
1169 PR_WaitCondVar(
1170 client[index].stateChange, PR_INTERVAL_NO_TIMEOUT);
1171 }
1172 PR_Unlock(client[index].ml);
1173
1174 TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE,
1175 ("main(0x%p): joining client(0x%p)\n",
1176 PR_CurrentThread(), client[index].thread));
1177
1178 joinStatus = PR_JoinThread(client[index].thread);
1179 TEST_ASSERT(PR_SUCCESS == joinStatus);
1180 PR_DestroyCondVar(client[index].stateChange);
1181 PR_DestroyLock(client[index].ml);
1182 }
1183 PR_DELETE(client);
1184 }
1185
1186 if (NULL != server)
1187 {
1188 /* All clients joined - retrieve the server */
1189 TEST_LOG(
1190 cltsrv_log_file, TEST_LOG_NOTICE,
1191 ("main(0x%p): notifying server(0x%p) to stop\n",
1192 PR_CurrentThread(), server->thread));
1193
1194 PR_Lock(server->ml);
1195 server->state = cs_stop;
1196 PR_Interrupt(server->thread);
1197 while (cs_exit != server->state)
1198 PR_WaitCondVar(server->stateChange, PR_INTERVAL_NO_TIMEOUT);
1199 PR_Unlock(server->ml);
1200
1201 TEST_LOG(
1202 cltsrv_log_file, TEST_LOG_NOTICE,
1203 ("main(0x%p): joining server(0x%p)\n",
1204 PR_CurrentThread(), server->thread));
1205 joinStatus = PR_JoinThread(server->thread);
1206 TEST_ASSERT(PR_SUCCESS == joinStatus);
1207
1208 PR_DestroyCondVar(server->stateChange);
1209 PR_DestroyCondVar(server->pool.exiting);
1210 PR_DestroyCondVar(server->pool.acceptComplete);
1211 PR_DestroyLock(server->ml);
1212 PR_DELETE(server);
1213 }
1214
1215 TEST_LOG(
1216 cltsrv_log_file, TEST_LOG_ALWAYS,
1217 ("main(0x%p): test complete\n", PR_CurrentThread()));
1218
1219 PT_FPrintStats(debug_out, "\nPThread Statistics\n");
1220
1221 TimeOfDayMessage("Test exiting at", PR_CurrentThread());
1222 PR_Cleanup();
1223 return 0;
1224} /* main */
1225
1226/* cltsrv.c */
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette