VirtualBox

source: vbox/trunk/src/libs/xpcom18a4/nsprpub/pr/tests/thrpool_server.c@ 1

Last change on this file since 1 was 1, checked in by vboxsync, 55 years ago

import

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 16.2 KB
Line 
1/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2/* ***** BEGIN LICENSE BLOCK *****
3 * Version: MPL 1.1/GPL 2.0/LGPL 2.1
4 *
5 * The contents of this file are subject to the Mozilla Public License Version
6 * 1.1 (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 * http://www.mozilla.org/MPL/
9 *
10 * Software distributed under the License is distributed on an "AS IS" basis,
11 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
12 * for the specific language governing rights and limitations under the
13 * License.
14 *
15 * The Original Code is the Netscape Portable Runtime (NSPR).
16 *
17 * The Initial Developer of the Original Code is
18 * Netscape Communications Corporation.
19 * Portions created by the Initial Developer are Copyright (C) 1999-2000
20 * the Initial Developer. All Rights Reserved.
21 *
22 * Contributor(s):
23 *
24 * Alternatively, the contents of this file may be used under the terms of
25 * either the GNU General Public License Version 2 or later (the "GPL"), or
26 * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
27 * in which case the provisions of the GPL or the LGPL are applicable instead
28 * of those above. If you wish to allow use of your version of this file only
29 * under the terms of either the GPL or the LGPL, and not to allow others to
30 * use your version of this file under the terms of the MPL, indicate your
31 * decision by deleting the provisions above and replace them with the notice
32 * and other provisions required by the GPL or the LGPL. If you do not delete
33 * the provisions above, a recipient may use your version of this file under
34 * the terms of any one of the MPL, the GPL or the LGPL.
35 *
36 * ***** END LICENSE BLOCK ***** */
37
38/***********************************************************************
39**
40** Name: thrpool.c
41**
42** Description: Test threadpool functionality.
43**
44** Modification History:
45*/
46#include "primpl.h"
47
48#include "plgetopt.h"
49
50#include <stdio.h>
51#include <string.h>
52#include <errno.h>
53#ifdef XP_UNIX
54#include <sys/mman.h>
55#endif
56#if defined(_PR_PTHREADS) && !defined(_PR_DCETHREADS)
57#include <pthread.h>
58#endif
59
60/* for getcwd */
61#if defined(XP_UNIX) || defined (XP_OS2_EMX) || defined(XP_BEOS)
62#include <unistd.h>
63#elif defined(XP_PC)
64#include <direct.h>
65#endif
66
67#ifdef WIN32
68#include <process.h>
69#endif
70
71static int _debug_on = 0;
72static char *program_name = NULL;
73static void serve_client_write(void *arg);
74
75#ifdef XP_MAC
76#include "prlog.h"
77#include "prsem.h"
78int fprintf(FILE *stream, const char *fmt, ...)
79{
80 PR_LogPrint(fmt);
81 return 0;
82}
83#define printf PR_LogPrint
84extern void SetupMacPrintfLog(char *logFile);
85#else
86#include "obsolete/prsem.h"
87#endif
88
89#ifdef XP_PC
90#define mode_t int
91#endif
92
93#define DPRINTF(arg) if (_debug_on) printf arg
94
95
96#define BUF_DATA_SIZE (2 * 1024)
97#define TCP_MESG_SIZE 1024
98#define NUM_TCP_CLIENTS 10 /* for a listen queue depth of 5 */
99
100
101#define NUM_TCP_CONNECTIONS_PER_CLIENT 10
102#define NUM_TCP_MESGS_PER_CONNECTION 10
103#define TCP_SERVER_PORT 10000
104#define SERVER_MAX_BIND_COUNT 100
105
106static PRInt32 num_tcp_clients = NUM_TCP_CLIENTS;
107static PRInt32 num_tcp_connections_per_client = NUM_TCP_CONNECTIONS_PER_CLIENT;
108static PRInt32 tcp_mesg_size = TCP_MESG_SIZE;
109static PRInt32 num_tcp_mesgs_per_connection = NUM_TCP_MESGS_PER_CONNECTION;
110static void TCP_Server_Accept(void *arg);
111
112
113int failed_already=0;
114typedef struct buffer {
115 char data[BUF_DATA_SIZE];
116} buffer;
117
118
119typedef struct Server_Param {
120 PRJobIoDesc iod; /* socket to read from/write to */
121 PRInt32 datalen; /* bytes of data transfered in each read/write */
122 PRNetAddr netaddr;
123 PRMonitor *exit_mon; /* monitor to signal on exit */
124 PRInt32 *job_counterp; /* counter to decrement, before exit */
125 PRInt32 conn_counter; /* counter to decrement, before exit */
126 PRThreadPool *tp;
127} Server_Param;
128
129typedef struct Serve_Client_Param {
130 PRJobIoDesc iod; /* socket to read from/write to */
131 PRInt32 datalen; /* bytes of data transfered in each read/write */
132 PRMonitor *exit_mon; /* monitor to signal on exit */
133 PRInt32 *job_counterp; /* counter to decrement, before exit */
134 PRThreadPool *tp;
135} Serve_Client_Param;
136
137typedef struct Session {
138 PRJobIoDesc iod; /* socket to read from/write to */
139 buffer *in_buf;
140 PRInt32 bytes;
141 PRInt32 msg_num;
142 PRInt32 bytes_read;
143 PRMonitor *exit_mon; /* monitor to signal on exit */
144 PRInt32 *job_counterp; /* counter to decrement, before exit */
145 PRThreadPool *tp;
146} Session;
147
148static void
149serve_client_read(void *arg)
150{
151 Session *sp = (Session *) arg;
152 int rem;
153 int bytes;
154 int offset;
155 PRFileDesc *sockfd;
156 char *buf;
157 PRJob *jobp;
158
159 PRIntervalTime timeout = PR_INTERVAL_NO_TIMEOUT;
160
161 sockfd = sp->iod.socket;
162 buf = sp->in_buf->data;
163
164 PR_ASSERT(sp->msg_num < num_tcp_mesgs_per_connection);
165 PR_ASSERT(sp->bytes_read < sp->bytes);
166
167 offset = sp->bytes_read;
168 rem = sp->bytes - offset;
169 bytes = PR_Recv(sockfd, buf + offset, rem, 0, timeout);
170 if (bytes < 0) {
171 return;
172 }
173 sp->bytes_read += bytes;
174 sp->iod.timeout = PR_SecondsToInterval(60);
175 if (sp->bytes_read < sp->bytes) {
176 jobp = PR_QueueJob_Read(sp->tp, &sp->iod, serve_client_read, sp,
177 PR_FALSE);
178 PR_ASSERT(NULL != jobp);
179 return;
180 }
181 PR_ASSERT(sp->bytes_read == sp->bytes);
182 DPRINTF(("serve_client: read complete, msg(%d) \n", sp->msg_num));
183
184 sp->iod.timeout = PR_SecondsToInterval(60);
185 jobp = PR_QueueJob_Write(sp->tp, &sp->iod, serve_client_write, sp,
186 PR_FALSE);
187 PR_ASSERT(NULL != jobp);
188
189 return;
190}
191
192static void
193serve_client_write(void *arg)
194{
195 Session *sp = (Session *) arg;
196 int bytes;
197 PRFileDesc *sockfd;
198 char *buf;
199 PRJob *jobp;
200
201 sockfd = sp->iod.socket;
202 buf = sp->in_buf->data;
203
204 PR_ASSERT(sp->msg_num < num_tcp_mesgs_per_connection);
205
206 bytes = PR_Send(sockfd, buf, sp->bytes, 0, PR_INTERVAL_NO_TIMEOUT);
207 PR_ASSERT(bytes == sp->bytes);
208
209 if (bytes < 0) {
210 return;
211 }
212 DPRINTF(("serve_client: write complete, msg(%d) \n", sp->msg_num));
213 sp->msg_num++;
214 if (sp->msg_num < num_tcp_mesgs_per_connection) {
215 sp->bytes_read = 0;
216 sp->iod.timeout = PR_SecondsToInterval(60);
217 jobp = PR_QueueJob_Read(sp->tp, &sp->iod, serve_client_read, sp,
218 PR_FALSE);
219 PR_ASSERT(NULL != jobp);
220 return;
221 }
222
223 DPRINTF(("serve_client: read/write complete, msg(%d) \n", sp->msg_num));
224 if (PR_Shutdown(sockfd, PR_SHUTDOWN_BOTH) < 0) {
225 fprintf(stderr,"%s: ERROR - PR_Shutdown\n", program_name);
226 }
227
228 PR_Close(sockfd);
229 PR_EnterMonitor(sp->exit_mon);
230 --(*sp->job_counterp);
231 PR_Notify(sp->exit_mon);
232 PR_ExitMonitor(sp->exit_mon);
233
234 PR_DELETE(sp->in_buf);
235 PR_DELETE(sp);
236
237 return;
238}
239
240/*
241 * Serve_Client
242 * Thread, started by the server, for serving a client connection.
243 * Reads data from socket and writes it back, unmodified, and
244 * closes the socket
245 */
246static void PR_CALLBACK
247Serve_Client(void *arg)
248{
249 Serve_Client_Param *scp = (Serve_Client_Param *) arg;
250 buffer *in_buf;
251 Session *sp;
252 PRJob *jobp;
253
254 sp = PR_NEW(Session);
255 sp->iod = scp->iod;
256
257 in_buf = PR_NEW(buffer);
258 if (in_buf == NULL) {
259 fprintf(stderr,"%s: failed to alloc buffer struct\n",program_name);
260 failed_already=1;
261 return;
262 }
263
264 sp->in_buf = in_buf;
265 sp->bytes = scp->datalen;
266 sp->msg_num = 0;
267 sp->bytes_read = 0;
268 sp->tp = scp->tp;
269 sp->exit_mon = scp->exit_mon;
270 sp->job_counterp = scp->job_counterp;
271
272 sp->iod.timeout = PR_SecondsToInterval(60);
273 jobp = PR_QueueJob_Read(sp->tp, &sp->iod, serve_client_read, sp,
274 PR_FALSE);
275 PR_ASSERT(NULL != jobp);
276 PR_DELETE(scp);
277}
278
279static void
280print_stats(void *arg)
281{
282 Server_Param *sp = (Server_Param *) arg;
283 PRThreadPool *tp = sp->tp;
284 PRInt32 counter;
285 PRJob *jobp;
286
287 PR_EnterMonitor(sp->exit_mon);
288 counter = (*sp->job_counterp);
289 PR_ExitMonitor(sp->exit_mon);
290
291 printf("PRINT_STATS: #client connections = %d\n",counter);
292
293
294 jobp = PR_QueueJob_Timer(tp, PR_MillisecondsToInterval(500),
295 print_stats, sp, PR_FALSE);
296
297 PR_ASSERT(NULL != jobp);
298}
299
300static int job_counter = 0;
301/*
302 * TCP Server
303 * Server binds an address to a socket, starts a client process and
304 * listens for incoming connections.
305 * Each client connects to the server and sends a chunk of data
306 * Starts a Serve_Client job for each incoming connection, to read
307 * the data from the client and send it back to the client, unmodified.
308 * Each client checks that data received from server is same as the
309 * data it sent to the server.
310 * Finally, the threadpool is shutdown
311 */
312static void PR_CALLBACK
313TCP_Server(void *arg)
314{
315 PRThreadPool *tp = (PRThreadPool *) arg;
316 Server_Param *sp;
317 PRFileDesc *sockfd;
318 PRNetAddr netaddr;
319 PRMonitor *sc_mon;
320 PRJob *jobp;
321 int i;
322 PRStatus rval;
323
324 /*
325 * Create a tcp socket
326 */
327 if ((sockfd = PR_NewTCPSocket()) == NULL) {
328 fprintf(stderr,"%s: PR_NewTCPSocket failed\n", program_name);
329 return;
330 }
331 memset(&netaddr, 0 , sizeof(netaddr));
332 netaddr.inet.family = PR_AF_INET;
333 netaddr.inet.port = PR_htons(TCP_SERVER_PORT);
334 netaddr.inet.ip = PR_htonl(PR_INADDR_ANY);
335 /*
336 * try a few times to bind server's address, if addresses are in
337 * use
338 */
339 i = 0;
340 while (PR_Bind(sockfd, &netaddr) < 0) {
341 if (PR_GetError() == PR_ADDRESS_IN_USE_ERROR) {
342 netaddr.inet.port += 2;
343 if (i++ < SERVER_MAX_BIND_COUNT)
344 continue;
345 }
346 fprintf(stderr,"%s: ERROR - PR_Bind failed\n", program_name);
347 perror("PR_Bind");
348 failed_already=1;
349 return;
350 }
351
352 if (PR_Listen(sockfd, 32) < 0) {
353 fprintf(stderr,"%s: ERROR - PR_Listen failed\n", program_name);
354 failed_already=1;
355 return;
356 }
357
358 if (PR_GetSockName(sockfd, &netaddr) < 0) {
359 fprintf(stderr,"%s: ERROR - PR_GetSockName failed\n", program_name);
360 failed_already=1;
361 return;
362 }
363
364 DPRINTF((
365 "TCP_Server: PR_BIND netaddr.inet.ip = 0x%lx, netaddr.inet.port = %d\n",
366 netaddr.inet.ip, netaddr.inet.port));
367
368 sp = PR_NEW(Server_Param);
369 if (sp == NULL) {
370 fprintf(stderr,"%s: PR_NEW failed\n", program_name);
371 failed_already=1;
372 return;
373 }
374 sp->iod.socket = sockfd;
375 sp->iod.timeout = PR_SecondsToInterval(60);
376 sp->datalen = tcp_mesg_size;
377 sp->exit_mon = sc_mon;
378 sp->job_counterp = &job_counter;
379 sp->conn_counter = 0;
380 sp->tp = tp;
381 sp->netaddr = netaddr;
382
383 /* create and cancel an io job */
384 jobp = PR_QueueJob_Accept(tp, &sp->iod, TCP_Server_Accept, sp,
385 PR_FALSE);
386 PR_ASSERT(NULL != jobp);
387 rval = PR_CancelJob(jobp);
388 PR_ASSERT(PR_SUCCESS == rval);
389
390 /*
391 * create the client process
392 */
393 {
394#define MAX_ARGS 4
395 char *argv[MAX_ARGS + 1];
396 int index = 0;
397 char port[32];
398 char path[1024 + sizeof("/thrpool_client")];
399 (void)getcwd(path, sizeof(path));
400 (void)strcat(path, "/thrpool_client");
401#ifdef XP_PC
402 (void)strcat(path, ".exe");
403#endif
404 argv[index++] = path;
405 sprintf(port,"%d",PR_ntohs(netaddr.inet.port));
406 if (_debug_on)
407 {
408 argv[index++] = "-d";
409 argv[index++] = "-p";
410 argv[index++] = port;
411 argv[index++] = NULL;
412 } else {
413 argv[index++] = "-p";
414 argv[index++] = port;
415 argv[index++] = NULL;
416 }
417 PR_ASSERT(MAX_ARGS >= (index - 1));
418
419 DPRINTF(("creating client process %s ...\n", path));
420 if (PR_FAILURE == PR_CreateProcessDetached(path, argv, NULL, NULL)) {
421 fprintf(stderr,
422 "thrpool_server: ERROR - PR_CreateProcessDetached failed\n");
423 failed_already=1;
424 return;
425 }
426 }
427
428 sc_mon = PR_NewMonitor();
429 if (sc_mon == NULL) {
430 fprintf(stderr,"%s: PR_NewMonitor failed\n", program_name);
431 failed_already=1;
432 return;
433 }
434
435 sp->iod.socket = sockfd;
436 sp->iod.timeout = PR_SecondsToInterval(60);
437 sp->datalen = tcp_mesg_size;
438 sp->exit_mon = sc_mon;
439 sp->job_counterp = &job_counter;
440 sp->conn_counter = 0;
441 sp->tp = tp;
442 sp->netaddr = netaddr;
443
444 /* create and cancel a timer job */
445 jobp = PR_QueueJob_Timer(tp, PR_MillisecondsToInterval(5000),
446 print_stats, sp, PR_FALSE);
447 PR_ASSERT(NULL != jobp);
448 rval = PR_CancelJob(jobp);
449 PR_ASSERT(PR_SUCCESS == rval);
450
451 DPRINTF(("TCP_Server: Accepting connections \n"));
452
453 jobp = PR_QueueJob_Accept(tp, &sp->iod, TCP_Server_Accept, sp,
454 PR_FALSE);
455 PR_ASSERT(NULL != jobp);
456 return;
457}
458
459static void
460TCP_Server_Accept(void *arg)
461{
462 Server_Param *sp = (Server_Param *) arg;
463 PRThreadPool *tp = sp->tp;
464 Serve_Client_Param *scp;
465 PRFileDesc *newsockfd;
466 PRJob *jobp;
467
468 if ((newsockfd = PR_Accept(sp->iod.socket, &sp->netaddr,
469 PR_INTERVAL_NO_TIMEOUT)) == NULL) {
470 fprintf(stderr,"%s: ERROR - PR_Accept failed\n", program_name);
471 failed_already=1;
472 goto exit;
473 }
474 scp = PR_NEW(Serve_Client_Param);
475 if (scp == NULL) {
476 fprintf(stderr,"%s: PR_NEW failed\n", program_name);
477 failed_already=1;
478 goto exit;
479 }
480
481 /*
482 * Start a Serve_Client job for each incoming connection
483 */
484 scp->iod.socket = newsockfd;
485 scp->iod.timeout = PR_SecondsToInterval(60);
486 scp->datalen = tcp_mesg_size;
487 scp->exit_mon = sp->exit_mon;
488 scp->job_counterp = sp->job_counterp;
489 scp->tp = sp->tp;
490
491 PR_EnterMonitor(sp->exit_mon);
492 (*sp->job_counterp)++;
493 PR_ExitMonitor(sp->exit_mon);
494 jobp = PR_QueueJob(tp, Serve_Client, scp,
495 PR_FALSE);
496
497 PR_ASSERT(NULL != jobp);
498 DPRINTF(("TCP_Server: Created Serve_Client = 0x%lx\n", jobp));
499
500 /*
501 * single-threaded update; no lock needed
502 */
503 sp->conn_counter++;
504 if (sp->conn_counter <
505 (num_tcp_clients * num_tcp_connections_per_client)) {
506 jobp = PR_QueueJob_Accept(tp, &sp->iod, TCP_Server_Accept, sp,
507 PR_FALSE);
508 PR_ASSERT(NULL != jobp);
509 return;
510 }
511 jobp = PR_QueueJob_Timer(tp, PR_MillisecondsToInterval(500),
512 print_stats, sp, PR_FALSE);
513
514 PR_ASSERT(NULL != jobp);
515 DPRINTF(("TCP_Server: Created print_stats timer job = 0x%lx\n", jobp));
516
517exit:
518 PR_EnterMonitor(sp->exit_mon);
519 /* Wait for server jobs to finish */
520 while (0 != *sp->job_counterp) {
521 PR_Wait(sp->exit_mon, PR_INTERVAL_NO_TIMEOUT);
522 DPRINTF(("TCP_Server: conn_counter = %d\n",
523 *sp->job_counterp));
524 }
525
526 PR_ExitMonitor(sp->exit_mon);
527 if (sp->iod.socket) {
528 PR_Close(sp->iod.socket);
529 }
530 PR_DestroyMonitor(sp->exit_mon);
531 printf("%30s","TCP_Socket_Client_Server_Test:");
532 printf("%2ld Server %2ld Clients %2ld connections_per_client\n",1l,
533 num_tcp_clients, num_tcp_connections_per_client);
534 printf("%30s %2ld messages_per_connection %4ld bytes_per_message\n",":",
535 num_tcp_mesgs_per_connection, tcp_mesg_size);
536
537 DPRINTF(("%s: calling PR_ShutdownThreadPool\n", program_name));
538 PR_ShutdownThreadPool(sp->tp);
539 PR_DELETE(sp);
540}
541
542/************************************************************************/
543
544#define DEFAULT_INITIAL_THREADS 4
545#define DEFAULT_MAX_THREADS 100
546#define DEFAULT_STACKSIZE (512 * 1024)
547
548int
549main(int argc, char **argv)
550{
551 PRInt32 initial_threads = DEFAULT_INITIAL_THREADS;
552 PRInt32 max_threads = DEFAULT_MAX_THREADS;
553 PRInt32 stacksize = DEFAULT_STACKSIZE;
554 PRThreadPool *tp = NULL;
555 PRStatus rv;
556 PRJob *jobp;
557
558 /*
559 * -d debug mode
560 */
561 PLOptStatus os;
562 PLOptState *opt;
563
564 program_name = argv[0];
565 opt = PL_CreateOptState(argc, argv, "d");
566 while (PL_OPT_EOL != (os = PL_GetNextOpt(opt)))
567 {
568 if (PL_OPT_BAD == os) continue;
569 switch (opt->option)
570 {
571 case 'd': /* debug mode */
572 _debug_on = 1;
573 break;
574 default:
575 break;
576 }
577 }
578 PL_DestroyOptState(opt);
579
580 PR_Init(PR_USER_THREAD, PR_PRIORITY_NORMAL, 0);
581 PR_STDIO_INIT();
582
583#ifdef XP_MAC
584 SetupMacPrintfLog("socket.log");
585#endif
586 PR_SetConcurrency(4);
587
588 tp = PR_CreateThreadPool(initial_threads, max_threads, stacksize);
589 if (NULL == tp) {
590 printf("PR_CreateThreadPool failed\n");
591 failed_already=1;
592 goto done;
593 }
594 jobp = PR_QueueJob(tp, TCP_Server, tp, PR_TRUE);
595 rv = PR_JoinJob(jobp);
596 PR_ASSERT(PR_SUCCESS == rv);
597
598 DPRINTF(("%s: calling PR_JoinThreadPool\n", program_name));
599 rv = PR_JoinThreadPool(tp);
600 PR_ASSERT(PR_SUCCESS == rv);
601 DPRINTF(("%s: returning from PR_JoinThreadPool\n", program_name));
602
603done:
604 PR_Cleanup();
605 if (failed_already) return 1;
606 else return 0;
607}
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette