From 924ef8b26953d45b8f0c42595fd9a9468b4890d1 Mon Sep 17 00:00:00 2001 From: QingQiz Date: Sat, 25 Jul 2020 22:47:30 +0800 Subject: [PATCH] refactor: Optimized the implementation of thread pool fix: progress bar: wrong index --- src/you_get/common.py | 64 ++++++++++++++----------------------------- 1 file changed, 20 insertions(+), 44 deletions(-) diff --git a/src/you_get/common.py b/src/you_get/common.py index f323828e..35b15cca 100755 --- a/src/you_get/common.py +++ b/src/you_get/common.py @@ -517,60 +517,35 @@ def parallel_in_process(target, params_list, **kwargs): return Pool(job).map(lambda_to_function, params_list) -def parallel_in_thread(target, params_list, sort, **kwargs): - import queue +def parallel_in_thread(target, params_list, **kwargs): from threading import Thread - tasks = queue.Queue() - result = queue.Queue() + length = len(params_list) + step = (length + job - 1) // job - for i in (params_list if not sort else enumerate(params_list)): - tasks.put(i) + result = [None for i in range(length)] - def action_sort(): - while not tasks.empty(): - idx, params = tasks.get(block=False) + def do_with_retry(n, act, args): + while n >= 0: try: - result.put((idx, target(*params))) - except Exception as e: - logging.warn(str(e)) - tasks.put((idx, params)) + return act(*args) + except: + n -= 1 - def action_not_sort(): - while not tasks.empty(): - try: - params = tasks.get(block=False) - result.put(target(*params)) - except Exception as e: - logging.warn(str(e)) - tasks.put(params) + def action(j): + for i in range(step * j, min(length, step * (j + 1))): + params = params_list[i] + result[i] = (do_with_retry(3, target, params)) - def f(): - try: - if sort: - action_sort() - else: - action_not_sort() - except queue.Empty: - return - - pool = [Thread(target=f) for i in range(job)] + pool = [Thread(target=action, args=(i,)) for i in range(job)] [i.start() for i in pool] + [i.join() for i in pool] - ret = [result.get() for i in range(len(params_list))] - - if not ret: - return [] - - if sort: - ret = sorted(ret) - return list(zip(*ret))[1] - - return ret + return result -def parallel_run(target, params_list, sort_result=False, use_thread=True, **kwargs): +def parallel_run(target, params_list, use_thread=True, **kwargs): if len(params_list) == 0: return [] @@ -578,7 +553,7 @@ def parallel_run(target, params_list, sort_result=False, use_thread=True, **kwar return [target(*i) for i in params_list] if use_thread: - res = parallel_in_thread(target, params_list, sort_result, **kwargs) + res = parallel_in_thread(target, params_list, **kwargs) else: res = parallel_in_process(target, params_list, **kwargs) return res @@ -1078,6 +1053,7 @@ def download_urls( else: print('Downloading %s ...' % tr(output_filename)) bar.update() + bar.update_piece(0) params = [[i, url] for i, url in enumerate(urls)] @@ -1085,11 +1061,11 @@ def download_urls( output_filename_i = get_output_filename(urls, title, ext, output_dir, merge, part=i) output_filepath_i = os.path.join(output_dir, output_filename_i) # print 'Downloading %s [%s/%s]...' % (tr(filename), i + 1, len(urls)) - bar.update_piece() url_save( url, output_filepath_i, bar, refer=refer, is_part=True, faker=faker, headers=headers, **kwargs ) + bar.update_piece() return output_filepath_i parts = parallel_run(action, params, True)