VirtualBox

source: vbox/trunk/src/VBox/Runtime/common/vfs/vfsreadahead.cpp@ 76734

Last change on this file since 76734 was 76553, checked in by vboxsync, 6 years ago

scm --update-copyright-year

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 34.2 KB
Line 
1/* $Id: vfsreadahead.cpp 76553 2019-01-01 01:45:53Z vboxsync $ */
2/** @file
3 * IPRT - Virtual File System, Read-Ahead Thread.
4 */
5
6/*
7 * Copyright (C) 2010-2019 Oracle Corporation
8 *
9 * This file is part of VirtualBox Open Source Edition (OSE), as
10 * available from http://www.virtualbox.org. This file is free software;
11 * you can redistribute it and/or modify it under the terms of the GNU
12 * General Public License (GPL) as published by the Free Software
13 * Foundation, in version 2 as it comes in the "COPYING" file of the
14 * VirtualBox OSE distribution. VirtualBox OSE is distributed in the
15 * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
16 *
17 * The contents of this file may alternatively be used under the terms
18 * of the Common Development and Distribution License Version 1.0
19 * (CDDL) only, as it comes in the "COPYING.CDDL" file of the
20 * VirtualBox OSE distribution, in which case the provisions of the
21 * CDDL are applicable instead of those of the GPL.
22 *
23 * You may elect to license modified versions of this file under the
24 * terms and conditions of either the GPL or the CDDL or both.
25 */
26
27
28/*********************************************************************************************************************************
29* Header Files *
30*********************************************************************************************************************************/
31#define LOG_GROUP RTLOGGROUP_VFS
32#include "internal/iprt.h"
33#include <iprt/vfs.h>
34
35#include <iprt/asm.h>
36#include <iprt/assert.h>
37#include <iprt/err.h>
38#include <iprt/file.h>
39#include <iprt/list.h>
40#include <iprt/log.h>
41#include <iprt/poll.h>
42#include <iprt/string.h>
43#include <iprt/vfslowlevel.h>
44
45
46
47/*********************************************************************************************************************************
48* Header Files *
49*********************************************************************************************************************************/
50#include "internal/iprt.h"
51#include <iprt/vfs.h>
52
53#include <iprt/critsect.h>
54#include <iprt/err.h>
55#include <iprt/mem.h>
56#include <iprt/thread.h>
57
58
59/*********************************************************************************************************************************
60* Structures and Typedefs *
61*********************************************************************************************************************************/
62/**
63 * Buffer descriptor.
 *
 * One descriptor exists per read-ahead buffer.  A descriptor is linked into
 * either RTVFSREADAHEAD::FreeList (available to the producer thread) or
 * RTVFSREADAHEAD::ConsumerList (filled with data, kept sorted by off).
64 */
65typedef struct RTVFSREADAHEADBUFDESC
66{
67 /** List entry (links the descriptor into FreeList or ConsumerList). */
68 RTLISTNODE ListEntry;
69 /** The file offset of the first byte held in the buffer. */
70 uint64_t off;
71 /** The amount of the buffer that has been filled.
72 * (Buffer size is RTVFSREADAHEAD::cbBuffer.) */
73 uint32_t cbFilled;
74 /** Reserved; not accessed by any code in this file. */
75 uint32_t volatile fReserved;
76 /** Pointer to the buffer (a slice of RTVFSREADAHEAD::pbAllBuffers). */
77 uint8_t *pbBuffer;
78} RTVFSREADAHEADBUFDESC;
79/** Pointer to a read-ahead buffer descriptor. */
80typedef RTVFSREADAHEADBUFDESC *PRTVFSREADAHEADBUFDESC;
81
82/**
83 * Read ahead file or I/O stream.
 *
 * Instance data shared between the consumer (the VFS method implementations)
 * and the producer (the read-ahead thread, hThread).
84 */
85typedef struct RTVFSREADAHEAD
86{
87 /** The I/O critical section (serializes I/O and seeking on hIos / hFile).
88 * The thread doing I/O or seeking always need to own this. */
89 RTCRITSECT IoCritSect;
90
91 /** The critical section protecting the buffer lists and offConsumer.
92 *
93 * This can be taken while holding IoCritSect as that eliminates a race
94 * condition between the read ahead thread inserting into ConsumerList and
95 * a consumer thread deciding to do a direct read. */
96 RTCRITSECT BufferCritSect;
97 /** List of buffers available for consumption.
98 * The producer thread (hThread) puts buffers into this list once it's done
99 * reading into them. The consumer moves them to the FreeList once the
100 * current position has passed beyond each buffer. */
101 RTLISTANCHOR ConsumerList;
102 /** List of buffers available for the producer. */
103 RTLISTANCHOR FreeList;
104
105 /** The current file position from the consumer point of view. */
106 uint64_t offConsumer;
107
108 /** The end-of-file(/stream) offset. This is initially UINT64_MAX and later
109 * set when reading past EOF. */
110 uint64_t offEof;
111
112 /** The read ahead thread. */
113 RTTHREAD hThread;
114 /** Set when we want the thread to terminate. */
115 bool volatile fTerminateThread;
116 /** Creation flags (rtVfsCreateReadAheadInstance currently requires zero). */
117 uint32_t fFlags;
118
119 /** The I/O stream we read from. */
120 RTVFSIOSTREAM hIos;
121 /** The file face of hIos, if we're fronting for an actual file. */
122 RTVFSFILE hFile;
123 /** The buffer size. */
124 uint32_t cbBuffer;
125 /** The number of buffers. */
126 uint32_t cBuffers;
127 /** Single big buffer allocation, cBuffers * cbBuffer in size. */
128 uint8_t *pbAllBuffers;
129 /** Array of buffer descriptors (cBuffers in size).
 * Declared with a single element; the instance is sized at creation using
 * RT_UOFFSETOF_DYN(RTVFSREADAHEAD, aBufDescs[cBuffers]). */
130 RTVFSREADAHEADBUFDESC aBufDescs[1];
131} RTVFSREADAHEAD;
132/** Pointer to a read-ahead instance. */
133typedef RTVFSREADAHEAD *PRTVFSREADAHEAD;
134
135
136
137/**
138 * @interface_method_impl{RTVFSOBJOPS,pfnClose}
139 */
140static DECLCALLBACK(int) rtVfsReadAhead_Close(void *pvThis)
141{
142 PRTVFSREADAHEAD pThis = (PRTVFSREADAHEAD)pvThis;
143 int rc;
144
145 /*
146 * Stop the read-ahead thread.
147 */
148 if (pThis->hThread != NIL_RTTHREAD)
149 {
150 ASMAtomicWriteBool(&pThis->fTerminateThread, true);
151 rc = RTThreadUserSignal(pThis->hThread);
152 AssertRC(rc);
153 rc = RTThreadWait(pThis->hThread, RT_INDEFINITE_WAIT, NULL);
154 AssertRCReturn(rc, rc);
155 pThis->hThread = NIL_RTTHREAD;
156 }
157
158 /*
159 * Release the upstream objects.
160 */
161 RTCritSectEnter(&pThis->IoCritSect);
162
163 RTVfsIoStrmRelease(pThis->hIos);
164 pThis->hIos = NIL_RTVFSIOSTREAM;
165 RTVfsFileRelease(pThis->hFile);
166 pThis->hFile = NIL_RTVFSFILE;
167
168 RTCritSectLeave(&pThis->IoCritSect);
169
170 /*
171 * Free the buffers.
172 */
173 RTCritSectEnter(&pThis->BufferCritSect);
174 if (pThis->pbAllBuffers)
175 {
176 RTMemPageFree(pThis->pbAllBuffers, pThis->cBuffers * pThis->cbBuffer);
177 pThis->pbAllBuffers = NULL;
178 }
179 RTCritSectLeave(&pThis->BufferCritSect);
180
181 /*
182 * Destroy the critical sections.
183 */
184 RTCritSectDelete(&pThis->BufferCritSect);
185 RTCritSectDelete(&pThis->IoCritSect);
186
187 return VINF_SUCCESS;
188}
189
190
191/**
192 * @interface_method_impl{RTVFSOBJOPS,pfnQueryInfo}
193 */
194static DECLCALLBACK(int) rtVfsReadAhead_QueryInfo(void *pvThis, PRTFSOBJINFO pObjInfo, RTFSOBJATTRADD enmAddAttr)
195{
196 PRTVFSREADAHEAD pThis = (PRTVFSREADAHEAD)pvThis;
197 return RTVfsIoStrmQueryInfo(pThis->hIos, pObjInfo, enmAddAttr);
198}
199
200
201/**
202 * @interface_method_impl{RTVFSIOSTREAMOPS,pfnRead}
 *
 * Serves the request from the buffers on ConsumerList where possible and
 * falls back on a direct read from hIos for whatever remains.
 *
 * The buffer search is done first while owning only BufferCritSect.  If data
 * is still missing, BufferCritSect is dropped, IoCritSect is taken,
 * BufferCritSect is re-taken and the search is repeated before the direct
 * read is issued.
 *
 * @returns Status of the direct read if one was performed; otherwise
 *          VINF_SUCCESS, or VINF_EOF / VERR_EOF (the latter when pcbRead is
 *          NULL) if the consumer position is at or beyond offEof.
 * @param   pvThis      The read-ahead instance.
 * @param   off         Offset to read at, -1 for the current consumer position.
 * @param   pSgBuf      Destination; only the first segment is used.
 * @param   fBlocking   Passed on to the direct read.
 * @param   pcbRead     Where to return the total byte count (buffered plus
 *                      direct).  Optional.
203 */
204static DECLCALLBACK(int) rtVfsReadAhead_Read(void *pvThis, RTFOFF off, PCRTSGBUF pSgBuf, bool fBlocking, size_t *pcbRead)
205{
206 PRTVFSREADAHEAD pThis = (PRTVFSREADAHEAD)pvThis;
207
208 Assert(pSgBuf->cSegs == 1); /* Caller deals with multiple SGs. */
209
210 /*
211 * We loop here to repeat the buffer search after entering the I/O critical
212 * section, just in case a buffer got inserted while we were waiting for it.
213 */
214 int rc = VINF_SUCCESS;
215 uint8_t *pbDst = (uint8_t *)pSgBuf->paSegs[0].pvSeg;
216 size_t cbDst = pSgBuf->paSegs[0].cbSeg;
217 size_t cbTotalRead = 0;
218 bool fPokeReader = false;
219 bool fOwnsIoCritSect = false;
220 RTCritSectEnter(&pThis->BufferCritSect);
221 for (;;)
222 {
223 /*
224 * Try satisfy the read from the buffers.
225 */
226 uint64_t offCur = pThis->offConsumer;
227 if (off != -1)
228 {
229 offCur = (uint64_t)off;
230 if (pThis->offConsumer != offCur)
231 fPokeReader = true; /* If the current position changed, poke it in case it stopped at EOF. */
232 pThis->offConsumer = offCur;
233 }
234
235 PRTVFSREADAHEADBUFDESC pBufDesc, pNextBufDesc;
 /* The 'safe' iteration variant is used because nodes are unlinked inside the loop. */
236 RTListForEachSafe(&pThis->ConsumerList, pBufDesc, pNextBufDesc, RTVFSREADAHEADBUFDESC, ListEntry)
237 {
238 /* The buffers are sorted and reads must start in a buffer if
239 anything should be taken from the buffer (at least for now). */
240 if (offCur < pBufDesc->off)
241 break;
242
243 /* Anything we can read from this buffer? */
244 uint64_t offCurBuf = offCur - pBufDesc->off;
245 if (offCurBuf < pBufDesc->cbFilled)
246 {
247 size_t const cbFromCurBuf = RT_MIN(pBufDesc->cbFilled - offCurBuf, cbDst);
248 memcpy(pbDst, pBufDesc->pbBuffer + offCurBuf, cbFromCurBuf);
249 pbDst += cbFromCurBuf;
250 cbDst -= cbFromCurBuf;
251 cbTotalRead += cbFromCurBuf;
252 offCur += cbFromCurBuf;
253 }
254
255 /* Discard buffers we've read past. */
256 if (pBufDesc->off + pBufDesc->cbFilled <= offCur)
257 {
258 RTListNodeRemove(&pBufDesc->ListEntry);
259 RTListAppend(&pThis->FreeList, &pBufDesc->ListEntry);
260 fPokeReader = true; /* Poke it as there are now more buffers available. */
261 }
262
263 /* Stop if we're done. */
264 if (!cbDst)
265 break;
266 }
267
 /* Publish the new position; keep 'off' in step so the second pass and the
    direct read continue where the buffered data ended. */
268 pThis->offConsumer = offCur;
269 if (off != -1)
270 off = offCur;
271
272 if (!cbDst)
273 break;
274
275 /*
276 * Check if we've reached the end of the file/stream.
277 */
278 if (offCur >= pThis->offEof)
279 {
280 rc = pcbRead ? VINF_EOF : VERR_EOF;
281 Log(("rtVfsReadAhead_Read: ret %Rrc; offCur=%#llx offEof=%#llx\n", rc, offCur, pThis->offEof));
282 break;
283 }
284
285
286 /*
287 * First time around we don't own the I/O critsect and need to take it
288 * and repeat the above buffer reading code.
289 */
290 if (!fOwnsIoCritSect)
291 {
 /* BufferCritSect is released first so that IoCritSect is always acquired
    before BufferCritSect. */
292 RTCritSectLeave(&pThis->BufferCritSect);
293 RTCritSectEnter(&pThis->IoCritSect);
294 RTCritSectEnter(&pThis->BufferCritSect);
295 fOwnsIoCritSect = true;
296 continue;
297 }
298
299
300 /*
301 * Do a direct read of the remaining data.
302 */
303 if (off == -1)
304 {
 /* The source stream position may differ from the consumer position; if so,
    read at the explicit consumer offset instead of the current position. */
305 RTFOFF offActual = RTVfsIoStrmTell(pThis->hIos);
306 if (offActual >= 0 && (uint64_t)offActual != offCur)
307 off = offCur;
308 }
309 RTSGSEG TmpSeg = { pbDst, cbDst };
310 RTSGBUF TmpSgBuf;
311 RTSgBufInit(&TmpSgBuf, &TmpSeg, 1);
 /* Preset to the full request; it is only overwritten when pcbRead is given. */
312 size_t cbThisRead = cbDst;
313 rc = RTVfsIoStrmSgRead(pThis->hIos, off, &TmpSgBuf, fBlocking, pcbRead ? &cbThisRead : NULL);
314 if (RT_SUCCESS(rc))
315 {
316 cbTotalRead += cbThisRead;
317 offCur += cbThisRead;
318 pThis->offConsumer = offCur;
319 if (rc != VINF_EOF)
320 fPokeReader = true;
321 else
322 {
323 pThis->offEof = offCur;
324 Log(("rtVfsReadAhead_Read: EOF %llu (%#llx)\n", pThis->offEof, pThis->offEof));
325 }
326 }
327 /* else if (rc == VERR_EOF): hard to say where exactly the current position
328 is here as cannot have had a non-NULL pcbRead. Set offEof later. */
329 break;
330 }
331 RTCritSectLeave(&pThis->BufferCritSect);
332 if (fOwnsIoCritSect)
333 RTCritSectLeave(&pThis->IoCritSect);
 /* Wake the read-ahead thread only after both locks have been released. */
334 if (fPokeReader && rc != VINF_EOF && rc != VERR_EOF)
335 RTThreadUserSignal(pThis->hThread);
336
337 if (pcbRead)
338 *pcbRead = cbTotalRead;
339 Assert(cbTotalRead <= pSgBuf->paSegs[0].cbSeg);
340
341 return rc;
342}
343
344
345/**
346 * @interface_method_impl{RTVFSIOSTREAMOPS,pfnWrite}
347 */
348static DECLCALLBACK(int) rtVfsReadAhead_Write(void *pvThis, RTFOFF off, PCRTSGBUF pSgBuf, bool fBlocking, size_t *pcbWritten)
349{
350 RT_NOREF_PV(pvThis); RT_NOREF_PV(off); RT_NOREF_PV(pSgBuf); RT_NOREF_PV(fBlocking); RT_NOREF_PV(pcbWritten);
351 AssertFailed();
352 return VERR_ACCESS_DENIED;
353}
354
355
356/**
357 * @interface_method_impl{RTVFSIOSTREAMOPS,pfnFlush}
358 */
359static DECLCALLBACK(int) rtVfsReadAhead_Flush(void *pvThis)
360{
361 PRTVFSREADAHEAD pThis = (PRTVFSREADAHEAD)pvThis;
362 return RTVfsIoStrmFlush(pThis->hIos);
363}
364
365
366/**
367 * @interface_method_impl{RTVFSIOSTREAMOPS,pfnPollOne}
368 */
369static DECLCALLBACK(int) rtVfsReadAhead_PollOne(void *pvThis, uint32_t fEvents, RTMSINTERVAL cMillies, bool fIntr,
370 uint32_t *pfRetEvents)
371{
372 PRTVFSREADAHEAD pThis = (PRTVFSREADAHEAD)pvThis;
373 if (pThis->hThread != NIL_RTTHREAD)
374 {
375 /** @todo poll one with read-ahead thread. */
376 return VERR_NOT_IMPLEMENTED;
377 }
378 return RTVfsIoStrmPoll(pThis->hIos, fEvents, cMillies, fIntr, pfRetEvents);
379}
380
381
382/**
383 * @interface_method_impl{RTVFSIOSTREAMOPS,pfnTell}
384 */
385static DECLCALLBACK(int) rtVfsReadAhead_Tell(void *pvThis, PRTFOFF poffActual)
386{
387 PRTVFSREADAHEAD pThis = (PRTVFSREADAHEAD)pvThis;
388
389 RTCritSectEnter(&pThis->BufferCritSect);
390 *poffActual = pThis->offConsumer;
391 RTCritSectLeave(&pThis->BufferCritSect);
392
393 return VINF_SUCCESS;
394}
395
396
397/**
398 * @interface_method_impl{RTVFSOBJSETOPS,pfnMode}
399 */
400static DECLCALLBACK(int) rtVfsReadAhead_SetMode(void *pvThis, RTFMODE fMode, RTFMODE fMask)
401{
402 PRTVFSREADAHEAD pThis = (PRTVFSREADAHEAD)pvThis;
403 AssertReturn(pThis->hFile != NIL_RTVFSFILE, VERR_NOT_SUPPORTED);
404
405 RTCritSectEnter(&pThis->IoCritSect);
406 RT_NOREF_PV(fMode); RT_NOREF_PV(fMask); /// @todo int rc = RTVfsFileSetMode(pThis->hFile, fMode, fMask);
407 int rc = VERR_NOT_SUPPORTED;
408 RTCritSectLeave(&pThis->IoCritSect);
409
410 return rc;
411}
412
413
414/**
415 * @interface_method_impl{RTVFSOBJSETOPS,pfnSetTimes}
416 */
417static DECLCALLBACK(int) rtVfsReadAhead_SetTimes(void *pvThis, PCRTTIMESPEC pAccessTime, PCRTTIMESPEC pModificationTime,
418 PCRTTIMESPEC pChangeTime, PCRTTIMESPEC pBirthTime)
419{
420 PRTVFSREADAHEAD pThis = (PRTVFSREADAHEAD)pvThis;
421 AssertReturn(pThis->hFile != NIL_RTVFSFILE, VERR_NOT_SUPPORTED);
422
423 RTCritSectEnter(&pThis->IoCritSect);
424 RT_NOREF_PV(pAccessTime); RT_NOREF_PV(pModificationTime); RT_NOREF_PV(pChangeTime); RT_NOREF_PV(pBirthTime);
425 /// @todo int rc = RTVfsFileSetTimes(pThis->hFile, pAccessTime, pModificationTime, pChangeTime, pBirthTime);
426 int rc = VERR_NOT_SUPPORTED;
427 RTCritSectLeave(&pThis->IoCritSect);
428
429 return rc;
430}
431
432
433/**
434 * @interface_method_impl{RTVFSOBJSETOPS,pfnSetOwner}
435 */
436static DECLCALLBACK(int) rtVfsReadAhead_SetOwner(void *pvThis, RTUID uid, RTGID gid)
437{
438 PRTVFSREADAHEAD pThis = (PRTVFSREADAHEAD)pvThis;
439 AssertReturn(pThis->hFile != NIL_RTVFSFILE, VERR_NOT_SUPPORTED);
440
441 RTCritSectEnter(&pThis->IoCritSect);
442 RT_NOREF_PV(uid); RT_NOREF_PV(gid);
443 /// @todo int rc = RTVfsFileSetOwner(pThis->hFile, uid, gid);
444 int rc = VERR_NOT_SUPPORTED;
445 RTCritSectLeave(&pThis->IoCritSect);
446
447 return rc;
448}
449
450
451/**
452 * @interface_method_impl{RTVFSFILEOPS,pfnSeek}
453 */
454static DECLCALLBACK(int) rtVfsReadAhead_Seek(void *pvThis, RTFOFF offSeek, unsigned uMethod, PRTFOFF poffActual)
455{
456 PRTVFSREADAHEAD pThis = (PRTVFSREADAHEAD)pvThis;
457 AssertReturn(pThis->hFile != NIL_RTVFSFILE, VERR_NOT_SUPPORTED);
458
459 RTCritSectEnter(&pThis->IoCritSect); /* protects against concurrent I/O using the offset. */
460 RTCritSectEnter(&pThis->BufferCritSect); /* protects offConsumer */
461
462 uint64_t offActual = UINT64_MAX;
463 int rc = RTVfsFileSeek(pThis->hFile, offSeek, uMethod, &offActual);
464 if (RT_SUCCESS(rc))
465 {
466 pThis->offConsumer = offActual;
467 if (poffActual)
468 *poffActual = offActual;
469 }
470
471 RTCritSectLeave(&pThis->BufferCritSect);
472 RTCritSectLeave(&pThis->IoCritSect);
473
474 return rc;
475}
476
477
478/**
479 * @interface_method_impl{RTVFSFILEOPS,pfnQuerySize}
480 */
481static DECLCALLBACK(int) rtVfsReadAhead_QuerySize(void *pvThis, uint64_t *pcbFile)
482{
483 PRTVFSREADAHEAD pThis = (PRTVFSREADAHEAD)pvThis;
484 AssertReturn(pThis->hFile != NIL_RTVFSFILE, VERR_NOT_SUPPORTED);
485
486 RTCritSectEnter(&pThis->IoCritSect); /* paranoia */
487 int rc = RTVfsFileGetSize(pThis->hFile, pcbFile);
488 RTCritSectLeave(&pThis->IoCritSect);
489
490 return rc;
491}
492
493
494/**
495 * @interface_method_impl{RTVFSFILEOPS,pfnSetSize}
496 */
497static DECLCALLBACK(int) rtVfsReadAhead_SetSize(void *pvThis, uint64_t cbFile, uint32_t fFlags)
498{
499 PRTVFSREADAHEAD pThis = (PRTVFSREADAHEAD)pvThis;
500 AssertReturn(pThis->hFile != NIL_RTVFSFILE, VERR_NOT_SUPPORTED);
501
502 RTCritSectEnter(&pThis->IoCritSect); /* paranoia */
503 int rc = RTVfsFileSetSize(pThis->hFile, cbFile, fFlags);
504 RTCritSectLeave(&pThis->IoCritSect);
505
506 return rc;
507}
508
509
510/**
511 * @interface_method_impl{RTVFSFILEOPS,pfnQueryMaxSize}
512 */
513static DECLCALLBACK(int) rtVfsReadAhead_QueryMaxSize(void *pvThis, uint64_t *pcbMax)
514{
515 PRTVFSREADAHEAD pThis = (PRTVFSREADAHEAD)pvThis;
516 AssertReturn(pThis->hFile != NIL_RTVFSFILE, VERR_NOT_SUPPORTED);
517
518 RTCritSectEnter(&pThis->IoCritSect); /* paranoia */
519 int rc = RTVfsFileQueryMaxSize(pThis->hFile, pcbMax);
520 RTCritSectLeave(&pThis->IoCritSect);
521
522 return rc;
523}
524
525
526/**
527 * Read ahead I/O stream operations.
 *
 * Method table handed to RTVfsNewIoStream by rtVfsCreateReadAheadInstance
 * when the source does not have a file face.
528 */
529DECL_HIDDEN_CONST(const RTVFSIOSTREAMOPS) g_VfsReadAheadIosOps =
530{ /* Stream */
531 { /* Obj */
532 RTVFSOBJOPS_VERSION, /* leading version field (presumably; see the matching trailing entry) */
533 RTVFSOBJTYPE_IO_STREAM, /* object type */
534 "Read ahead I/O stream", /* name */
535 rtVfsReadAhead_Close, /* pfnClose */
536 rtVfsReadAhead_QueryInfo, /* pfnQueryInfo */
537 RTVFSOBJOPS_VERSION /* trailing version field */
538 },
539 RTVFSIOSTREAMOPS_VERSION, /* leading version field */
540 RTVFSIOSTREAMOPS_FEAT_NO_SG, /* feature flags: rtVfsReadAhead_Read only handles single-segment buffers */
541 rtVfsReadAhead_Read, /* pfnRead */
542 rtVfsReadAhead_Write, /* pfnWrite (always fails) */
543 rtVfsReadAhead_Flush, /* pfnFlush */
544 rtVfsReadAhead_PollOne, /* pfnPollOne */
545 rtVfsReadAhead_Tell, /* pfnTell */
546 NULL /*Skip*/,
547 NULL /*ZeroFill*/,
548 RTVFSIOSTREAMOPS_VERSION, /* trailing version field */
549};
550
551
552/**
553 * Read ahead file operations.
 *
 * Method table handed to RTVfsNewFile by rtVfsCreateReadAheadInstance when
 * the source has a file face.  The stream part uses the same callbacks as
 * g_VfsReadAheadIosOps.
554 */
555DECL_HIDDEN_CONST(const RTVFSFILEOPS) g_VfsReadAheadFileOps =
556{
557 { /* Stream */
558 { /* Obj */
559 RTVFSOBJOPS_VERSION, /* leading version field (presumably; see the matching trailing entry) */
560 RTVFSOBJTYPE_FILE, /* object type */
561 "Read ahead file", /* name */
562 rtVfsReadAhead_Close, /* pfnClose */
563 rtVfsReadAhead_QueryInfo, /* pfnQueryInfo */
564 RTVFSOBJOPS_VERSION /* trailing version field */
565 },
566 RTVFSIOSTREAMOPS_VERSION, /* leading version field */
567 RTVFSIOSTREAMOPS_FEAT_NO_SG, /* feature flags: rtVfsReadAhead_Read only handles single-segment buffers */
568 rtVfsReadAhead_Read, /* pfnRead */
569 rtVfsReadAhead_Write, /* pfnWrite (always fails) */
570 rtVfsReadAhead_Flush, /* pfnFlush */
571 rtVfsReadAhead_PollOne, /* pfnPollOne */
572 rtVfsReadAhead_Tell, /* pfnTell */
573 NULL /*Skip*/,
574 NULL /*ZeroFill*/,
575 RTVFSIOSTREAMOPS_VERSION, /* trailing version field */
576 },
577 RTVFSFILEOPS_VERSION, /* leading version field */
578 /*RTVFSIOFILEOPS_FEAT_NO_AT_OFFSET*/ 0,
579 { /* ObjSet */
580 RTVFSOBJSETOPS_VERSION, /* leading version field */
581 RT_UOFFSETOF(RTVFSFILEOPS, ObjSet) - RT_UOFFSETOF(RTVFSFILEOPS, Stream.Obj), /* distance between the ObjSet and Stream.Obj members */
582 rtVfsReadAhead_SetMode, /* pfnMode */
583 rtVfsReadAhead_SetTimes, /* pfnSetTimes */
584 rtVfsReadAhead_SetOwner, /* pfnSetOwner */
585 RTVFSOBJSETOPS_VERSION /* trailing version field */
586 },
587 rtVfsReadAhead_Seek, /* pfnSeek */
588 rtVfsReadAhead_QuerySize, /* pfnQuerySize */
589 rtVfsReadAhead_SetSize, /* pfnSetSize */
590 rtVfsReadAhead_QueryMaxSize, /* pfnQueryMaxSize */
591 RTVFSFILEOPS_VERSION /* trailing version field */
592};
593
594
595/**
596 * @callback_method_impl{PFNRTTHREAD, Read ahead thread procedure}
597 */
598static DECLCALLBACK(int) rtVfsReadAheadThreadProc(RTTHREAD hThreadSelf, void *pvUser)
599{
600 PRTVFSREADAHEAD pThis = (PRTVFSREADAHEAD)pvUser;
601 Assert(pThis);
602
603 while (!pThis->fTerminateThread)
604 {
605 int rc;
606
607 /*
608 * Is there a buffer handy for reading ahead.
609 */
610 PRTVFSREADAHEADBUFDESC pBufDesc = NULL;
611 RTCritSectEnter(&pThis->BufferCritSect);
612 if (!pThis->fTerminateThread)
613 pBufDesc = RTListRemoveFirst(&pThis->FreeList, RTVFSREADAHEADBUFDESC, ListEntry);
614 RTCritSectLeave(&pThis->BufferCritSect);
615
616 if (pBufDesc)
617 {
618 /*
619 * Got a buffer, take the I/O lock and read into it.
620 */
621 rc = VERR_CALLBACK_RETURN;
622 RTCritSectEnter(&pThis->IoCritSect);
623 if (!pThis->fTerminateThread)
624 {
625
626 pBufDesc->off = RTVfsIoStrmTell(pThis->hIos);
627 size_t cbRead = 0;
628 rc = RTVfsIoStrmRead(pThis->hIos, pBufDesc->pbBuffer, pThis->cbBuffer, true /*fBlocking*/, &cbRead);
629 if (RT_SUCCESS(rc))
630 {
631 if (rc == VINF_EOF)
632 {
633 pThis->offEof = pBufDesc->off + cbRead;
634 Log(("rtVfsReadAheadThreadProc: EOF %llu (%#llx)\n", pThis->offEof, pThis->offEof));
635 }
636 pBufDesc->cbFilled = (uint32_t)cbRead;
637
638 /*
639 * Put back the buffer. The consumer list is sorted by offset, but
640 * we should usually end up appending the buffer.
641 */
642 RTCritSectEnter(&pThis->BufferCritSect);
643 PRTVFSREADAHEADBUFDESC pAfter = RTListGetLast(&pThis->ConsumerList, RTVFSREADAHEADBUFDESC, ListEntry);
644 if (!pAfter || pAfter->off <= pBufDesc->off)
645 RTListAppend(&pThis->ConsumerList, &pBufDesc->ListEntry);
646 else
647 {
648 do
649 pAfter = RTListGetPrev(&pThis->ConsumerList, pAfter, RTVFSREADAHEADBUFDESC, ListEntry);
650 while (pAfter && pAfter->off > pBufDesc->off);
651 if (!pAfter)
652 RTListPrepend(&pThis->ConsumerList, &pBufDesc->ListEntry);
653 else
654 {
655 Assert(pAfter->off <= pBufDesc->off);
656 RTListNodeInsertAfter(&pAfter->ListEntry, &pBufDesc->ListEntry);
657 }
658 }
659 RTCritSectLeave(&pThis->BufferCritSect);
660 pBufDesc = NULL;
661
662#ifdef RT_STRICT
663 /* Verify the list ordering. */
664 unsigned cAsserted = 0;
665 uint64_t offAssert = 0;
666 PRTVFSREADAHEADBUFDESC pAssertCur;
667 RTListForEach(&pThis->ConsumerList, pAssertCur, RTVFSREADAHEADBUFDESC, ListEntry)
668 {
669 Assert(offAssert <= pAssertCur->off);
670 offAssert = pAssertCur->off;
671 Assert(cAsserted < pThis->cBuffers);
672 cAsserted++;
673 }
674#endif
675 }
676 else
677 Assert(rc != VERR_EOF);
678 }
679 RTCritSectLeave(&pThis->IoCritSect);
680
681 /*
682 * If we succeeded and we didn't yet reach the end of the stream,
683 * loop without delay to start processing the next buffer.
684 */
685 if (RT_LIKELY(!pBufDesc && rc != VINF_EOF))
686 continue;
687
688 /* Put any unused buffer back in the free list (termination/failure, not EOF). */
689 if (pBufDesc)
690 {
691 RTCritSectEnter(&pThis->BufferCritSect);
692 RTListPrepend(&pThis->FreeList, &pBufDesc->ListEntry);
693 RTCritSectLeave(&pThis->BufferCritSect);
694 }
695 if (pThis->fTerminateThread)
696 break;
697 }
698
699 /*
700 * Wait for more to do.
701 */
702 rc = RTThreadUserWait(hThreadSelf, RT_MS_1MIN);
703 if (RT_SUCCESS(rc))
704 rc = RTThreadUserReset(hThreadSelf);
705 }
706
707 return VINF_SUCCESS;
708}
709
710
711static int rtVfsCreateReadAheadInstance(RTVFSIOSTREAM hVfsIosSrc, RTVFSFILE hVfsFileSrc, uint32_t fFlags,
712 uint32_t cBuffers, uint32_t cbBuffer, PRTVFSIOSTREAM phVfsIos, PRTVFSFILE phVfsFile)
713{
714 /*
715 * Validate input a little.
716 */
717 int rc = VINF_SUCCESS;
718 AssertStmt(cBuffers < _4K, rc = VERR_OUT_OF_RANGE);
719 if (cBuffers == 0)
720 cBuffers = 4;
721 AssertStmt(cbBuffer <= _4M, rc = VERR_OUT_OF_RANGE);
722 if (cbBuffer == 0)
723 cbBuffer = _256K / cBuffers;
724 AssertStmt(cbBuffer * cBuffers < (ARCH_BITS < 64 ? _64M : _256M), rc = VERR_OUT_OF_RANGE);
725 AssertStmt(!fFlags, rc = VERR_INVALID_FLAGS);
726
727 if (RT_SUCCESS(rc))
728 {
729 /*
730 * Create a file or I/O stream instance.
731 */
732 RTVFSFILE hVfsFileReadAhead = NIL_RTVFSFILE;
733 RTVFSIOSTREAM hVfsIosReadAhead = NIL_RTVFSIOSTREAM;
734 PRTVFSREADAHEAD pThis;
735 size_t cbThis = RT_UOFFSETOF_DYN(RTVFSREADAHEAD, aBufDescs[cBuffers]);
736 if (hVfsFileSrc != NIL_RTVFSFILE)
737 rc = RTVfsNewFile(&g_VfsReadAheadFileOps, cbThis, RTFILE_O_READ, NIL_RTVFS, NIL_RTVFSLOCK,
738 &hVfsFileReadAhead, (void **)&pThis);
739 else
740 rc = RTVfsNewIoStream(&g_VfsReadAheadIosOps, cbThis, RTFILE_O_READ, NIL_RTVFS, NIL_RTVFSLOCK,
741 &hVfsIosReadAhead, (void **)&pThis);
742 if (RT_SUCCESS(rc))
743 {
744 RTListInit(&pThis->ConsumerList);
745 RTListInit(&pThis->FreeList);
746 pThis->hThread = NIL_RTTHREAD;
747 pThis->fTerminateThread = false;
748 pThis->fFlags = fFlags;
749 pThis->hFile = hVfsFileSrc;
750 pThis->hIos = hVfsIosSrc;
751 pThis->cBuffers = cBuffers;
752 pThis->cbBuffer = cbBuffer;
753 pThis->offEof = UINT64_MAX;
754 pThis->offConsumer = RTVfsIoStrmTell(hVfsIosSrc);
755 if ((RTFOFF)pThis->offConsumer >= 0)
756 {
757 rc = RTCritSectInit(&pThis->IoCritSect);
758 if (RT_SUCCESS(rc))
759 rc = RTCritSectInit(&pThis->BufferCritSect);
760 if (RT_SUCCESS(rc))
761 {
762 pThis->pbAllBuffers = (uint8_t *)RTMemPageAlloc(pThis->cbBuffer * pThis->cBuffers);
763 if (pThis->pbAllBuffers)
764 {
765 for (uint32_t i = 0; i < cBuffers; i++)
766 {
767 pThis->aBufDescs[i].cbFilled = 0;
768 pThis->aBufDescs[i].off = UINT64_MAX / 2;
769 pThis->aBufDescs[i].pbBuffer = &pThis->pbAllBuffers[cbBuffer * i];
770 RTListAppend(&pThis->FreeList, &pThis->aBufDescs[i].ListEntry);
771 }
772
773 /*
774 * Create thread.
775 */
776 rc = RTThreadCreate(&pThis->hThread, rtVfsReadAheadThreadProc, pThis, 0, RTTHREADTYPE_DEFAULT,
777 RTTHREADFLAGS_WAITABLE, "vfsreadahead");
778 if (RT_SUCCESS(rc))
779 {
780 /*
781 * We're good.
782 */
783 if (phVfsFile)
784 *phVfsFile = hVfsFileReadAhead;
785 else if (hVfsFileReadAhead == NIL_RTVFSFILE)
786 *phVfsIos = hVfsIosReadAhead;
787 else
788 {
789 *phVfsIos = RTVfsFileToIoStream(hVfsFileReadAhead);
790 RTVfsFileRelease(hVfsFileReadAhead);
791 AssertReturn(*phVfsIos != NIL_RTVFSIOSTREAM, VERR_INTERNAL_ERROR_5);
792 }
793 return VINF_SUCCESS;
794 }
795 }
796 }
797 }
798 else
799 rc = (int)pThis->offConsumer;
800 }
801 }
802
803 RTVfsFileRelease(hVfsFileSrc);
804 RTVfsIoStrmRelease(hVfsIosSrc);
805 return rc;
806}
807
808
809RTDECL(int) RTVfsCreateReadAheadForIoStream(RTVFSIOSTREAM hVfsIos, uint32_t fFlags, uint32_t cBuffers, uint32_t cbBuffer,
810 PRTVFSIOSTREAM phVfsIos)
811{
812 AssertPtrReturn(phVfsIos, VERR_INVALID_POINTER);
813 *phVfsIos = NIL_RTVFSIOSTREAM;
814
815 /*
816 * Retain the input stream, trying to obtain a file handle too so we can
817 * fully mirror it.
818 */
819 uint32_t cRefs = RTVfsIoStrmRetain(hVfsIos);
820 AssertReturn(cRefs != UINT32_MAX, VERR_INVALID_HANDLE);
821 RTVFSFILE hVfsFile = RTVfsIoStrmToFile(hVfsIos);
822
823 /*
824 * Do the job. (This always consumes the above retained references.)
825 */
826 return rtVfsCreateReadAheadInstance(hVfsIos, hVfsFile, fFlags, cBuffers, cbBuffer, phVfsIos, NULL);
827}
828
829
830RTDECL(int) RTVfsCreateReadAheadForFile(RTVFSFILE hVfsFile, uint32_t fFlags, uint32_t cBuffers, uint32_t cbBuffer,
831 PRTVFSFILE phVfsFile)
832{
833 AssertPtrReturn(phVfsFile, VERR_INVALID_POINTER);
834 *phVfsFile = NIL_RTVFSFILE;
835
836 /*
837 * Retain the input file and cast it o an I/O stream.
838 */
839 RTVFSIOSTREAM hVfsIos = RTVfsFileToIoStream(hVfsFile);
840 AssertReturn(hVfsIos != NIL_RTVFSIOSTREAM, VERR_INVALID_HANDLE);
841 uint32_t cRefs = RTVfsFileRetain(hVfsFile);
842 AssertReturnStmt(cRefs != UINT32_MAX, RTVfsIoStrmRelease(hVfsIos), VERR_INVALID_HANDLE);
843
844 /*
845 * Do the job. (This always consumes the above retained references.)
846 */
847 return rtVfsCreateReadAheadInstance(hVfsIos, hVfsFile, fFlags, cBuffers, cbBuffer, NULL, phVfsFile);
848}
849
850
851/**
852 * @interface_method_impl{RTVFSCHAINELEMENTREG,pfnValidate}
853 */
854static DECLCALLBACK(int) rtVfsChainReadAhead_Validate(PCRTVFSCHAINELEMENTREG pProviderReg, PRTVFSCHAINSPEC pSpec,
855 PRTVFSCHAINELEMSPEC pElement, uint32_t *poffError, PRTERRINFO pErrInfo)
856{
857 RT_NOREF(pProviderReg, poffError, pErrInfo);
858
859 /*
860 * Basics.
861 */
862 if ( pElement->enmType != RTVFSOBJTYPE_FILE
863 && pElement->enmType != RTVFSOBJTYPE_IO_STREAM)
864 return VERR_VFS_CHAIN_ONLY_FILE_OR_IOS;
865 if (pElement->enmTypeIn == RTVFSOBJTYPE_INVALID)
866 return VERR_VFS_CHAIN_CANNOT_BE_FIRST_ELEMENT;
867 if ( pElement->enmTypeIn != RTVFSOBJTYPE_FILE
868 && pElement->enmTypeIn != RTVFSOBJTYPE_IO_STREAM)
869 return VERR_VFS_CHAIN_TAKES_FILE_OR_IOS;
870 if (pSpec->fOpenFile & RTFILE_O_WRITE)
871 return VERR_VFS_CHAIN_READ_ONLY_IOS;
872 if (pElement->cArgs > 2)
873 return VERR_VFS_CHAIN_AT_MOST_TWO_ARGS;
874
875 /*
876 * Parse the two optional arguments.
877 */
878 uint32_t cBuffers = 0;
879 if (pElement->cArgs > 0)
880 {
881 const char *psz = pElement->paArgs[0].psz;
882 if (*psz)
883 {
884 int rc = RTStrToUInt32Full(psz, 0, &cBuffers);
885 if (RT_FAILURE(rc))
886 {
887 *poffError = pElement->paArgs[0].offSpec;
888 return VERR_VFS_CHAIN_INVALID_ARGUMENT;
889 }
890 }
891 }
892
893 uint32_t cbBuffer = 0;
894 if (pElement->cArgs > 1)
895 {
896 const char *psz = pElement->paArgs[1].psz;
897 if (*psz)
898 {
899 int rc = RTStrToUInt32Full(psz, 0, &cbBuffer);
900 if (RT_FAILURE(rc))
901 {
902 *poffError = pElement->paArgs[1].offSpec;
903 return VERR_VFS_CHAIN_INVALID_ARGUMENT;
904 }
905 }
906 }
907
908 /*
909 * Save the parsed arguments in the spec since their both optional.
910 */
911 pElement->uProvider = RT_MAKE_U64(cBuffers, cbBuffer);
912
913 return VINF_SUCCESS;
914}
915
916
917/**
918 * @interface_method_impl{RTVFSCHAINELEMENTREG,pfnInstantiate}
919 */
920static DECLCALLBACK(int) rtVfsChainReadAhead_Instantiate(PCRTVFSCHAINELEMENTREG pProviderReg, PCRTVFSCHAINSPEC pSpec,
921 PCRTVFSCHAINELEMSPEC pElement, RTVFSOBJ hPrevVfsObj,
922 PRTVFSOBJ phVfsObj, uint32_t *poffError, PRTERRINFO pErrInfo)
923{
924 RT_NOREF(pProviderReg, pSpec, pElement, poffError, pErrInfo);
925 AssertReturn(hPrevVfsObj != NIL_RTVFSOBJ, VERR_VFS_CHAIN_IPE);
926
927 /* Try for a file if we can. */
928 int rc;
929 RTVFSFILE hVfsFileIn = RTVfsObjToFile(hPrevVfsObj);
930 if (hVfsFileIn != NIL_RTVFSFILE)
931 {
932 RTVFSFILE hVfsFile = NIL_RTVFSFILE;
933 rc = RTVfsCreateReadAheadForFile(hVfsFileIn, 0 /*fFlags*/, RT_LO_U32(pElement->uProvider),
934 RT_HI_U32(pElement->uProvider), &hVfsFile);
935 RTVfsFileRelease(hVfsFileIn);
936 if (RT_SUCCESS(rc))
937 {
938 *phVfsObj = RTVfsObjFromFile(hVfsFile);
939 RTVfsFileRelease(hVfsFile);
940 if (*phVfsObj != NIL_RTVFSOBJ)
941 return VINF_SUCCESS;
942 rc = VERR_VFS_CHAIN_CAST_FAILED;
943 }
944 }
945 else if (pElement->enmType == RTVFSOBJTYPE_IO_STREAM)
946 {
947 RTVFSIOSTREAM hVfsIosIn = RTVfsObjToIoStream(hPrevVfsObj);
948 if (hVfsIosIn != NIL_RTVFSIOSTREAM)
949 {
950 RTVFSIOSTREAM hVfsIos = NIL_RTVFSIOSTREAM;
951 rc = RTVfsCreateReadAheadForIoStream(hVfsIosIn, 0 /*fFlags*/, RT_LO_U32(pElement->uProvider),
952 RT_HI_U32(pElement->uProvider), &hVfsIos);
953 RTVfsIoStrmRelease(hVfsIosIn);
954 if (RT_SUCCESS(rc))
955 {
956 *phVfsObj = RTVfsObjFromIoStream(hVfsIos);
957 RTVfsIoStrmRelease(hVfsIos);
958 if (*phVfsObj != NIL_RTVFSOBJ)
959 return VINF_SUCCESS;
960 rc = VERR_VFS_CHAIN_CAST_FAILED;
961 }
962 }
963 else
964 rc = VERR_VFS_CHAIN_CAST_FAILED;
965 }
966 else
967 rc = VERR_VFS_CHAIN_CAST_FAILED;
968 return rc;
969}
970
971
972/**
973 * @interface_method_impl{RTVFSCHAINELEMENTREG,pfnCanReuseElement}
974 */
975static DECLCALLBACK(bool) rtVfsChainReadAhead_CanReuseElement(PCRTVFSCHAINELEMENTREG pProviderReg,
976 PCRTVFSCHAINSPEC pSpec, PCRTVFSCHAINELEMSPEC pElement,
977 PCRTVFSCHAINSPEC pReuseSpec, PCRTVFSCHAINELEMSPEC pReuseElement)
978{
979 RT_NOREF(pProviderReg, pSpec, pElement, pReuseSpec, pReuseElement);
980 return false;
981}
982
983
984/** VFS chain element 'pull'.
 * Registration record tying the name "pull" to the read-ahead validate,
 * instantiate and can-reuse callbacks defined in this file. */
985static RTVFSCHAINELEMENTREG g_rtVfsChainReadAheadReg =
986{
987 /* uVersion = */ RTVFSCHAINELEMENTREG_VERSION,
988 /* fReserved = */ 0,
989 /* pszName = */ "pull",
990 /* ListEntry = */ { NULL, NULL },
991 /* pszHelp = */ "Takes an I/O stream or file and provides read-ahead caching.\n"
992 "Optional first argument specifies how many buffers to use, 0 indicating the default.\n"
993 "Optional second argument specifies the buffer size, 0 indicating the default.",
994 /* pfnValidate = */ rtVfsChainReadAhead_Validate,
995 /* pfnInstantiate = */ rtVfsChainReadAhead_Instantiate,
996 /* pfnCanReuseElement = */ rtVfsChainReadAhead_CanReuseElement,
997 /* uEndMarker = */ RTVFSCHAINELEMENTREG_VERSION
998};
999
/* Registers the 'pull' element provider above (presumably at load/init time - confirm against the macro definition). */
1000RTVFSCHAIN_AUTO_REGISTER_ELEMENT_PROVIDER(&g_rtVfsChainReadAheadReg, rtVfsChainReadAheadReg);
1001
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette