feat: parallel run support for urls_size

QingQiz 2020-07-21 13:05:47 +08:00
parent 5da4245ed0
commit e0c7476338


@@ -507,6 +507,71 @@ def post_content(url, headers={}, post_data={}, decoded=True, **kwargs):
     return data


+def parallel_in_process(target, params_list, **kwargs):
+    from multiprocessing import Pool
+
+    # Bind the worker to a module-level name so multiprocessing can pickle it.
+    global lambda_to_function
+    def lambda_to_function(params):
+        return target(*params)
+
+    return Pool(job).map(lambda_to_function, params_list)
+
+
+def parallel_in_thread(target, params_list, sort, **kwargs):
+    import queue
+    from threading import Thread
+
+    tasks = queue.Queue()
+    result = queue.Queue()
+
+    # With sort=True, tag each task with its index so results can be reordered.
+    for i in (params_list if not sort else enumerate(params_list)):
+        tasks.put(i)
+
+    def action_sort():
+        while not tasks.empty():
+            idx, params = tasks.get(block=False)
+            result.put((idx, target(*params)))
+
+    def action_not_sort():
+        while not tasks.empty():
+            params = tasks.get(block=False)
+            result.put(target(*params))
+
+    def f():
+        try:
+            if sort:
+                action_sort()
+            else:
+                action_not_sort()
+        except queue.Empty:
+            # Another worker drained the queue between empty() and get().
+            return
+
+    pool = [Thread(target=f) for i in range(job)]
+    [i.start() for i in pool]
+
+    # result.get() blocks, so collecting len(params_list) values also
+    # waits for every worker thread to finish its tasks.
+    ret = [result.get() for i in range(len(params_list))]
+
+    if sort:
+        ret = sorted(ret)
+        return list(zip(*ret))[1]
+    else:
+        return ret
+
+
+def parallel_run(target, params_list, sort_result=False, use_thread=True, **kwargs):
+    # With a single job there is nothing to parallelize; run sequentially.
+    if job and job <= 1:
+        return [target(*i) for i in params_list]
+
+    if use_thread:
+        res = parallel_in_thread(target, params_list, sort_result, **kwargs)
+    else:
+        res = parallel_in_process(target, params_list, **kwargs)
+    return res
+
+
 def url_size(url, faker=False, headers={}):
     if faker:
         response = urlopen_with_retry(
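For illustration, here is a minimal standalone sketch of how the new helper behaves. It is not part of the commit: it assumes the three functions above are in scope in one module and sets the module-level job global by hand (normally filled in from the new -j/--job option):

    job = 4  # stands in for the value parsed from -j/--job

    def add(a, b):
        return a + b

    params = [(i, i * 2) for i in range(10)]
    # sort_result=True tags each task with its index and restores input order
    print(parallel_run(add, params, sort_result=True))
    # -> (0, 3, 6, 9, 12, 15, 18, 21, 24, 27)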
@@ -522,7 +587,8 @@ def url_size(url, faker=False, headers={}):


 def urls_size(urls, faker=False, headers={}):
-    return sum([url_size(url, faker=faker, headers=headers) for url in urls])
+    params = [[url, faker, headers] for url in urls]
+    return sum(parallel_run(url_size, params))


 def get_head(url, headers=None, get_method='HEAD'):
@@ -1550,6 +1616,10 @@ def script_main(download, download_playlist, **kwargs):
         '-k', '--insecure', action='store_true', default=False,
         help='ignore ssl errors'
     )
+    download_grp.add_argument(
+        '-j', '--job', type=int, default=1,
+        help='maximum number of threads/processes to use (default: 1)'
+    )

     proxy_grp = parser.add_argument_group('Proxy options')
     proxy_grp = proxy_grp.add_mutually_exclusive_group()
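Once the option is wired through to the job global (see the hunks below for that plumbing), a hypothetical invocation would look like:

    you-get -j 8 'https://example.com/video'

where -j 8 lets urls_size probe up to eight URLs at a time.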
@@ -1597,10 +1667,12 @@ def script_main(download, download_playlist, **kwargs):
     global output_filename
     global auto_rename
     global insecure
+    global job

     output_filename = args.output_filename
     extractor_proxy = args.extractor_proxy
     info_only = args.info
+    job = args.job
     if args.force:
         force = True
     if args.skip_existing_file_size_check:
@@ -1669,6 +1741,7 @@ def script_main(download, download_playlist, **kwargs):
             output_dir=args.output_dir, merge=not args.no_merge,
             info_only=info_only, json_output=json_output, caption=caption,
             password=args.password,
+            job=args.job,
             **extra
         )
     except KeyboardInterrupt:
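As a point of comparison (an editor's sketch, not code from this commit), the same fan-out could lean on the standard library's concurrent.futures, which handles the queueing, ordering, and thread lifetime that parallel_in_thread manages by hand; url_size is assumed to be in scope:

    from concurrent.futures import ThreadPoolExecutor

    def urls_size_futures(urls, faker=False, headers={}, job=4):
        # map() preserves input order and blocks until every worker is done
        with ThreadPoolExecutor(max_workers=job) as pool:
            return sum(pool.map(
                lambda url: url_size(url, faker=faker, headers=headers),
                urls))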