/////////////////////////////////////////////////////////////////////////////// // /// \file stream_encoder_mt.c /// \brief Multithreaded .xz Stream encoder // // Author: Lasse Collin // // This file has been put into the public domain. // You can do whatever you want with this file. // /////////////////////////////////////////////////////////////////////////////// #include "filter_encoder.h" #include "easy_preset.h" #include "block_encoder.h" #include "block_buffer_encoder.h" #include "index_encoder.h" #include "outqueue.h" /// Maximum supported block size. This makes it simpler to prevent integer /// overflows if we are given unusually large block size. #define BLOCK_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX) typedef enum { /// Waiting for work. THR_IDLE, /// Encoding is in progress. THR_RUN, /// Encoding is in progress but no more input data will /// be read. THR_FINISH, /// The main thread wants the thread to stop whatever it was doing /// but not exit. THR_STOP, /// The main thread wants the thread to exit. We could use /// cancellation but since there's stopped anyway, this is lazier. THR_EXIT, } worker_state; typedef struct lzma_stream_coder_s lzma_stream_coder; typedef struct worker_thread_s worker_thread; struct worker_thread_s { worker_state state; /// Input buffer of coder->block_size bytes. The main thread will /// put new input into this and update in_size accordingly. Once /// no more input is coming, state will be set to THR_FINISH. uint8_t *in; /// Amount of data available in the input buffer. This is modified /// only by the main thread. size_t in_size; /// Output buffer for this thread. This is set by the main /// thread every time a new Block is started with this thread /// structure. lzma_outbuf *outbuf; /// Pointer to the main structure is needed when putting this /// thread back to the stack of free threads. lzma_stream_coder *coder; /// The allocator is set by the main thread. Since a copy of the /// pointer is kept here, the application must not change the /// allocator before calling lzma_end(). const lzma_allocator *allocator; /// Amount of uncompressed data that has already been compressed. uint64_t progress_in; /// Amount of compressed data that is ready. uint64_t progress_out; /// Block encoder lzma_next_coder block_encoder; /// Compression options for this Block lzma_block block_options; /// Filter chain for this thread. By copying the filters array /// to each thread it is possible to change the filter chain /// between Blocks using lzma_filters_update(). lzma_filter filters[LZMA_FILTERS_MAX + 1]; /// Next structure in the stack of free worker threads. worker_thread *next; mythread_mutex mutex; mythread_cond cond; /// The ID of this thread is used to join the thread /// when it's not needed anymore. mythread thread_id; }; struct lzma_stream_coder_s { enum { SEQ_STREAM_HEADER, SEQ_BLOCK, SEQ_INDEX, SEQ_STREAM_FOOTER, } sequence; /// Start a new Block every block_size bytes of input unless /// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier. size_t block_size; /// The filter chain to use for the next Block. /// This can be updated using lzma_filters_update() /// after LZMA_FULL_BARRIER or LZMA_FULL_FLUSH. lzma_filter filters[LZMA_FILTERS_MAX + 1]; /// A copy of filters[] will be put here when attempting to get /// a new worker thread. This will be copied to a worker thread /// when a thread becomes free and then this cache is marked as /// empty by setting [0].id = LZMA_VLI_UNKNOWN. Without this cache /// the filter options from filters[] would get uselessly copied /// multiple times (allocated and freed) when waiting for a new free /// worker thread. /// /// This is freed if filters[] is updated via lzma_filters_update(). lzma_filter filters_cache[LZMA_FILTERS_MAX + 1]; /// Index to hold sizes of the Blocks lzma_index *index; /// Index encoder lzma_next_coder index_encoder; /// Stream Flags for encoding the Stream Header and Stream Footer. lzma_stream_flags stream_flags; /// Buffer to hold Stream Header and Stream Footer. uint8_t header[LZMA_STREAM_HEADER_SIZE]; /// Read position in header[] size_t header_pos; /// Output buffer queue for compressed data lzma_outq outq; /// How much memory to allocate for each lzma_outbuf.buf size_t outbuf_alloc_size; /// Maximum wait time if cannot use all the input and cannot /// fill the output buffer. This is in milliseconds. uint32_t timeout; /// Error code from a worker thread lzma_ret thread_error; /// Array of allocated thread-specific structures worker_thread *threads; /// Number of structures in "threads" above. This is also the /// number of threads that will be created at maximum. uint32_t threads_max; /// Number of thread structures that have been initialized, and /// thus the number of worker threads actually created so far. uint32_t threads_initialized; /// Stack of free threads. When a thread finishes, it puts itself /// back into this stack. This starts as empty because threads /// are created only when actually needed. worker_thread *threads_free; /// The most recent worker thread to which the main thread writes /// the new input from the application. worker_thread *thr; /// Amount of uncompressed data in Blocks that have already /// been finished. uint64_t progress_in; /// Amount of compressed data in Stream Header + Blocks that /// have already been finished. uint64_t progress_out; mythread_mutex mutex; mythread_cond cond; }; /// Tell the main thread that something has gone wrong. static void worker_error(worker_thread *thr, lzma_ret ret) { assert(ret != LZMA_OK); assert(ret != LZMA_STREAM_END); mythread_sync(thr->coder->mutex) { if (thr->coder->thread_error == LZMA_OK) thr->coder->thread_error = ret; mythread_cond_signal(&thr->coder->cond); } return; } static worker_state worker_encode(worker_thread *thr, size_t *out_pos, worker_state state) { assert(thr->progress_in == 0); assert(thr->progress_out == 0); // Set the Block options. thr->block_options = (lzma_block){ .version = 0, .check = thr->coder->stream_flags.check, .compressed_size = thr->outbuf->allocated, .uncompressed_size = thr->coder->block_size, .filters = thr->filters, }; // Calculate maximum size of the Block Header. This amount is // reserved in the beginning of the buffer so that Block Header // along with Compressed Size and Uncompressed Size can be // written there. lzma_ret ret = lzma_block_header_size(&thr->block_options); if (ret != LZMA_OK) { worker_error(thr, ret); return THR_STOP; } // Initialize the Block encoder. ret = lzma_block_encoder_init(&thr->block_encoder, thr->allocator, &thr->block_options); if (ret != LZMA_OK) { worker_error(thr, ret); return THR_STOP; } size_t in_pos = 0; size_t in_size = 0; *out_pos = thr->block_options.header_size; const size_t out_size = thr->outbuf->allocated; do { mythread_sync(thr->mutex) { // Store in_pos and *out_pos into *thr so that // an application may read them via // lzma_get_progress() to get progress information. // // NOTE: These aren't updated when the encoding // finishes. Instead, the final values are taken // later from thr->outbuf. thr->progress_in = in_pos; thr->progress_out = *out_pos; while (in_size == thr->in_size && thr->state == THR_RUN) mythread_cond_wait(&thr->cond, &thr->mutex); state = thr->state; in_size = thr->in_size; } // Return if we were asked to stop or exit. if (state >= THR_STOP) return state; lzma_action action = state == THR_FINISH ? LZMA_FINISH : LZMA_RUN; // Limit the amount of input given to the Block encoder // at once. This way this thread can react fairly quickly // if the main thread wants us to stop or exit. static const size_t in_chunk_max = 16384; size_t in_limit = in_size; if (in_size - in_pos > in_chunk_max) { in_limit = in_pos + in_chunk_max; action = LZMA_RUN; } ret = thr->block_encoder.code( thr->block_encoder.coder, thr->allocator, thr->in, &in_pos, in_limit, thr->outbuf->buf, out_pos, out_size, action); } while (ret == LZMA_OK && *out_pos < out_size); switch (ret) { case LZMA_STREAM_END: assert(state == THR_FINISH); // Encode the Block Header. By doing it after // the compression, we can store the Compressed Size // and Uncompressed Size fields. ret = lzma_block_header_encode(&thr->block_options, thr->outbuf->buf); if (ret != LZMA_OK) { worker_error(thr, ret); return THR_STOP; } break; case LZMA_OK: // The data was incompressible. Encode it using uncompressed // LZMA2 chunks. // // First wait that we have gotten all the input. mythread_sync(thr->mutex) { while (thr->state == THR_RUN) mythread_cond_wait(&thr->cond, &thr->mutex); state = thr->state; in_size = thr->in_size; } if (state >= THR_STOP) return state; // Do the encoding. This takes care of the Block Header too. *out_pos = 0; ret = lzma_block_uncomp_encode(&thr->block_options, thr->in, in_size, thr->outbuf->buf, out_pos, out_size); // It shouldn't fail. if (ret != LZMA_OK) { worker_error(thr, LZMA_PROG_ERROR); return THR_STOP; } break; default: worker_error(thr, ret); return THR_STOP; } // Set the size information that will be read by the main thread // to write the Index field. thr->outbuf->unpadded_size = lzma_block_unpadded_size(&thr->block_options); assert(thr->outbuf->unpadded_size != 0); thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size; return THR_FINISH; } static MYTHREAD_RET_TYPE #ifndef VBOX worker_start(void *thr_ptr) #else worker_start(RTTHREAD hThread, void *thr_ptr) #endif { worker_thread *thr = thr_ptr; worker_state state = THR_IDLE; // Init to silence a warning while (true) { // Wait for work. mythread_sync(thr->mutex) { while (true) { // The thread is already idle so if we are // requested to stop, just set the state. if (thr->state == THR_STOP) { thr->state = THR_IDLE; mythread_cond_signal(&thr->cond); } state = thr->state; if (state != THR_IDLE) break; mythread_cond_wait(&thr->cond, &thr->mutex); } } size_t out_pos = 0; assert(state != THR_IDLE); assert(state != THR_STOP); if (state <= THR_FINISH) state = worker_encode(thr, &out_pos, state); if (state == THR_EXIT) break; // Mark the thread as idle unless the main thread has // told us to exit. Signal is needed for the case // where the main thread is waiting for the threads to stop. mythread_sync(thr->mutex) { if (thr->state != THR_EXIT) { thr->state = THR_IDLE; mythread_cond_signal(&thr->cond); } } mythread_sync(thr->coder->mutex) { // If no errors occurred, make the encoded data // available to be copied out. if (state == THR_FINISH) { thr->outbuf->pos = out_pos; thr->outbuf->finished = true; } // Update the main progress info. thr->coder->progress_in += thr->outbuf->uncompressed_size; thr->coder->progress_out += out_pos; thr->progress_in = 0; thr->progress_out = 0; // Return this thread to the stack of free threads. thr->next = thr->coder->threads_free; thr->coder->threads_free = thr; mythread_cond_signal(&thr->coder->cond); } } // Exiting, free the resources. lzma_filters_free(thr->filters, thr->allocator); mythread_mutex_destroy(&thr->mutex); mythread_cond_destroy(&thr->cond); lzma_next_end(&thr->block_encoder, thr->allocator); lzma_free(thr->in, thr->allocator); return MYTHREAD_RET_VALUE; } /// Make the threads stop but not exit. Optionally wait for them to stop. static void threads_stop(lzma_stream_coder *coder, bool wait_for_threads) { // Tell the threads to stop. for (uint32_t i = 0; i < coder->threads_initialized; ++i) { mythread_sync(coder->threads[i].mutex) { coder->threads[i].state = THR_STOP; mythread_cond_signal(&coder->threads[i].cond); } } if (!wait_for_threads) return; // Wait for the threads to settle in the idle state. for (uint32_t i = 0; i < coder->threads_initialized; ++i) { mythread_sync(coder->threads[i].mutex) { while (coder->threads[i].state != THR_IDLE) mythread_cond_wait(&coder->threads[i].cond, &coder->threads[i].mutex); } } return; } /// Stop the threads and free the resources associated with them. /// Wait until the threads have exited. static void threads_end(lzma_stream_coder *coder, const lzma_allocator *allocator) { for (uint32_t i = 0; i < coder->threads_initialized; ++i) { mythread_sync(coder->threads[i].mutex) { coder->threads[i].state = THR_EXIT; mythread_cond_signal(&coder->threads[i].cond); } } for (uint32_t i = 0; i < coder->threads_initialized; ++i) { int ret = mythread_join(coder->threads[i].thread_id); assert(ret == 0); (void)ret; } lzma_free(coder->threads, allocator); return; } /// Initialize a new worker_thread structure and create a new thread. static lzma_ret initialize_new_thread(lzma_stream_coder *coder, const lzma_allocator *allocator) { worker_thread *thr = &coder->threads[coder->threads_initialized]; thr->in = lzma_alloc(coder->block_size, allocator); if (thr->in == NULL) return LZMA_MEM_ERROR; if (mythread_mutex_init(&thr->mutex)) goto error_mutex; if (mythread_cond_init(&thr->cond)) goto error_cond; thr->state = THR_IDLE; thr->allocator = allocator; thr->coder = coder; thr->progress_in = 0; thr->progress_out = 0; thr->block_encoder = LZMA_NEXT_CODER_INIT; thr->filters[0].id = LZMA_VLI_UNKNOWN; if (mythread_create(&thr->thread_id, &worker_start, thr)) goto error_thread; ++coder->threads_initialized; coder->thr = thr; return LZMA_OK; error_thread: mythread_cond_destroy(&thr->cond); error_cond: mythread_mutex_destroy(&thr->mutex); error_mutex: lzma_free(thr->in, allocator); return LZMA_MEM_ERROR; } static lzma_ret get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator) { // If there are no free output subqueues, there is no // point to try getting a thread. if (!lzma_outq_has_buf(&coder->outq)) return LZMA_OK; // That's also true if we cannot allocate memory for the output // buffer in the output queue. return_if_error(lzma_outq_prealloc_buf(&coder->outq, allocator, coder->outbuf_alloc_size)); // Make a thread-specific copy of the filter chain. Put it in // the cache array first so that if we cannot get a new thread yet, // the allocation is ready when we try again. if (coder->filters_cache[0].id == LZMA_VLI_UNKNOWN) return_if_error(lzma_filters_copy( coder->filters, coder->filters_cache, allocator)); // If there is a free structure on the stack, use it. mythread_sync(coder->mutex) { if (coder->threads_free != NULL) { coder->thr = coder->threads_free; coder->threads_free = coder->threads_free->next; } } if (coder->thr == NULL) { // If there are no uninitialized structures left, return. if (coder->threads_initialized == coder->threads_max) return LZMA_OK; // Initialize a new thread. return_if_error(initialize_new_thread(coder, allocator)); } // Reset the parts of the thread state that have to be done // in the main thread. mythread_sync(coder->thr->mutex) { coder->thr->state = THR_RUN; coder->thr->in_size = 0; coder->thr->outbuf = lzma_outq_get_buf(&coder->outq, NULL); // Free the old thread-specific filter options and replace // them with the already-allocated new options from // coder->filters_cache[]. Then mark the cache as empty. lzma_filters_free(coder->thr->filters, allocator); memcpy(coder->thr->filters, coder->filters_cache, sizeof(coder->filters_cache)); coder->filters_cache[0].id = LZMA_VLI_UNKNOWN; mythread_cond_signal(&coder->thr->cond); } return LZMA_OK; } static lzma_ret stream_encode_in(lzma_stream_coder *coder, const lzma_allocator *allocator, const uint8_t *restrict in, size_t *restrict in_pos, size_t in_size, lzma_action action) { while (*in_pos < in_size || (coder->thr != NULL && action != LZMA_RUN)) { if (coder->thr == NULL) { // Get a new thread. const lzma_ret ret = get_thread(coder, allocator); if (coder->thr == NULL) return ret; } // Copy the input data to thread's buffer. size_t thr_in_size = coder->thr->in_size; lzma_bufcpy(in, in_pos, in_size, coder->thr->in, &thr_in_size, coder->block_size); // Tell the Block encoder to finish if // - it has got block_size bytes of input; or // - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH, // or LZMA_FULL_BARRIER was used. // // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER. const bool finish = thr_in_size == coder->block_size || (*in_pos == in_size && action != LZMA_RUN); bool block_error = false; mythread_sync(coder->thr->mutex) { if (coder->thr->state == THR_IDLE) { // Something has gone wrong with the Block // encoder. It has set coder->thread_error // which we will read a few lines later. block_error = true; } else { // Tell the Block encoder its new amount // of input and update the state if needed. coder->thr->in_size = thr_in_size; if (finish) coder->thr->state = THR_FINISH; mythread_cond_signal(&coder->thr->cond); } } if (block_error) { lzma_ret ret; mythread_sync(coder->mutex) { ret = coder->thread_error; } return ret; } if (finish) coder->thr = NULL; } return LZMA_OK; } /// Wait until more input can be consumed, more output can be read, or /// an optional timeout is reached. static bool wait_for_work(lzma_stream_coder *coder, mythread_condtime *wait_abs, bool *has_blocked, bool has_input) { if (coder->timeout != 0 && !*has_blocked) { // Every time when stream_encode_mt() is called via // lzma_code(), *has_blocked starts as false. We set it // to true here and calculate the absolute time when // we must return if there's nothing to do. // // This way if we block multiple times for short moments // less than "timeout" milliseconds, we will return once // "timeout" amount of time has passed since the *first* // blocking occurred. If the absolute time was calculated // again every time we block, "timeout" would effectively // be meaningless if we never consecutively block longer // than "timeout" ms. *has_blocked = true; mythread_condtime_set(wait_abs, &coder->cond, coder->timeout); } bool timed_out = false; mythread_sync(coder->mutex) { // There are four things that we wait. If one of them // becomes possible, we return. // - If there is input left, we need to get a free // worker thread and an output buffer for it. // - Data ready to be read from the output queue. // - A worker thread indicates an error. // - Time out occurs. while ((!has_input || coder->threads_free == NULL || !lzma_outq_has_buf(&coder->outq)) && !lzma_outq_is_readable(&coder->outq) && coder->thread_error == LZMA_OK && !timed_out) { if (coder->timeout != 0) timed_out = mythread_cond_timedwait( &coder->cond, &coder->mutex, wait_abs) != 0; else mythread_cond_wait(&coder->cond, &coder->mutex); } } return timed_out; } static lzma_ret stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator, const uint8_t *restrict in, size_t *restrict in_pos, size_t in_size, uint8_t *restrict out, size_t *restrict out_pos, size_t out_size, lzma_action action) { lzma_stream_coder *coder = coder_ptr; switch (coder->sequence) { case SEQ_STREAM_HEADER: lzma_bufcpy(coder->header, &coder->header_pos, sizeof(coder->header), out, out_pos, out_size); if (coder->header_pos < sizeof(coder->header)) return LZMA_OK; coder->header_pos = 0; coder->sequence = SEQ_BLOCK; // Fall through case SEQ_BLOCK: { // Initialized to silence warnings. lzma_vli unpadded_size = 0; lzma_vli uncompressed_size = 0; lzma_ret ret = LZMA_OK; // These are for wait_for_work(). bool has_blocked = false; mythread_condtime wait_abs; while (true) { mythread_sync(coder->mutex) { // Check for Block encoder errors. ret = coder->thread_error; if (ret != LZMA_OK) { assert(ret != LZMA_STREAM_END); break; // Break out of mythread_sync. } // Try to read compressed data to out[]. ret = lzma_outq_read(&coder->outq, allocator, out, out_pos, out_size, &unpadded_size, &uncompressed_size); } if (ret == LZMA_STREAM_END) { // End of Block. Add it to the Index. ret = lzma_index_append(coder->index, allocator, unpadded_size, uncompressed_size); if (ret != LZMA_OK) { threads_stop(coder, false); return ret; } // If we didn't fill the output buffer yet, // try to read more data. Maybe the next // outbuf has been finished already too. if (*out_pos < out_size) continue; } if (ret != LZMA_OK) { // coder->thread_error was set. threads_stop(coder, false); return ret; } // Try to give uncompressed data to a worker thread. ret = stream_encode_in(coder, allocator, in, in_pos, in_size, action); if (ret != LZMA_OK) { threads_stop(coder, false); return ret; } // See if we should wait or return. // // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER. if (*in_pos == in_size) { // LZMA_RUN: More data is probably coming // so return to let the caller fill the // input buffer. if (action == LZMA_RUN) return LZMA_OK; // LZMA_FULL_BARRIER: The same as with // LZMA_RUN but tell the caller that the // barrier was completed. if (action == LZMA_FULL_BARRIER) return LZMA_STREAM_END; // Finishing or flushing isn't completed until // all input data has been encoded and copied // to the output buffer. if (lzma_outq_is_empty(&coder->outq)) { // LZMA_FINISH: Continue to encode // the Index field. if (action == LZMA_FINISH) break; // LZMA_FULL_FLUSH: Return to tell // the caller that flushing was // completed. if (action == LZMA_FULL_FLUSH) return LZMA_STREAM_END; } } // Return if there is no output space left. // This check must be done after testing the input // buffer, because we might want to use a different // return code. if (*out_pos == out_size) return LZMA_OK; // Neither in nor out has been used completely. // Wait until there's something we can do. if (wait_for_work(coder, &wait_abs, &has_blocked, *in_pos < in_size)) return LZMA_TIMED_OUT; } // All Blocks have been encoded and the threads have stopped. // Prepare to encode the Index field. return_if_error(lzma_index_encoder_init( &coder->index_encoder, allocator, coder->index)); coder->sequence = SEQ_INDEX; // Update the progress info to take the Index and // Stream Footer into account. Those are very fast to encode // so in terms of progress information they can be thought // to be ready to be copied out. coder->progress_out += lzma_index_size(coder->index) + LZMA_STREAM_HEADER_SIZE; } // Fall through case SEQ_INDEX: { // Call the Index encoder. It doesn't take any input, so // those pointers can be NULL. const lzma_ret ret = coder->index_encoder.code( coder->index_encoder.coder, allocator, NULL, NULL, 0, out, out_pos, out_size, LZMA_RUN); if (ret != LZMA_STREAM_END) return ret; // Encode the Stream Footer into coder->buffer. coder->stream_flags.backward_size = lzma_index_size(coder->index); if (lzma_stream_footer_encode(&coder->stream_flags, coder->header) != LZMA_OK) return LZMA_PROG_ERROR; coder->sequence = SEQ_STREAM_FOOTER; } // Fall through case SEQ_STREAM_FOOTER: lzma_bufcpy(coder->header, &coder->header_pos, sizeof(coder->header), out, out_pos, out_size); return coder->header_pos < sizeof(coder->header) ? LZMA_OK : LZMA_STREAM_END; } assert(0); return LZMA_PROG_ERROR; } static void stream_encoder_mt_end(void *coder_ptr, const lzma_allocator *allocator) { lzma_stream_coder *coder = coder_ptr; // Threads must be killed before the output queue can be freed. threads_end(coder, allocator); lzma_outq_end(&coder->outq, allocator); lzma_filters_free(coder->filters, allocator); lzma_filters_free(coder->filters_cache, allocator); lzma_next_end(&coder->index_encoder, allocator); lzma_index_end(coder->index, allocator); mythread_cond_destroy(&coder->cond); mythread_mutex_destroy(&coder->mutex); lzma_free(coder, allocator); return; } static lzma_ret stream_encoder_mt_update(void *coder_ptr, const lzma_allocator *allocator, const lzma_filter *filters, const lzma_filter *reversed_filters lzma_attribute((__unused__))) { lzma_stream_coder *coder = coder_ptr; // Applications shouldn't attempt to change the options when // we are already encoding the Index or Stream Footer. if (coder->sequence > SEQ_BLOCK) return LZMA_PROG_ERROR; // For now the threaded encoder doesn't support changing // the options in the middle of a Block. if (coder->thr != NULL) return LZMA_PROG_ERROR; // Check if the filter chain seems mostly valid. See the comment // in stream_encoder_mt_init(). if (lzma_raw_encoder_memusage(filters) == UINT64_MAX) return LZMA_OPTIONS_ERROR; // Make a copy to a temporary buffer first. This way the encoder // state stays unchanged if an error occurs in lzma_filters_copy(). lzma_filter temp[LZMA_FILTERS_MAX + 1]; return_if_error(lzma_filters_copy(filters, temp, allocator)); // Free the options of the old chain as well as the cache. lzma_filters_free(coder->filters, allocator); lzma_filters_free(coder->filters_cache, allocator); // Copy the new filter chain in place. memcpy(coder->filters, temp, sizeof(temp)); return LZMA_OK; } /// Options handling for lzma_stream_encoder_mt_init() and /// lzma_stream_encoder_mt_memusage() static lzma_ret get_options(const lzma_mt *options, lzma_options_easy *opt_easy, const lzma_filter **filters, uint64_t *block_size, uint64_t *outbuf_size_max) { // Validate some of the options. if (options == NULL) return LZMA_PROG_ERROR; if (options->flags != 0 || options->threads == 0 || options->threads > LZMA_THREADS_MAX) return LZMA_OPTIONS_ERROR; if (options->filters != NULL) { // Filter chain was given, use it as is. *filters = options->filters; } else { // Use a preset. if (lzma_easy_preset(opt_easy, options->preset)) return LZMA_OPTIONS_ERROR; *filters = opt_easy->filters; } // Block size if (options->block_size > 0) { if (options->block_size > BLOCK_SIZE_MAX) return LZMA_OPTIONS_ERROR; *block_size = options->block_size; } else { // Determine the Block size from the filter chain. *block_size = lzma_mt_block_size(*filters); if (*block_size == 0) return LZMA_OPTIONS_ERROR; assert(*block_size <= BLOCK_SIZE_MAX); } // Calculate the maximum amount output that a single output buffer // may need to hold. This is the same as the maximum total size of // a Block. *outbuf_size_max = lzma_block_buffer_bound64(*block_size); if (*outbuf_size_max == 0) return LZMA_MEM_ERROR; return LZMA_OK; } static void get_progress(void *coder_ptr, uint64_t *progress_in, uint64_t *progress_out) { lzma_stream_coder *coder = coder_ptr; // Lock coder->mutex to prevent finishing threads from moving their // progress info from the worker_thread structure to lzma_stream_coder. mythread_sync(coder->mutex) { *progress_in = coder->progress_in; *progress_out = coder->progress_out; for (size_t i = 0; i < coder->threads_initialized; ++i) { mythread_sync(coder->threads[i].mutex) { *progress_in += coder->threads[i].progress_in; *progress_out += coder->threads[i] .progress_out; } } } return; } static lzma_ret stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator, const lzma_mt *options) { lzma_next_coder_init(&stream_encoder_mt_init, next, allocator); // Get the filter chain. lzma_options_easy easy; const lzma_filter *filters; uint64_t block_size; uint64_t outbuf_size_max; return_if_error(get_options(options, &easy, &filters, &block_size, &outbuf_size_max)); #if SIZE_MAX < UINT64_MAX if (block_size > SIZE_MAX || outbuf_size_max > SIZE_MAX) return LZMA_MEM_ERROR; #endif // Validate the filter chain so that we can give an error in this // function instead of delaying it to the first call to lzma_code(). // The memory usage calculation verifies the filter chain as // a side effect so we take advantage of that. It's not a perfect // check though as raw encoder allows LZMA1 too but such problems // will be caught eventually with Block Header encoder. if (lzma_raw_encoder_memusage(filters) == UINT64_MAX) return LZMA_OPTIONS_ERROR; // Validate the Check ID. if ((unsigned int)(options->check) > LZMA_CHECK_ID_MAX) return LZMA_PROG_ERROR; if (!lzma_check_is_supported(options->check)) return LZMA_UNSUPPORTED_CHECK; // Allocate and initialize the base structure if needed. lzma_stream_coder *coder = next->coder; if (coder == NULL) { coder = lzma_alloc(sizeof(lzma_stream_coder), allocator); if (coder == NULL) return LZMA_MEM_ERROR; next->coder = coder; // For the mutex and condition variable initializations // the error handling has to be done here because // stream_encoder_mt_end() doesn't know if they have // already been initialized or not. if (mythread_mutex_init(&coder->mutex)) { lzma_free(coder, allocator); next->coder = NULL; return LZMA_MEM_ERROR; } if (mythread_cond_init(&coder->cond)) { mythread_mutex_destroy(&coder->mutex); lzma_free(coder, allocator); next->coder = NULL; return LZMA_MEM_ERROR; } next->code = &stream_encode_mt; next->end = &stream_encoder_mt_end; next->get_progress = &get_progress; next->update = &stream_encoder_mt_update; coder->filters[0].id = LZMA_VLI_UNKNOWN; coder->filters_cache[0].id = LZMA_VLI_UNKNOWN; coder->index_encoder = LZMA_NEXT_CODER_INIT; coder->index = NULL; memzero(&coder->outq, sizeof(coder->outq)); coder->threads = NULL; coder->threads_max = 0; coder->threads_initialized = 0; } // Basic initializations coder->sequence = SEQ_STREAM_HEADER; coder->block_size = (size_t)(block_size); coder->outbuf_alloc_size = (size_t)(outbuf_size_max); coder->thread_error = LZMA_OK; coder->thr = NULL; // Allocate the thread-specific base structures. assert(options->threads > 0); if (coder->threads_max != options->threads) { threads_end(coder, allocator); coder->threads = NULL; coder->threads_max = 0; coder->threads_initialized = 0; coder->threads_free = NULL; coder->threads = lzma_alloc( options->threads * sizeof(worker_thread), allocator); if (coder->threads == NULL) return LZMA_MEM_ERROR; coder->threads_max = options->threads; } else { // Reuse the old structures and threads. Tell the running // threads to stop and wait until they have stopped. threads_stop(coder, true); } // Output queue return_if_error(lzma_outq_init(&coder->outq, allocator, options->threads)); // Timeout coder->timeout = options->timeout; // Free the old filter chain and the cache. lzma_filters_free(coder->filters, allocator); lzma_filters_free(coder->filters_cache, allocator); // Copy the new filter chain. return_if_error(lzma_filters_copy( filters, coder->filters, allocator)); // Index lzma_index_end(coder->index, allocator); coder->index = lzma_index_init(allocator); if (coder->index == NULL) return LZMA_MEM_ERROR; // Stream Header coder->stream_flags.version = 0; coder->stream_flags.check = options->check; return_if_error(lzma_stream_header_encode( &coder->stream_flags, coder->header)); coder->header_pos = 0; // Progress info coder->progress_in = 0; coder->progress_out = LZMA_STREAM_HEADER_SIZE; return LZMA_OK; } #ifdef HAVE_SYMBOL_VERSIONS_LINUX // These are for compatibility with binaries linked against liblzma that // has been patched with xz-5.2.2-compat-libs.patch from RHEL/CentOS 7. // Actually that patch didn't create lzma_stream_encoder_mt@XZ_5.2.2 // but it has been added here anyway since someone might misread the // RHEL patch and think both @XZ_5.1.2alpha and @XZ_5.2.2 exist. LZMA_SYMVER_API("lzma_stream_encoder_mt@XZ_5.1.2alpha", lzma_ret, lzma_stream_encoder_mt_512a)( lzma_stream *strm, const lzma_mt *options) lzma_nothrow lzma_attr_warn_unused_result __attribute__((__alias__("lzma_stream_encoder_mt_52"))); LZMA_SYMVER_API("lzma_stream_encoder_mt@XZ_5.2.2", lzma_ret, lzma_stream_encoder_mt_522)( lzma_stream *strm, const lzma_mt *options) lzma_nothrow lzma_attr_warn_unused_result __attribute__((__alias__("lzma_stream_encoder_mt_52"))); LZMA_SYMVER_API("lzma_stream_encoder_mt@@XZ_5.2", lzma_ret, lzma_stream_encoder_mt_52)( lzma_stream *strm, const lzma_mt *options) lzma_nothrow lzma_attr_warn_unused_result; #define lzma_stream_encoder_mt lzma_stream_encoder_mt_52 #endif extern LZMA_API(lzma_ret) lzma_stream_encoder_mt(lzma_stream *strm, const lzma_mt *options) { lzma_next_strm_init(stream_encoder_mt_init, strm, options); strm->internal->supported_actions[LZMA_RUN] = true; // strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true; strm->internal->supported_actions[LZMA_FULL_FLUSH] = true; strm->internal->supported_actions[LZMA_FULL_BARRIER] = true; strm->internal->supported_actions[LZMA_FINISH] = true; return LZMA_OK; } #ifdef HAVE_SYMBOL_VERSIONS_LINUX LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@XZ_5.1.2alpha", uint64_t, lzma_stream_encoder_mt_memusage_512a)( const lzma_mt *options) lzma_nothrow lzma_attr_pure __attribute__((__alias__("lzma_stream_encoder_mt_memusage_52"))); LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@XZ_5.2.2", uint64_t, lzma_stream_encoder_mt_memusage_522)( const lzma_mt *options) lzma_nothrow lzma_attr_pure __attribute__((__alias__("lzma_stream_encoder_mt_memusage_52"))); LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@@XZ_5.2", uint64_t, lzma_stream_encoder_mt_memusage_52)( const lzma_mt *options) lzma_nothrow lzma_attr_pure; #define lzma_stream_encoder_mt_memusage lzma_stream_encoder_mt_memusage_52 #endif // This function name is a monster but it's consistent with the older // monster names. :-( 31 chars is the max that C99 requires so in that // sense it's not too long. ;-) extern LZMA_API(uint64_t) lzma_stream_encoder_mt_memusage(const lzma_mt *options) { lzma_options_easy easy; const lzma_filter *filters; uint64_t block_size; uint64_t outbuf_size_max; if (get_options(options, &easy, &filters, &block_size, &outbuf_size_max) != LZMA_OK) return UINT64_MAX; // Memory usage of the input buffers const uint64_t inbuf_memusage = options->threads * block_size; // Memory usage of the filter encoders uint64_t filters_memusage = lzma_raw_encoder_memusage(filters); if (filters_memusage == UINT64_MAX) return UINT64_MAX; filters_memusage *= options->threads; // Memory usage of the output queue const uint64_t outq_memusage = lzma_outq_memusage( outbuf_size_max, options->threads); if (outq_memusage == UINT64_MAX) return UINT64_MAX; // Sum them with overflow checking. uint64_t total_memusage = LZMA_MEMUSAGE_BASE + sizeof(lzma_stream_coder) + options->threads * sizeof(worker_thread); if (UINT64_MAX - total_memusage < inbuf_memusage) return UINT64_MAX; total_memusage += inbuf_memusage; if (UINT64_MAX - total_memusage < filters_memusage) return UINT64_MAX; total_memusage += filters_memusage; if (UINT64_MAX - total_memusage < outq_memusage) return UINT64_MAX; return total_memusage + outq_memusage; }