VirtualBox

source: vbox/trunk/src/libs/xpcom18a4/xpcom/io/nsStreamUtils.cpp@ 53433

Last change on this file since 53433 was 1, checked in by vboxsync, 55 years ago

import

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 18.7 KB
Line 
1/* ***** BEGIN LICENSE BLOCK *****
2 * Version: MPL 1.1/GPL 2.0/LGPL 2.1
3 *
4 * The contents of this file are subject to the Mozilla Public License Version
5 * 1.1 (the "License"); you may not use this file except in compliance with
6 * the License. You may obtain a copy of the License at
7 * http://www.mozilla.org/MPL/
8 *
9 * Software distributed under the License is distributed on an "AS IS" basis,
10 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
11 * for the specific language governing rights and limitations under the
12 * License.
13 *
14 * The Original Code is Mozilla.
15 *
16 * The Initial Developer of the Original Code is
17 * Netscape Communications Corporation.
18 * Portions created by the Initial Developer are Copyright (C) 2002
19 * the Initial Developer. All Rights Reserved.
20 *
21 * Contributor(s):
22 * Darin Fisher <[email protected]>
23 *
24 * Alternatively, the contents of this file may be used under the terms of
25 * either the GNU General Public License Version 2 or later (the "GPL"), or
26 * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
27 * in which case the provisions of the GPL or the LGPL are applicable instead
28 * of those above. If you wish to allow use of your version of this file only
29 * under the terms of either the GPL or the LGPL, and not to allow others to
30 * use your version of this file under the terms of the MPL, indicate your
31 * decision by deleting the provisions above and replace them with the notice
32 * and other provisions required by the GPL or the LGPL. If you do not delete
33 * the provisions above, a recipient may use your version of this file under
34 * the terms of any one of the MPL, the GPL or the LGPL.
35 *
36 * ***** END LICENSE BLOCK ***** */
37
38#include "nsStreamUtils.h"
39#include "nsCOMPtr.h"
40#include "nsIPipe.h"
41#include "nsIEventTarget.h"
42#include "nsAutoLock.h"
43
44//-----------------------------------------------------------------------------
45
46class nsInputStreamReadyEvent : public PLEvent
47 , public nsIInputStreamCallback
48{
49public:
50 NS_DECL_ISUPPORTS
51
52 nsInputStreamReadyEvent(nsIInputStreamCallback *callback,
53 nsIEventTarget *target)
54 : mCallback(callback)
55 , mTarget(target)
56 {
57 }
58
59private:
60 ~nsInputStreamReadyEvent()
61 {
62 if (mCallback) {
63 nsresult rv;
64 //
65 // whoa!! looks like we never posted this event. take care to
66 // release mCallback on the correct thread. if mTarget lives on the
67 // calling thread, then we are ok. otherwise, we have to try to
68 // proxy the Release over the right thread. if that thread is dead,
69 // then there's nothing we can do... better to leak than crash.
70 //
71 PRBool val;
72 rv = mTarget->IsOnCurrentThread(&val);
73 if (NS_FAILED(rv) || !val) {
74 nsCOMPtr<nsIInputStreamCallback> event;
75 NS_NewInputStreamReadyEvent(getter_AddRefs(event), mCallback, mTarget);
76 mCallback = 0;
77 if (event) {
78 rv = event->OnInputStreamReady(nsnull);
79 if (NS_FAILED(rv)) {
80 NS_NOTREACHED("leaking stream event");
81 nsISupports *sup = event;
82 NS_ADDREF(sup);
83 }
84 }
85 }
86 }
87 }
88
89public:
90 NS_IMETHOD OnInputStreamReady(nsIAsyncInputStream *stream)
91 {
92 mStream = stream;
93
94 // will be released when event is handled
95 NS_ADDREF_THIS();
96
97 PL_InitEvent(this, nsnull, EventHandler, EventCleanup);
98
99 if (NS_FAILED(mTarget->PostEvent(this))) {
100 NS_WARNING("PostEvent failed");
101 NS_RELEASE_THIS();
102 return NS_ERROR_FAILURE;
103 }
104
105 return NS_OK;
106 }
107
108private:
109 nsCOMPtr<nsIAsyncInputStream> mStream;
110 nsCOMPtr<nsIInputStreamCallback> mCallback;
111 nsCOMPtr<nsIEventTarget> mTarget;
112
113 PR_STATIC_CALLBACK(void *) EventHandler(PLEvent *plevent)
114 {
115 nsInputStreamReadyEvent *ev = (nsInputStreamReadyEvent *) plevent;
116 // bypass event delivery if this is a cleanup event...
117 if (ev->mCallback)
118 ev->mCallback->OnInputStreamReady(ev->mStream);
119 ev->mCallback = 0;
120 return NULL;
121 }
122
123 PR_STATIC_CALLBACK(void) EventCleanup(PLEvent *plevent)
124 {
125 nsInputStreamReadyEvent *ev = (nsInputStreamReadyEvent *) plevent;
126 NS_RELEASE(ev);
127 }
128};
129
130NS_IMPL_THREADSAFE_ISUPPORTS1(nsInputStreamReadyEvent,
131 nsIInputStreamCallback)
132
133//-----------------------------------------------------------------------------
134
135class nsOutputStreamReadyEvent : public PLEvent
136 , public nsIOutputStreamCallback
137{
138public:
139 NS_DECL_ISUPPORTS
140
141 nsOutputStreamReadyEvent(nsIOutputStreamCallback *callback,
142 nsIEventTarget *target)
143 : mCallback(callback)
144 , mTarget(target)
145 {
146 }
147
148private:
149 ~nsOutputStreamReadyEvent()
150 {
151 if (mCallback) {
152 nsresult rv;
153 //
154 // whoa!! looks like we never posted this event. take care to
155 // release mCallback on the correct thread. if mTarget lives on the
156 // calling thread, then we are ok. otherwise, we have to try to
157 // proxy the Release over the right thread. if that thread is dead,
158 // then there's nothing we can do... better to leak than crash.
159 //
160 PRBool val;
161 rv = mTarget->IsOnCurrentThread(&val);
162 if (NS_FAILED(rv) || !val) {
163 nsCOMPtr<nsIOutputStreamCallback> event;
164 NS_NewOutputStreamReadyEvent(getter_AddRefs(event), mCallback, mTarget);
165 mCallback = 0;
166 if (event) {
167 rv = event->OnOutputStreamReady(nsnull);
168 if (NS_FAILED(rv)) {
169 NS_NOTREACHED("leaking stream event");
170 nsISupports *sup = event;
171 NS_ADDREF(sup);
172 }
173 }
174 }
175 }
176 }
177
178public:
179 void Init(nsIOutputStreamCallback *callback, nsIEventTarget *target)
180 {
181 mCallback = callback;
182 mTarget = target;
183
184 PL_InitEvent(this, nsnull, EventHandler, EventCleanup);
185 }
186
187 NS_IMETHOD OnOutputStreamReady(nsIAsyncOutputStream *stream)
188 {
189 mStream = stream;
190
191 // this will be released when the event is handled
192 NS_ADDREF_THIS();
193
194 PL_InitEvent(this, nsnull, EventHandler, EventCleanup);
195
196 if (NS_FAILED(mTarget->PostEvent(this))) {
197 NS_WARNING("PostEvent failed");
198 NS_RELEASE_THIS();
199 return NS_ERROR_FAILURE;
200 }
201
202 return NS_OK;
203 }
204
205private:
206 nsCOMPtr<nsIAsyncOutputStream> mStream;
207 nsCOMPtr<nsIOutputStreamCallback> mCallback;
208 nsCOMPtr<nsIEventTarget> mTarget;
209
210 PR_STATIC_CALLBACK(void *) EventHandler(PLEvent *plevent)
211 {
212 nsOutputStreamReadyEvent *ev = (nsOutputStreamReadyEvent *) plevent;
213 if (ev->mCallback)
214 ev->mCallback->OnOutputStreamReady(ev->mStream);
215 ev->mCallback = 0;
216 return NULL;
217 }
218
219 PR_STATIC_CALLBACK(void) EventCleanup(PLEvent *ev)
220 {
221 nsOutputStreamReadyEvent *event = (nsOutputStreamReadyEvent *) ev;
222 NS_RELEASE(event);
223 }
224};
225
226NS_IMPL_THREADSAFE_ISUPPORTS1(nsOutputStreamReadyEvent,
227 nsIOutputStreamCallback)
228
229//-----------------------------------------------------------------------------
230
231NS_COM nsresult
232NS_NewInputStreamReadyEvent(nsIInputStreamCallback **event,
233 nsIInputStreamCallback *callback,
234 nsIEventTarget *target)
235{
236 nsInputStreamReadyEvent *ev = new nsInputStreamReadyEvent(callback, target);
237 if (!ev)
238 return NS_ERROR_OUT_OF_MEMORY;
239 NS_ADDREF(*event = ev);
240 return NS_OK;
241}
242
243NS_COM nsresult
244NS_NewOutputStreamReadyEvent(nsIOutputStreamCallback **event,
245 nsIOutputStreamCallback *callback,
246 nsIEventTarget *target)
247{
248 nsOutputStreamReadyEvent *ev = new nsOutputStreamReadyEvent(callback, target);
249 if (!ev)
250 return NS_ERROR_OUT_OF_MEMORY;
251 NS_ADDREF(*event = ev);
252 return NS_OK;
253}
254
255//-----------------------------------------------------------------------------
256// NS_AsyncCopy implementation
257
258// abstract stream copier...
259class nsAStreamCopier : public nsIInputStreamCallback
260 , public nsIOutputStreamCallback
261{
262public:
263 NS_DECL_ISUPPORTS
264
265 nsAStreamCopier()
266 : mLock(nsnull)
267 , mCallback(nsnull)
268 , mClosure(nsnull)
269 , mChunkSize(0)
270 , mEventInProcess(PR_FALSE)
271 , mEventIsPending(PR_FALSE)
272 {
273 }
274
275 // virtual since subclasses call superclass Release()
276 virtual ~nsAStreamCopier()
277 {
278 if (mLock)
279 PR_DestroyLock(mLock);
280 }
281
282 // kick off the async copy...
283 nsresult Start(nsIInputStream *source,
284 nsIOutputStream *sink,
285 nsIEventTarget *target,
286 nsAsyncCopyCallbackFun callback,
287 void *closure,
288 PRUint32 chunksize)
289 {
290 mSource = source;
291 mSink = sink;
292 mTarget = target;
293 mCallback = callback;
294 mClosure = closure;
295 mChunkSize = chunksize;
296
297 mLock = PR_NewLock();
298 if (!mLock)
299 return NS_ERROR_OUT_OF_MEMORY;
300
301 mAsyncSource = do_QueryInterface(mSource);
302 mAsyncSink = do_QueryInterface(mSink);
303
304 return PostContinuationEvent();
305 }
306
307 // implemented by subclasses, returns number of bytes copied and
308 // sets source and sink condition before returning.
309 virtual PRUint32 DoCopy(nsresult *sourceCondition, nsresult *sinkCondition) = 0;
310
311 void Process()
312 {
313 if (!mSource || !mSink)
314 return;
315
316 nsresult sourceCondition, sinkCondition;
317
318 // ok, copy data from source to sink.
319 for (;;) {
320 PRUint32 n = DoCopy(&sourceCondition, &sinkCondition);
321 if (NS_FAILED(sourceCondition) || NS_FAILED(sinkCondition) || n == 0) {
322 if (sourceCondition == NS_BASE_STREAM_WOULD_BLOCK && mAsyncSource) {
323 // need to wait for more data from source. while waiting for
324 // more source data, be sure to observe failures on output end.
325 mAsyncSource->AsyncWait(this, 0, 0, nsnull);
326
327 if (mAsyncSink)
328 mAsyncSink->AsyncWait(this,
329 nsIAsyncOutputStream::WAIT_CLOSURE_ONLY,
330 0, nsnull);
331 }
332 else if (sinkCondition == NS_BASE_STREAM_WOULD_BLOCK && mAsyncSink) {
333 // need to wait for more room in the sink. while waiting for
334 // more room in the sink, be sure to observer failures on the
335 // input end.
336 mAsyncSink->AsyncWait(this, 0, 0, nsnull);
337
338 if (mAsyncSource)
339 mAsyncSource->AsyncWait(this,
340 nsIAsyncInputStream::WAIT_CLOSURE_ONLY,
341 0, nsnull);
342 }
343 else {
344 // close source
345 if (mAsyncSource)
346 mAsyncSource->CloseWithStatus(sinkCondition);
347 else
348 mSource->Close();
349 mAsyncSource = nsnull;
350 mSource = nsnull;
351
352 // close sink
353 if (mAsyncSink)
354 mAsyncSink->CloseWithStatus(sourceCondition);
355 else
356 mSink->Close();
357 mAsyncSink = nsnull;
358 mSink = nsnull;
359
360 // notify state complete...
361 if (mCallback) {
362 nsresult status = sourceCondition;
363 if (NS_SUCCEEDED(status))
364 status = sinkCondition;
365 if (status == NS_BASE_STREAM_CLOSED)
366 status = NS_OK;
367 mCallback(mClosure, status);
368 }
369 }
370 break;
371 }
372 }
373 }
374
375 NS_IMETHOD OnInputStreamReady(nsIAsyncInputStream *source)
376 {
377 PostContinuationEvent();
378 return NS_OK;
379 }
380
381 NS_IMETHOD OnOutputStreamReady(nsIAsyncOutputStream *sink)
382 {
383 PostContinuationEvent();
384 return NS_OK;
385 }
386
387 PR_STATIC_CALLBACK(void*) HandleContinuationEvent(PLEvent *event)
388 {
389 nsAStreamCopier *self = (nsAStreamCopier *) event->owner;
390 self->Process();
391
392 // clear "in process" flag and post any pending continuation event
393 nsAutoLock lock(self->mLock);
394 self->mEventInProcess = PR_FALSE;
395 if (self->mEventIsPending) {
396 self->mEventIsPending = PR_FALSE;
397 self->PostContinuationEvent_Locked();
398 }
399 return nsnull;
400 }
401
402 PR_STATIC_CALLBACK(void) DestroyContinuationEvent(PLEvent *event)
403 {
404 nsAStreamCopier *self = (nsAStreamCopier *) event->owner;
405 NS_RELEASE(self);
406 delete event;
407 }
408
409 nsresult PostContinuationEvent()
410 {
411 // we cannot post a continuation event if there is currently
412 // an event in process. doing so could result in Process being
413 // run simultaneously on multiple threads, so we mark the event
414 // as pending, and if an event is already in process then we
415 // just let that existing event take care of posting the real
416 // continuation event.
417
418 nsAutoLock lock(mLock);
419 return PostContinuationEvent_Locked();
420 }
421
422 nsresult PostContinuationEvent_Locked()
423 {
424 nsresult rv = NS_OK;
425 if (mEventInProcess)
426 mEventIsPending = PR_TRUE;
427 else {
428 PLEvent *event = new PLEvent;
429 if (!event)
430 rv = NS_ERROR_OUT_OF_MEMORY;
431 else {
432 NS_ADDREF_THIS();
433 PL_InitEvent(event, this,
434 HandleContinuationEvent,
435 DestroyContinuationEvent);
436
437 rv = mTarget->PostEvent(event);
438 if (NS_SUCCEEDED(rv))
439 mEventInProcess = PR_TRUE;
440 else {
441 NS_ERROR("unable to post continuation event");
442 PL_DestroyEvent(event);
443 }
444 }
445 }
446 return rv;
447 }
448
449protected:
450 nsCOMPtr<nsIInputStream> mSource;
451 nsCOMPtr<nsIOutputStream> mSink;
452 nsCOMPtr<nsIAsyncInputStream> mAsyncSource;
453 nsCOMPtr<nsIAsyncOutputStream> mAsyncSink;
454 nsCOMPtr<nsIEventTarget> mTarget;
455 PRLock *mLock;
456 nsAsyncCopyCallbackFun mCallback;
457 void *mClosure;
458 PRUint32 mChunkSize;
459 PRPackedBool mEventInProcess;
460 PRPackedBool mEventIsPending;
461};
462
463NS_IMPL_THREADSAFE_ISUPPORTS2(nsAStreamCopier,
464 nsIInputStreamCallback,
465 nsIOutputStreamCallback)
466
467class nsStreamCopierIB : public nsAStreamCopier
468{
469public:
470 nsStreamCopierIB() : nsAStreamCopier() {}
471 virtual ~nsStreamCopierIB() {}
472
473 struct ReadSegmentsState {
474 nsIOutputStream *mSink;
475 nsresult mSinkCondition;
476 };
477
478 static NS_METHOD ConsumeInputBuffer(nsIInputStream *inStr,
479 void *closure,
480 const char *buffer,
481 PRUint32 offset,
482 PRUint32 count,
483 PRUint32 *countWritten)
484 {
485 ReadSegmentsState *state = (ReadSegmentsState *) closure;
486
487 nsresult rv = state->mSink->Write(buffer, count, countWritten);
488 if (NS_FAILED(rv))
489 state->mSinkCondition = rv;
490 else if (*countWritten == 0)
491 state->mSinkCondition = NS_BASE_STREAM_CLOSED;
492
493 return state->mSinkCondition;
494 }
495
496 PRUint32 DoCopy(nsresult *sourceCondition, nsresult *sinkCondition)
497 {
498 ReadSegmentsState state;
499 state.mSink = mSink;
500 state.mSinkCondition = NS_OK;
501
502 PRUint32 n;
503 *sourceCondition =
504 mSource->ReadSegments(ConsumeInputBuffer, &state, mChunkSize, &n);
505 *sinkCondition = state.mSinkCondition;
506 return n;
507 }
508};
509
510class nsStreamCopierOB : public nsAStreamCopier
511{
512public:
513 nsStreamCopierOB() : nsAStreamCopier() {}
514 virtual ~nsStreamCopierOB() {}
515
516 struct WriteSegmentsState {
517 nsIInputStream *mSource;
518 nsresult mSourceCondition;
519 };
520
521 static NS_METHOD FillOutputBuffer(nsIOutputStream *outStr,
522 void *closure,
523 char *buffer,
524 PRUint32 offset,
525 PRUint32 count,
526 PRUint32 *countRead)
527 {
528 WriteSegmentsState *state = (WriteSegmentsState *) closure;
529
530 nsresult rv = state->mSource->Read(buffer, count, countRead);
531 if (NS_FAILED(rv))
532 state->mSourceCondition = rv;
533 else if (*countRead == 0)
534 state->mSourceCondition = NS_BASE_STREAM_CLOSED;
535
536 return state->mSourceCondition;
537 }
538
539 PRUint32 DoCopy(nsresult *sourceCondition, nsresult *sinkCondition)
540 {
541 WriteSegmentsState state;
542 state.mSource = mSource;
543 state.mSourceCondition = NS_OK;
544
545 PRUint32 n;
546 *sinkCondition =
547 mSink->WriteSegments(FillOutputBuffer, &state, mChunkSize, &n);
548 *sourceCondition = state.mSourceCondition;
549 return n;
550 }
551};
552
553//-----------------------------------------------------------------------------
554
555NS_COM nsresult
556NS_AsyncCopy(nsIInputStream *source,
557 nsIOutputStream *sink,
558 nsIEventTarget *target,
559 nsAsyncCopyMode mode,
560 PRUint32 chunkSize,
561 nsAsyncCopyCallbackFun callback,
562 void *closure)
563{
564 NS_ASSERTION(target, "non-null target required");
565
566 nsresult rv;
567 nsAStreamCopier *copier;
568
569 if (mode == NS_ASYNCCOPY_VIA_READSEGMENTS)
570 copier = new nsStreamCopierIB();
571 else
572 copier = new nsStreamCopierOB();
573
574 if (!copier)
575 return NS_ERROR_OUT_OF_MEMORY;
576
577 // Start() takes an owning ref to the copier...
578 NS_ADDREF(copier);
579 rv = copier->Start(source, sink, target, callback, closure, chunkSize);
580 NS_RELEASE(copier);
581
582 return rv;
583}
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette