refactor: Optimized the implementation of thread pool

fix: progress bar: wrong index
This commit is contained in:
QingQiz 2020-07-25 22:47:30 +08:00
parent c83ca1fc07
commit 924ef8b269

View File

@ -517,60 +517,35 @@ def parallel_in_process(target, params_list, **kwargs):
return Pool(job).map(lambda_to_function, params_list)
def parallel_in_thread(target, params_list, sort, **kwargs):
import queue
def parallel_in_thread(target, params_list, **kwargs):
from threading import Thread
tasks = queue.Queue()
result = queue.Queue()
length = len(params_list)
step = (length + job - 1) // job
for i in (params_list if not sort else enumerate(params_list)):
tasks.put(i)
result = [None for i in range(length)]
def action_sort():
while not tasks.empty():
idx, params = tasks.get(block=False)
def do_with_retry(n, act, args):
while n >= 0:
try:
result.put((idx, target(*params)))
except Exception as e:
logging.warn(str(e))
tasks.put((idx, params))
return act(*args)
except:
n -= 1
def action_not_sort():
while not tasks.empty():
try:
params = tasks.get(block=False)
result.put(target(*params))
except Exception as e:
logging.warn(str(e))
tasks.put(params)
def action(j):
for i in range(step * j, min(length, step * (j + 1))):
params = params_list[i]
result[i] = (do_with_retry(3, target, params))
def f():
try:
if sort:
action_sort()
else:
action_not_sort()
except queue.Empty:
return
pool = [Thread(target=f) for i in range(job)]
pool = [Thread(target=action, args=(i,)) for i in range(job)]
[i.start() for i in pool]
[i.join() for i in pool]
ret = [result.get() for i in range(len(params_list))]
if not ret:
return []
if sort:
ret = sorted(ret)
return list(zip(*ret))[1]
return ret
return result
def parallel_run(target, params_list, sort_result=False, use_thread=True, **kwargs):
def parallel_run(target, params_list, use_thread=True, **kwargs):
if len(params_list) == 0:
return []
@ -578,7 +553,7 @@ def parallel_run(target, params_list, sort_result=False, use_thread=True, **kwar
return [target(*i) for i in params_list]
if use_thread:
res = parallel_in_thread(target, params_list, sort_result, **kwargs)
res = parallel_in_thread(target, params_list, **kwargs)
else:
res = parallel_in_process(target, params_list, **kwargs)
return res
@ -1078,6 +1053,7 @@ def download_urls(
else:
print('Downloading %s ...' % tr(output_filename))
bar.update()
bar.update_piece(0)
params = [[i, url] for i, url in enumerate(urls)]
@ -1085,11 +1061,11 @@ def download_urls(
output_filename_i = get_output_filename(urls, title, ext, output_dir, merge, part=i)
output_filepath_i = os.path.join(output_dir, output_filename_i)
# print 'Downloading %s [%s/%s]...' % (tr(filename), i + 1, len(urls))
bar.update_piece()
url_save(
url, output_filepath_i, bar, refer=refer, is_part=True, faker=faker,
headers=headers, **kwargs
)
bar.update_piece()
return output_filepath_i
parts = parallel_run(action, params, True)