VirtualBox

source: vbox/trunk/src/libs/liblzma-5.8.1/common/stream_encoder_mt.c@ 108911

Last change on this file since 108911 was 108911, checked in by vboxsync, 4 weeks ago

libs/liblzma: Applied and adjusted our liblzma changes to 5.8.1 and export to OSE. jiraref:VBP-1635

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
  • Property sync-process set to export
File size: 35.3 KB
Line 
1// SPDX-License-Identifier: 0BSD
2
3///////////////////////////////////////////////////////////////////////////////
4//
5/// \file stream_encoder_mt.c
6/// \brief Multithreaded .xz Stream encoder
7//
8// Author: Lasse Collin
9//
10///////////////////////////////////////////////////////////////////////////////
11
12#include "filter_encoder.h"
13#include "easy_preset.h"
14#include "block_encoder.h"
15#include "block_buffer_encoder.h"
16#include "index_encoder.h"
17#include "outqueue.h"
18
19
/// Maximum supported block size. This makes it simpler to prevent integer
/// overflows if we are given unusually large block size. Dividing by
/// LZMA_THREADS_MAX keeps threads_max * block_size within uint64_t range.
#define BLOCK_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX)
23
24
/// State of a worker thread as seen by both the main thread and the
/// worker itself. NOTE: The order of the values is significant: the
/// code relies on comparisons like "state <= THR_FINISH" (a Block is
/// being encoded) and "state >= THR_STOP" (a stop/exit request).
typedef enum {
	/// Waiting for work.
	THR_IDLE,

	/// Encoding is in progress.
	THR_RUN,

	/// Encoding is in progress but no more input data will
	/// be read.
	THR_FINISH,

	/// The main thread wants the thread to stop whatever it was doing
	/// but not exit.
	THR_STOP,

	/// The main thread wants the thread to exit. We could use
	/// thread cancellation but since stopping has to be supported
	/// anyway, reusing that mechanism is simpler.
	THR_EXIT,

} worker_state;
45
typedef struct lzma_stream_coder_s lzma_stream_coder;

typedef struct worker_thread_s worker_thread;

/// Per-thread structure. Each worker thread encodes one Block at a time.
/// The mutex/cond pair below protects the fields that both the main
/// thread and the worker access concurrently (state, in_size, and the
/// progress counters).
struct worker_thread_s {
	/// Current state of this thread; see worker_state.
	worker_state state;

	/// Input buffer of coder->block_size bytes. The main thread will
	/// put new input into this and update in_size accordingly. Once
	/// no more input is coming, state will be set to THR_FINISH.
	uint8_t *in;

	/// Amount of data available in the input buffer. This is modified
	/// only by the main thread.
	size_t in_size;

	/// Output buffer for this thread. This is set by the main
	/// thread every time a new Block is started with this thread
	/// structure.
	lzma_outbuf *outbuf;

	/// Pointer to the main structure is needed when putting this
	/// thread back to the stack of free threads.
	lzma_stream_coder *coder;

	/// The allocator is set by the main thread. Since a copy of the
	/// pointer is kept here, the application must not change the
	/// allocator before calling lzma_end().
	const lzma_allocator *allocator;

	/// Amount of uncompressed data that has already been compressed.
	uint64_t progress_in;

	/// Amount of compressed data that is ready.
	uint64_t progress_out;

	/// Block encoder
	lzma_next_coder block_encoder;

	/// Compression options for this Block
	lzma_block block_options;

	/// Filter chain for this thread. By copying the filters array
	/// to each thread it is possible to change the filter chain
	/// between Blocks using lzma_filters_update().
	lzma_filter filters[LZMA_FILTERS_MAX + 1];

	/// Next structure in the stack of free worker threads.
	worker_thread *next;

	/// Protects the shared fields above together with cond.
	mythread_mutex mutex;
	mythread_cond cond;

	/// The ID of this thread is used to join the thread
	/// when it's not needed anymore.
	mythread thread_id;
};
102
103
/// Main coder structure for the multithreaded .xz Stream encoder.
/// Owned by the main thread; the mutex/cond pair at the end protects
/// the fields that worker threads also touch (thread_error,
/// threads_free, the progress counters, and the output queue state).
struct lzma_stream_coder_s {
	/// Which part of the Stream is being encoded next.
	enum {
		SEQ_STREAM_HEADER,
		SEQ_BLOCK,
		SEQ_INDEX,
		SEQ_STREAM_FOOTER,
	} sequence;

	/// Start a new Block every block_size bytes of input unless
	/// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier.
	size_t block_size;

	/// The filter chain to use for the next Block.
	/// This can be updated using lzma_filters_update()
	/// after LZMA_FULL_BARRIER or LZMA_FULL_FLUSH.
	lzma_filter filters[LZMA_FILTERS_MAX + 1];

	/// A copy of filters[] will be put here when attempting to get
	/// a new worker thread. This will be copied to a worker thread
	/// when a thread becomes free and then this cache is marked as
	/// empty by setting [0].id = LZMA_VLI_UNKNOWN. Without this cache
	/// the filter options from filters[] would get uselessly copied
	/// multiple times (allocated and freed) when waiting for a new free
	/// worker thread.
	///
	/// This is freed if filters[] is updated via lzma_filters_update().
	lzma_filter filters_cache[LZMA_FILTERS_MAX + 1];


	/// Index to hold sizes of the Blocks
	lzma_index *index;

	/// Index encoder
	lzma_next_coder index_encoder;


	/// Stream Flags for encoding the Stream Header and Stream Footer.
	lzma_stream_flags stream_flags;

	/// Buffer to hold Stream Header and Stream Footer. The same
	/// buffer is reused for the Footer once the Header has been
	/// copied out.
	uint8_t header[LZMA_STREAM_HEADER_SIZE];

	/// Read position in header[]
	size_t header_pos;


	/// Output buffer queue for compressed data
	lzma_outq outq;

	/// How much memory to allocate for each lzma_outbuf.buf
	size_t outbuf_alloc_size;


	/// Maximum wait time if cannot use all the input and cannot
	/// fill the output buffer. This is in milliseconds.
	uint32_t timeout;


	/// Error code from a worker thread. Only the first error is
	/// kept (see worker_error()).
	lzma_ret thread_error;

	/// Array of allocated thread-specific structures
	worker_thread *threads;

	/// Number of structures in "threads" above. This is also the
	/// number of threads that will be created at maximum.
	uint32_t threads_max;

	/// Number of thread structures that have been initialized, and
	/// thus the number of worker threads actually created so far.
	uint32_t threads_initialized;

	/// Stack of free threads. When a thread finishes, it puts itself
	/// back into this stack. This starts as empty because threads
	/// are created only when actually needed.
	worker_thread *threads_free;

	/// The most recent worker thread to which the main thread writes
	/// the new input from the application.
	worker_thread *thr;


	/// Amount of uncompressed data in Blocks that have already
	/// been finished.
	uint64_t progress_in;

	/// Amount of compressed data in Stream Header + Blocks that
	/// have already been finished.
	uint64_t progress_out;


	/// Protects the fields shared with the worker threads.
	mythread_mutex mutex;
	mythread_cond cond;
};
198
199
200/// Tell the main thread that something has gone wrong.
201static void
202worker_error(worker_thread *thr, lzma_ret ret)
203{
204 assert(ret != LZMA_OK);
205 assert(ret != LZMA_STREAM_END);
206
207 mythread_sync(thr->coder->mutex) {
208 if (thr->coder->thread_error == LZMA_OK)
209 thr->coder->thread_error = ret;
210
211 mythread_cond_signal(&thr->coder->cond);
212 }
213
214 return;
215}
216
217
/// Encode one Block in a worker thread. On success the size fields of
/// thr->outbuf are filled in and THR_FINISH is returned. If the main
/// thread requests stopping or exiting, that request (THR_STOP or
/// THR_EXIT) is returned. On an encoding error the error is reported
/// via worker_error() and THR_STOP is returned.
///
/// \param thr      Worker thread doing the encoding
/// \param out_pos  Write position in thr->outbuf->buf; on return holds
///                 the number of bytes produced into the output buffer
/// \param state    State seen by the caller (THR_RUN or THR_FINISH)
static worker_state
worker_encode(worker_thread *thr, size_t *out_pos, worker_state state)
{
	// A fresh Block must start with zeroed progress counters.
	assert(thr->progress_in == 0);
	assert(thr->progress_out == 0);

	// Set the Block options.
	thr->block_options = (lzma_block){
		.version = 0,
		.check = thr->coder->stream_flags.check,
		.compressed_size = thr->outbuf->allocated,
		.uncompressed_size = thr->coder->block_size,
		.filters = thr->filters,
	};

	// Calculate maximum size of the Block Header. This amount is
	// reserved in the beginning of the buffer so that Block Header
	// along with Compressed Size and Uncompressed Size can be
	// written there.
	lzma_ret ret = lzma_block_header_size(&thr->block_options);
	if (ret != LZMA_OK) {
		worker_error(thr, ret);
		return THR_STOP;
	}

	// Initialize the Block encoder.
	ret = lzma_block_encoder_init(&thr->block_encoder,
			thr->allocator, &thr->block_options);
	if (ret != LZMA_OK) {
		worker_error(thr, ret);
		return THR_STOP;
	}

	size_t in_pos = 0;
	size_t in_size = 0;

	// Compressed data goes after the space reserved for the header.
	*out_pos = thr->block_options.header_size;
	const size_t out_size = thr->outbuf->allocated;

	do {
		mythread_sync(thr->mutex) {
			// Store in_pos and *out_pos into *thr so that
			// an application may read them via
			// lzma_get_progress() to get progress information.
			//
			// NOTE: These aren't updated when the encoding
			// finishes. Instead, the final values are taken
			// later from thr->outbuf.
			thr->progress_in = in_pos;
			thr->progress_out = *out_pos;

			// Wait until the main thread gives us more input
			// or changes our state.
			while (in_size == thr->in_size
					&& thr->state == THR_RUN)
				mythread_cond_wait(&thr->cond, &thr->mutex);

			state = thr->state;
			in_size = thr->in_size;
		}

		// Return if we were asked to stop or exit.
		if (state >= THR_STOP)
			return state;

		lzma_action action = state == THR_FINISH
				? LZMA_FINISH : LZMA_RUN;

		// Limit the amount of input given to the Block encoder
		// at once. This way this thread can react fairly quickly
		// if the main thread wants us to stop or exit.
		static const size_t in_chunk_max = 16384;
		size_t in_limit = in_size;
		if (in_size - in_pos > in_chunk_max) {
			in_limit = in_pos + in_chunk_max;
			action = LZMA_RUN;
		}

		ret = thr->block_encoder.code(
				thr->block_encoder.coder, thr->allocator,
				thr->in, &in_pos, in_limit, thr->outbuf->buf,
				out_pos, out_size, action);
	} while (ret == LZMA_OK && *out_pos < out_size);

	switch (ret) {
	case LZMA_STREAM_END:
		assert(state == THR_FINISH);

		// Encode the Block Header. By doing it after
		// the compression, we can store the Compressed Size
		// and Uncompressed Size fields.
		ret = lzma_block_header_encode(&thr->block_options,
				thr->outbuf->buf);
		if (ret != LZMA_OK) {
			worker_error(thr, ret);
			return THR_STOP;
		}

		break;

	case LZMA_OK:
		// The data was incompressible. Encode it using uncompressed
		// LZMA2 chunks.
		//
		// First wait that we have gotten all the input.
		mythread_sync(thr->mutex) {
			while (thr->state == THR_RUN)
				mythread_cond_wait(&thr->cond, &thr->mutex);

			state = thr->state;
			in_size = thr->in_size;
		}

		if (state >= THR_STOP)
			return state;

		// Do the encoding. This takes care of the Block Header too.
		*out_pos = 0;
		ret = lzma_block_uncomp_encode(&thr->block_options,
				thr->in, in_size, thr->outbuf->buf,
				out_pos, out_size);

		// It shouldn't fail.
		if (ret != LZMA_OK) {
			worker_error(thr, LZMA_PROG_ERROR);
			return THR_STOP;
		}

		break;

	default:
		worker_error(thr, ret);
		return THR_STOP;
	}

	// Set the size information that will be read by the main thread
	// to write the Index field.
	thr->outbuf->unpadded_size
			= lzma_block_unpadded_size(&thr->block_options);
	assert(thr->outbuf->unpadded_size != 0);
	thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size;

	return THR_FINISH;
}
360
361
/// Entry point of a worker thread. Loops waiting for work, encodes one
/// Block per iteration via worker_encode(), publishes the result, and
/// puts the thread structure back onto the stack of free threads. The
/// loop exits only on THR_EXIT, after which the thread frees its own
/// resources (input buffer, mutex, cond, Block encoder, filters).
static MYTHREAD_RET_TYPE
#ifndef VBOX
worker_start(void *thr_ptr)
#else
// VBox: IPRT thread callbacks also receive the thread handle
// (unused here).
worker_start(RTTHREAD hThread, void *thr_ptr)
#endif
{
	worker_thread *thr = thr_ptr;
	worker_state state = THR_IDLE; // Init to silence a warning

	while (true) {
		// Wait for work.
		mythread_sync(thr->mutex) {
			while (true) {
				// The thread is already idle so if we are
				// requested to stop, just set the state.
				if (thr->state == THR_STOP) {
					thr->state = THR_IDLE;
					mythread_cond_signal(&thr->cond);
				}

				state = thr->state;
				if (state != THR_IDLE)
					break;

				mythread_cond_wait(&thr->cond, &thr->mutex);
			}
		}

		size_t out_pos = 0;

		assert(state != THR_IDLE);
		assert(state != THR_STOP);

		// THR_RUN or THR_FINISH: there is a Block to encode.
		if (state <= THR_FINISH)
			state = worker_encode(thr, &out_pos, state);

		if (state == THR_EXIT)
			break;

		// Mark the thread as idle unless the main thread has
		// told us to exit. Signal is needed for the case
		// where the main thread is waiting for the threads to stop.
		mythread_sync(thr->mutex) {
			if (thr->state != THR_EXIT) {
				thr->state = THR_IDLE;
				mythread_cond_signal(&thr->cond);
			}
		}

		mythread_sync(thr->coder->mutex) {
			// If no errors occurred, make the encoded data
			// available to be copied out.
			if (state == THR_FINISH) {
				thr->outbuf->pos = out_pos;
				thr->outbuf->finished = true;
			}

			// Update the main progress info.
			thr->coder->progress_in
					+= thr->outbuf->uncompressed_size;
			thr->coder->progress_out += out_pos;
			thr->progress_in = 0;
			thr->progress_out = 0;

			// Return this thread to the stack of free threads.
			thr->next = thr->coder->threads_free;
			thr->coder->threads_free = thr;

			mythread_cond_signal(&thr->coder->cond);
		}
	}

	// Exiting, free the resources.
	lzma_filters_free(thr->filters, thr->allocator);

	mythread_mutex_destroy(&thr->mutex);
	mythread_cond_destroy(&thr->cond);

	lzma_next_end(&thr->block_encoder, thr->allocator);
	lzma_free(thr->in, thr->allocator);
	return MYTHREAD_RET_VALUE;
}
445
446
447/// Make the threads stop but not exit. Optionally wait for them to stop.
448static void
449threads_stop(lzma_stream_coder *coder, bool wait_for_threads)
450{
451 // Tell the threads to stop.
452 for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
453 mythread_sync(coder->threads[i].mutex) {
454 coder->threads[i].state = THR_STOP;
455 mythread_cond_signal(&coder->threads[i].cond);
456 }
457 }
458
459 if (!wait_for_threads)
460 return;
461
462 // Wait for the threads to settle in the idle state.
463 for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
464 mythread_sync(coder->threads[i].mutex) {
465 while (coder->threads[i].state != THR_IDLE)
466 mythread_cond_wait(&coder->threads[i].cond,
467 &coder->threads[i].mutex);
468 }
469 }
470
471 return;
472}
473
474
475/// Stop the threads and free the resources associated with them.
476/// Wait until the threads have exited.
477static void
478threads_end(lzma_stream_coder *coder, const lzma_allocator *allocator)
479{
480 for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
481 mythread_sync(coder->threads[i].mutex) {
482 coder->threads[i].state = THR_EXIT;
483 mythread_cond_signal(&coder->threads[i].cond);
484 }
485 }
486
487 for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
488 int ret = mythread_join(coder->threads[i].thread_id);
489 assert(ret == 0);
490 (void)ret;
491 }
492
493 lzma_free(coder->threads, allocator);
494 return;
495}
496
497
498/// Initialize a new worker_thread structure and create a new thread.
499static lzma_ret
500initialize_new_thread(lzma_stream_coder *coder,
501 const lzma_allocator *allocator)
502{
503 worker_thread *thr = &coder->threads[coder->threads_initialized];
504
505 thr->in = lzma_alloc(coder->block_size, allocator);
506 if (thr->in == NULL)
507 return LZMA_MEM_ERROR;
508
509 if (mythread_mutex_init(&thr->mutex))
510 goto error_mutex;
511
512 if (mythread_cond_init(&thr->cond))
513 goto error_cond;
514
515 thr->state = THR_IDLE;
516 thr->allocator = allocator;
517 thr->coder = coder;
518 thr->progress_in = 0;
519 thr->progress_out = 0;
520 thr->block_encoder = LZMA_NEXT_CODER_INIT;
521 thr->filters[0].id = LZMA_VLI_UNKNOWN;
522
523 if (mythread_create(&thr->thread_id, &worker_start, thr))
524 goto error_thread;
525
526 ++coder->threads_initialized;
527 coder->thr = thr;
528
529 return LZMA_OK;
530
531error_thread:
532 mythread_cond_destroy(&thr->cond);
533
534error_cond:
535 mythread_mutex_destroy(&thr->mutex);
536
537error_mutex:
538 lzma_free(thr->in, allocator);
539 return LZMA_MEM_ERROR;
540}
541
542
/// Try to assign a worker thread to coder->thr and start a new Block
/// with it. If no output buffer or no thread is available yet,
/// coder->thr is left NULL and LZMA_OK is returned (the caller must
/// then wait and retry). Allocation failures are returned as errors.
static lzma_ret
get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
{
	// If there are no free output subqueues, there is no
	// point to try getting a thread.
	if (!lzma_outq_has_buf(&coder->outq))
		return LZMA_OK;

	// That's also true if we cannot allocate memory for the output
	// buffer in the output queue.
	return_if_error(lzma_outq_prealloc_buf(&coder->outq, allocator,
			coder->outbuf_alloc_size));

	// Make a thread-specific copy of the filter chain. Put it in
	// the cache array first so that if we cannot get a new thread yet,
	// the allocation is ready when we try again.
	if (coder->filters_cache[0].id == LZMA_VLI_UNKNOWN)
		return_if_error(lzma_filters_copy(
			coder->filters, coder->filters_cache, allocator));

	// If there is a free structure on the stack, use it.
	mythread_sync(coder->mutex) {
		if (coder->threads_free != NULL) {
			coder->thr = coder->threads_free;
			coder->threads_free = coder->threads_free->next;
		}
	}

	if (coder->thr == NULL) {
		// If there are no uninitialized structures left, return.
		if (coder->threads_initialized == coder->threads_max)
			return LZMA_OK;

		// Initialize a new thread.
		return_if_error(initialize_new_thread(coder, allocator));
	}

	// Reset the parts of the thread state that have to be done
	// in the main thread.
	mythread_sync(coder->thr->mutex) {
		coder->thr->state = THR_RUN;
		coder->thr->in_size = 0;
		coder->thr->outbuf = lzma_outq_get_buf(&coder->outq, NULL);

		// Free the old thread-specific filter options and replace
		// them with the already-allocated new options from
		// coder->filters_cache[]. Then mark the cache as empty.
		lzma_filters_free(coder->thr->filters, allocator);
		memcpy(coder->thr->filters, coder->filters_cache,
				sizeof(coder->filters_cache));
		coder->filters_cache[0].id = LZMA_VLI_UNKNOWN;

		mythread_cond_signal(&coder->thr->cond);
	}

	return LZMA_OK;
}
600
601
/// Feed input data from the application to worker threads, starting a
/// new Block (and possibly a new thread) whenever needed. Returns when
/// all input has been buffered, when no thread is available yet
/// (coder->thr stays NULL), or when an error has occurred.
static lzma_ret
stream_encode_in(lzma_stream_coder *coder, const lzma_allocator *allocator,
		const uint8_t *restrict in, size_t *restrict in_pos,
		size_t in_size, lzma_action action)
{
	// Loop also when input is exhausted but the current Block still
	// needs to be told to finish (flush/finish actions).
	while (*in_pos < in_size
			|| (coder->thr != NULL && action != LZMA_RUN)) {
		if (coder->thr == NULL) {
			// Get a new thread.
			const lzma_ret ret = get_thread(coder, allocator);
			if (coder->thr == NULL)
				return ret;
		}

		// Copy the input data to thread's buffer.
		size_t thr_in_size = coder->thr->in_size;
		lzma_bufcpy(in, in_pos, in_size, coder->thr->in,
				&thr_in_size, coder->block_size);

		// Tell the Block encoder to finish if
		//  - it has got block_size bytes of input; or
		//  - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH,
		//    or LZMA_FULL_BARRIER was used.
		//
		// TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
		const bool finish = thr_in_size == coder->block_size
				|| (*in_pos == in_size && action != LZMA_RUN);

		bool block_error = false;

		mythread_sync(coder->thr->mutex) {
			if (coder->thr->state == THR_IDLE) {
				// Something has gone wrong with the Block
				// encoder. It has set coder->thread_error
				// which we will read a few lines later.
				block_error = true;
			} else {
				// Tell the Block encoder its new amount
				// of input and update the state if needed.
				coder->thr->in_size = thr_in_size;

				if (finish)
					coder->thr->state = THR_FINISH;

				mythread_cond_signal(&coder->thr->cond);
			}
		}

		if (block_error) {
#ifndef VBOX
			lzma_ret ret;
#else
			lzma_ret ret = LZMA_OK; // Init to silence a warning.
#endif

			// The worker stores its error under coder->mutex.
			mythread_sync(coder->mutex) {
				ret = coder->thread_error;
			}

			return ret;
		}

		// A finished Block no longer accepts input; the next
		// iteration will need a fresh thread.
		if (finish)
			coder->thr = NULL;
	}

	return LZMA_OK;
}
670
671
/// Wait until more input can be consumed, more output can be read, or
/// an optional timeout is reached.
///
/// \param wait_abs     Absolute expiry time; calculated here on the
///                     first potentially-blocking call when a timeout
///                     is in use
/// \param has_blocked  In/out flag: false on the first call per
///                     lzma_code() invocation so that wait_abs is
///                     calculated only once
/// \param has_input    True if the caller still has input available
///
/// \return  true if the wait timed out, false otherwise
static bool
wait_for_work(lzma_stream_coder *coder, mythread_condtime *wait_abs,
		bool *has_blocked, bool has_input)
{
	if (coder->timeout != 0 && !*has_blocked) {
		// Every time when stream_encode_mt() is called via
		// lzma_code(), *has_blocked starts as false. We set it
		// to true here and calculate the absolute time when
		// we must return if there's nothing to do.
		//
		// This way if we block multiple times for short moments
		// less than "timeout" milliseconds, we will return once
		// "timeout" amount of time has passed since the *first*
		// blocking occurred. If the absolute time was calculated
		// again every time we block, "timeout" would effectively
		// be meaningless if we never consecutively block longer
		// than "timeout" ms.
		*has_blocked = true;
		mythread_condtime_set(wait_abs, &coder->cond, coder->timeout);
	}

	bool timed_out = false;

	mythread_sync(coder->mutex) {
		// There are four things that we wait. If one of them
		// becomes possible, we return.
		//  - If there is input left, we need to get a free
		//    worker thread and an output buffer for it.
		//  - Data ready to be read from the output queue.
		//  - A worker thread indicates an error.
		//  - Time out occurs.
		while ((!has_input || coder->threads_free == NULL
					|| !lzma_outq_has_buf(&coder->outq))
				&& !lzma_outq_is_readable(&coder->outq)
				&& coder->thread_error == LZMA_OK
				&& !timed_out) {
			if (coder->timeout != 0)
				timed_out = mythread_cond_timedwait(
						&coder->cond, &coder->mutex,
						wait_abs) != 0;
			else
				mythread_cond_wait(&coder->cond,
						&coder->mutex);
		}
	}

	return timed_out;
}
722
723
/// Main encoding function (lzma_next_coder.code). Drives the four-step
/// sequence: Stream Header, Blocks (the multithreaded part), Index,
/// and Stream Footer. Falls through from one step to the next within
/// a single call whenever possible.
static lzma_ret
stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator,
		const uint8_t *restrict in, size_t *restrict in_pos,
		size_t in_size, uint8_t *restrict out,
		size_t *restrict out_pos, size_t out_size, lzma_action action)
{
	lzma_stream_coder *coder = coder_ptr;

	switch (coder->sequence) {
	case SEQ_STREAM_HEADER:
		// Copy the pre-encoded Stream Header to the output.
		lzma_bufcpy(coder->header, &coder->header_pos,
				sizeof(coder->header),
				out, out_pos, out_size);
		if (coder->header_pos < sizeof(coder->header))
			return LZMA_OK;

		coder->header_pos = 0;
		coder->sequence = SEQ_BLOCK;
		FALLTHROUGH;

	case SEQ_BLOCK: {
		// Initialized to silence warnings.
		lzma_vli unpadded_size = 0;
		lzma_vli uncompressed_size = 0;
		lzma_ret ret = LZMA_OK;

		// These are for wait_for_work().
		bool has_blocked = false;
		mythread_condtime wait_abs = { 0 };

		while (true) {
			mythread_sync(coder->mutex) {
				// Check for Block encoder errors.
				ret = coder->thread_error;
				if (ret != LZMA_OK) {
					assert(ret != LZMA_STREAM_END);
					break; // Break out of mythread_sync.
				}

				// Try to read compressed data to out[].
				ret = lzma_outq_read(&coder->outq, allocator,
						out, out_pos, out_size,
						&unpadded_size,
						&uncompressed_size);
			}

			if (ret == LZMA_STREAM_END) {
				// End of Block. Add it to the Index.
				ret = lzma_index_append(coder->index,
						allocator, unpadded_size,
						uncompressed_size);
				if (ret != LZMA_OK) {
					threads_stop(coder, false);
					return ret;
				}

				// If we didn't fill the output buffer yet,
				// try to read more data. Maybe the next
				// outbuf has been finished already too.
				if (*out_pos < out_size)
					continue;
			}

			if (ret != LZMA_OK) {
				// coder->thread_error was set.
				threads_stop(coder, false);
				return ret;
			}

			// Try to give uncompressed data to a worker thread.
			ret = stream_encode_in(coder, allocator,
					in, in_pos, in_size, action);
			if (ret != LZMA_OK) {
				threads_stop(coder, false);
				return ret;
			}

			// See if we should wait or return.
			//
			// TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
			if (*in_pos == in_size) {
				// LZMA_RUN: More data is probably coming
				// so return to let the caller fill the
				// input buffer.
				if (action == LZMA_RUN)
					return LZMA_OK;

				// LZMA_FULL_BARRIER: The same as with
				// LZMA_RUN but tell the caller that the
				// barrier was completed.
				if (action == LZMA_FULL_BARRIER)
					return LZMA_STREAM_END;

				// Finishing or flushing isn't completed until
				// all input data has been encoded and copied
				// to the output buffer.
				if (lzma_outq_is_empty(&coder->outq)) {
					// LZMA_FINISH: Continue to encode
					// the Index field.
					if (action == LZMA_FINISH)
						break;

					// LZMA_FULL_FLUSH: Return to tell
					// the caller that flushing was
					// completed.
					if (action == LZMA_FULL_FLUSH)
						return LZMA_STREAM_END;
				}
			}

			// Return if there is no output space left.
			// This check must be done after testing the input
			// buffer, because we might want to use a different
			// return code.
			if (*out_pos == out_size)
				return LZMA_OK;

			// Neither in nor out has been used completely.
			// Wait until there's something we can do.
			if (wait_for_work(coder, &wait_abs, &has_blocked,
					*in_pos < in_size))
				return LZMA_TIMED_OUT;
		}

		// All Blocks have been encoded and the threads have stopped.
		// Prepare to encode the Index field.
		return_if_error(lzma_index_encoder_init(
				&coder->index_encoder, allocator,
				coder->index));
		coder->sequence = SEQ_INDEX;

		// Update the progress info to take the Index and
		// Stream Footer into account. Those are very fast to encode
		// so in terms of progress information they can be thought
		// to be ready to be copied out.
		coder->progress_out += lzma_index_size(coder->index)
				+ LZMA_STREAM_HEADER_SIZE;

		FALLTHROUGH;
	}

	case SEQ_INDEX: {
		// Call the Index encoder. It doesn't take any input, so
		// those pointers can be NULL.
		const lzma_ret ret = coder->index_encoder.code(
				coder->index_encoder.coder, allocator,
				NULL, NULL, 0,
				out, out_pos, out_size, LZMA_RUN);
		if (ret != LZMA_STREAM_END)
			return ret;

		// Encode the Stream Footer into coder->buffer.
		coder->stream_flags.backward_size
				= lzma_index_size(coder->index);
		if (lzma_stream_footer_encode(&coder->stream_flags,
				coder->header) != LZMA_OK)
			return LZMA_PROG_ERROR;

		coder->sequence = SEQ_STREAM_FOOTER;
		FALLTHROUGH;
	}

	case SEQ_STREAM_FOOTER:
		// Copy the encoded Stream Footer to the output.
		lzma_bufcpy(coder->header, &coder->header_pos,
				sizeof(coder->header),
				out, out_pos, out_size);
		return coder->header_pos < sizeof(coder->header)
				? LZMA_OK : LZMA_STREAM_END;
	}

	assert(0);
	return LZMA_PROG_ERROR;
}
897
898
/// Free the coder and everything it owns (lzma_next_coder.end).
static void
stream_encoder_mt_end(void *coder_ptr, const lzma_allocator *allocator)
{
	lzma_stream_coder *coder = coder_ptr;

	// Threads must be killed before the output queue can be freed.
	threads_end(coder, allocator);
	lzma_outq_end(&coder->outq, allocator);

	lzma_filters_free(coder->filters, allocator);
	lzma_filters_free(coder->filters_cache, allocator);

	lzma_next_end(&coder->index_encoder, allocator);
	lzma_index_end(coder->index, allocator);

	mythread_cond_destroy(&coder->cond);
	mythread_mutex_destroy(&coder->mutex);

	lzma_free(coder, allocator);
	return;
}
920
921
922static lzma_ret
923stream_encoder_mt_update(void *coder_ptr, const lzma_allocator *allocator,
924 const lzma_filter *filters,
925 const lzma_filter *reversed_filters
926 lzma_attribute((__unused__)))
927{
928 lzma_stream_coder *coder = coder_ptr;
929
930 // Applications shouldn't attempt to change the options when
931 // we are already encoding the Index or Stream Footer.
932 if (coder->sequence > SEQ_BLOCK)
933 return LZMA_PROG_ERROR;
934
935 // For now the threaded encoder doesn't support changing
936 // the options in the middle of a Block.
937 if (coder->thr != NULL)
938 return LZMA_PROG_ERROR;
939
940 // Check if the filter chain seems mostly valid. See the comment
941 // in stream_encoder_mt_init().
942 if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
943 return LZMA_OPTIONS_ERROR;
944
945 // Make a copy to a temporary buffer first. This way the encoder
946 // state stays unchanged if an error occurs in lzma_filters_copy().
947 lzma_filter temp[LZMA_FILTERS_MAX + 1];
948 return_if_error(lzma_filters_copy(filters, temp, allocator));
949
950 // Free the options of the old chain as well as the cache.
951 lzma_filters_free(coder->filters, allocator);
952 lzma_filters_free(coder->filters_cache, allocator);
953
954 // Copy the new filter chain in place.
955 memcpy(coder->filters, temp, sizeof(temp));
956
957 return LZMA_OK;
958}
959
960
961/// Options handling for lzma_stream_encoder_mt_init() and
962/// lzma_stream_encoder_mt_memusage()
963static lzma_ret
964get_options(const lzma_mt *options, lzma_options_easy *opt_easy,
965 const lzma_filter **filters, uint64_t *block_size,
966 uint64_t *outbuf_size_max)
967{
968 // Validate some of the options.
969 if (options == NULL)
970 return LZMA_PROG_ERROR;
971
972 if (options->flags != 0 || options->threads == 0
973 || options->threads > LZMA_THREADS_MAX)
974 return LZMA_OPTIONS_ERROR;
975
976 if (options->filters != NULL) {
977 // Filter chain was given, use it as is.
978 *filters = options->filters;
979 } else {
980 // Use a preset.
981 if (lzma_easy_preset(opt_easy, options->preset))
982 return LZMA_OPTIONS_ERROR;
983
984 *filters = opt_easy->filters;
985 }
986
987 // If the Block size is not set, determine it from the filter chain.
988 if (options->block_size > 0)
989 *block_size = options->block_size;
990 else
991 *block_size = lzma_mt_block_size(*filters);
992
993 // UINT64_MAX > BLOCK_SIZE_MAX, so the second condition
994 // should be optimized out by any reasonable compiler.
995 // The second condition should be there in the unlikely event that
996 // the macros change and UINT64_MAX < BLOCK_SIZE_MAX.
997 if (*block_size > BLOCK_SIZE_MAX || *block_size == UINT64_MAX)
998 return LZMA_OPTIONS_ERROR;
999
1000 // Calculate the maximum amount output that a single output buffer
1001 // may need to hold. This is the same as the maximum total size of
1002 // a Block.
1003 *outbuf_size_max = lzma_block_buffer_bound64(*block_size);
1004 if (*outbuf_size_max == 0)
1005 return LZMA_MEM_ERROR;
1006
1007 return LZMA_OK;
1008}
1009
1010
/// Report total progress (lzma_next_coder.get_progress, used by
/// lzma_get_progress()). Sums the finished-Block totals with the
/// per-thread counters of Blocks still being encoded.
static void
get_progress(void *coder_ptr, uint64_t *progress_in, uint64_t *progress_out)
{
	lzma_stream_coder *coder = coder_ptr;

	// Lock coder->mutex to prevent finishing threads from moving their
	// progress info from the worker_thread structure to lzma_stream_coder.
	mythread_sync(coder->mutex) {
		*progress_in = coder->progress_in;
		*progress_out = coder->progress_out;

		// Each thread's counters are protected by its own mutex,
		// locked here inside coder->mutex.
		for (size_t i = 0; i < coder->threads_initialized; ++i) {
			mythread_sync(coder->threads[i].mutex) {
				*progress_in += coder->threads[i].progress_in;
				*progress_out += coder->threads[i]
						.progress_out;
			}
		}
	}

	return;
}
1033
1034
/// \brief      Initialize (or re-initialize) the multithreaded Stream encoder
///
/// Validates the options and filter chain, allocates the base structure on
/// first use, (re)creates the worker-thread array and output queue, and
/// prepares the Stream Header. On re-initialization, running worker threads
/// are reused when the thread count is unchanged, otherwise torn down.
///
/// \return     LZMA_OK on success; LZMA_MEM_ERROR, LZMA_OPTIONS_ERROR,
///             LZMA_UNSUPPORTED_CHECK, or LZMA_PROG_ERROR on failure
///             (errors also propagated from the helpers via return_if_error).
static lzma_ret
stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
		const lzma_mt *options)
{
	lzma_next_coder_init(&stream_encoder_mt_init, next, allocator);

	// Get the filter chain.
	lzma_options_easy easy;
	const lzma_filter *filters;
	uint64_t block_size;
	uint64_t outbuf_size_max;
	return_if_error(get_options(options, &easy, &filters,
			&block_size, &outbuf_size_max));

#if SIZE_MAX < UINT64_MAX
	// On 32-bit systems the buffer sizes must also fit in size_t since
	// they are cast to size_t below and passed to the allocator.
	if (block_size > SIZE_MAX || outbuf_size_max > SIZE_MAX)
		return LZMA_MEM_ERROR;
#endif

	// Validate the filter chain so that we can give an error in this
	// function instead of delaying it to the first call to lzma_code().
	// The memory usage calculation verifies the filter chain as
	// a side effect so we take advantage of that. It's not a perfect
	// check though as raw encoder allows LZMA1 too but such problems
	// will be caught eventually with Block Header encoder.
	if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
		return LZMA_OPTIONS_ERROR;

	// Validate the Check ID.
	if ((unsigned int)(options->check) > LZMA_CHECK_ID_MAX)
		return LZMA_PROG_ERROR;

	if (!lzma_check_is_supported(options->check))
		return LZMA_UNSUPPORTED_CHECK;

	// Allocate and initialize the base structure if needed.
	lzma_stream_coder *coder = next->coder;
	if (coder == NULL) {
		coder = lzma_alloc(sizeof(lzma_stream_coder), allocator);
		if (coder == NULL)
			return LZMA_MEM_ERROR;

		next->coder = coder;

		// For the mutex and condition variable initializations
		// the error handling has to be done here because
		// stream_encoder_mt_end() doesn't know if they have
		// already been initialized or not.
		if (mythread_mutex_init(&coder->mutex)) {
			lzma_free(coder, allocator);
			next->coder = NULL;
			return LZMA_MEM_ERROR;
		}

		if (mythread_cond_init(&coder->cond)) {
			mythread_mutex_destroy(&coder->mutex);
			lzma_free(coder, allocator);
			next->coder = NULL;
			return LZMA_MEM_ERROR;
		}

		next->code = &stream_encode_mt;
		next->end = &stream_encoder_mt_end;
		next->get_progress = &get_progress;
		next->update = &stream_encoder_mt_update;

		// Mark the dynamically-allocated members as empty so that
		// a later failure path can free them unconditionally.
		coder->filters[0].id = LZMA_VLI_UNKNOWN;
		coder->filters_cache[0].id = LZMA_VLI_UNKNOWN;
		coder->index_encoder = LZMA_NEXT_CODER_INIT;
		coder->index = NULL;
		memzero(&coder->outq, sizeof(coder->outq));
		coder->threads = NULL;
		coder->threads_max = 0;
		coder->threads_initialized = 0;
	}

	// Basic initializations
	coder->sequence = SEQ_STREAM_HEADER;
	coder->block_size = (size_t)(block_size);
	coder->outbuf_alloc_size = (size_t)(outbuf_size_max);
	coder->thread_error = LZMA_OK;
	coder->thr = NULL;

	// Allocate the thread-specific base structures.
	assert(options->threads > 0);
	if (coder->threads_max != options->threads) {
		// Thread count changed: tear down the old threads and
		// allocate a fresh worker array of the requested size.
		threads_end(coder, allocator);

		coder->threads = NULL;
		coder->threads_max = 0;

		coder->threads_initialized = 0;
		coder->threads_free = NULL;

		coder->threads = lzma_alloc(
				options->threads * sizeof(worker_thread),
				allocator);
		if (coder->threads == NULL)
			return LZMA_MEM_ERROR;

		coder->threads_max = options->threads;
	} else {
		// Reuse the old structures and threads. Tell the running
		// threads to stop and wait until they have stopped.
		threads_stop(coder, true);
	}

	// Output queue
	return_if_error(lzma_outq_init(&coder->outq, allocator,
			options->threads));

	// Timeout
	coder->timeout = options->timeout;

	// Free the old filter chain and the cache.
	lzma_filters_free(coder->filters, allocator);
	lzma_filters_free(coder->filters_cache, allocator);

	// Copy the new filter chain.
	return_if_error(lzma_filters_copy(
			filters, coder->filters, allocator));

	// Index
	lzma_index_end(coder->index, allocator);
	coder->index = lzma_index_init(allocator);
	if (coder->index == NULL)
		return LZMA_MEM_ERROR;

	// Stream Header
	coder->stream_flags.version = 0;
	coder->stream_flags.check = options->check;
	return_if_error(lzma_stream_header_encode(
			&coder->stream_flags, coder->header));

	coder->header_pos = 0;

	// Progress info: the Stream Header is counted as output up front
	// since it is produced by this encoder (SEQ_STREAM_HEADER is the
	// first sequence state).
	coder->progress_in = 0;
	coder->progress_out = LZMA_STREAM_HEADER_SIZE;

	return LZMA_OK;
}
1177
1178
1179#ifdef HAVE_SYMBOL_VERSIONS_LINUX
1180// These are for compatibility with binaries linked against liblzma that
1181// has been patched with xz-5.2.2-compat-libs.patch from RHEL/CentOS 7.
1182// Actually that patch didn't create lzma_stream_encoder_mt@XZ_5.2.2
1183// but it has been added here anyway since someone might misread the
1184// RHEL patch and think both @XZ_5.1.2alpha and @XZ_5.2.2 exist.
1185LZMA_SYMVER_API("lzma_stream_encoder_mt@XZ_5.1.2alpha",
1186 lzma_ret, lzma_stream_encoder_mt_512a)(
1187 lzma_stream *strm, const lzma_mt *options)
1188 lzma_nothrow lzma_attr_warn_unused_result
1189 __attribute__((__alias__("lzma_stream_encoder_mt_52")));
1190
1191LZMA_SYMVER_API("lzma_stream_encoder_mt@XZ_5.2.2",
1192 lzma_ret, lzma_stream_encoder_mt_522)(
1193 lzma_stream *strm, const lzma_mt *options)
1194 lzma_nothrow lzma_attr_warn_unused_result
1195 __attribute__((__alias__("lzma_stream_encoder_mt_52")));
1196
1197LZMA_SYMVER_API("lzma_stream_encoder_mt@@XZ_5.2",
1198 lzma_ret, lzma_stream_encoder_mt_52)(
1199 lzma_stream *strm, const lzma_mt *options)
1200 lzma_nothrow lzma_attr_warn_unused_result;
1201
1202#define lzma_stream_encoder_mt lzma_stream_encoder_mt_52
1203#endif
1204extern LZMA_API(lzma_ret)
1205lzma_stream_encoder_mt(lzma_stream *strm, const lzma_mt *options)
1206{
1207 lzma_next_strm_init(stream_encoder_mt_init, strm, options);
1208
1209 strm->internal->supported_actions[LZMA_RUN] = true;
1210// strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true;
1211 strm->internal->supported_actions[LZMA_FULL_FLUSH] = true;
1212 strm->internal->supported_actions[LZMA_FULL_BARRIER] = true;
1213 strm->internal->supported_actions[LZMA_FINISH] = true;
1214
1215 return LZMA_OK;
1216}
1217
1218
1219#ifdef HAVE_SYMBOL_VERSIONS_LINUX
1220LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@XZ_5.1.2alpha",
1221 uint64_t, lzma_stream_encoder_mt_memusage_512a)(
1222 const lzma_mt *options) lzma_nothrow lzma_attr_pure
1223 __attribute__((__alias__("lzma_stream_encoder_mt_memusage_52")));
1224
1225LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@XZ_5.2.2",
1226 uint64_t, lzma_stream_encoder_mt_memusage_522)(
1227 const lzma_mt *options) lzma_nothrow lzma_attr_pure
1228 __attribute__((__alias__("lzma_stream_encoder_mt_memusage_52")));
1229
1230LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@@XZ_5.2",
1231 uint64_t, lzma_stream_encoder_mt_memusage_52)(
1232 const lzma_mt *options) lzma_nothrow lzma_attr_pure;
1233
1234#define lzma_stream_encoder_mt_memusage lzma_stream_encoder_mt_memusage_52
1235#endif
1236// This function name is a monster but it's consistent with the older
1237// monster names. :-( 31 chars is the max that C99 requires so in that
1238// sense it's not too long. ;-)
1239extern LZMA_API(uint64_t)
1240lzma_stream_encoder_mt_memusage(const lzma_mt *options)
1241{
1242 lzma_options_easy easy;
1243 const lzma_filter *filters;
1244 uint64_t block_size;
1245 uint64_t outbuf_size_max;
1246
1247 if (get_options(options, &easy, &filters, &block_size,
1248 &outbuf_size_max) != LZMA_OK)
1249 return UINT64_MAX;
1250
1251 // Memory usage of the input buffers
1252 const uint64_t inbuf_memusage = options->threads * block_size;
1253
1254 // Memory usage of the filter encoders
1255 uint64_t filters_memusage = lzma_raw_encoder_memusage(filters);
1256 if (filters_memusage == UINT64_MAX)
1257 return UINT64_MAX;
1258
1259 filters_memusage *= options->threads;
1260
1261 // Memory usage of the output queue
1262 const uint64_t outq_memusage = lzma_outq_memusage(
1263 outbuf_size_max, options->threads);
1264 if (outq_memusage == UINT64_MAX)
1265 return UINT64_MAX;
1266
1267 // Sum them with overflow checking.
1268 uint64_t total_memusage = LZMA_MEMUSAGE_BASE
1269 + sizeof(lzma_stream_coder)
1270 + options->threads * sizeof(worker_thread);
1271
1272 if (UINT64_MAX - total_memusage < inbuf_memusage)
1273 return UINT64_MAX;
1274
1275 total_memusage += inbuf_memusage;
1276
1277 if (UINT64_MAX - total_memusage < filters_memusage)
1278 return UINT64_MAX;
1279
1280 total_memusage += filters_memusage;
1281
1282 if (UINT64_MAX - total_memusage < outq_memusage)
1283 return UINT64_MAX;
1284
1285 return total_memusage + outq_memusage;
1286}
Note: See TracBrowser for help on using the repository browser.

© 2025 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette