# Number of parallel workers used by parallel_run(); overwritten by the
# '-j/--job' command-line option inside script_main().  A module-level
# default keeps library callers (which never run script_main) from hitting
# a NameError on a previously-undefined global.
job = 1


def parallel_in_process(target, params_list, **kwargs):
    """Run target(*params) for every params in params_list in a process pool.

    target must be picklable (i.e. a module-level function).  Results are
    returned in the same order as params_list.

    Args:
        target: callable invoked as target(*params).
        params_list: iterable of argument sequences.

    Returns:
        list of target's return values, in input order.
    """
    from multiprocessing import Pool

    # Pool.starmap already unpacks each argument tuple, so the original
    # "global lambda_to_function" pickling workaround is unnecessary.
    # The context manager guarantees the worker processes are terminated
    # and reaped instead of being leaked.
    with Pool(job) as pool:
        return pool.starmap(target, params_list)


def parallel_in_thread(target, params_list, sort, **kwargs):
    """Run target(*params) for every params in params_list in a thread pool.

    Args:
        target: callable invoked as target(*params).
        params_list: iterable of argument sequences.
        sort: when true, results come back in the order of params_list;
            otherwise in completion order.

    Returns:
        list of target's return values.
    """
    import queue
    from threading import Thread

    tasks = queue.Queue()
    results = queue.Queue()

    # When ordering matters, tag each task with its original index so the
    # results can be re-sorted afterwards.
    for item in (enumerate(params_list) if sort else params_list):
        tasks.put(item)

    def worker():
        # Drain the task queue; queue.Empty is the normal termination
        # signal when two workers race for the last task.
        while True:
            try:
                item = tasks.get(block=False)
            except queue.Empty:
                return
            if sort:
                idx, params = item
                results.put((idx, target(*params)))
            else:
                results.put(target(*item))

    threads = [Thread(target=worker) for _ in range(job)]
    for t in threads:
        t.start()
    # Join the workers instead of blocking on results.get(): if a worker
    # died on an exception, a bare results.get() would hang forever.
    for t in threads:
        t.join()

    collected = []
    while not results.empty():
        collected.append(results.get())

    if sort:
        # Sort by the original index only (key=...) so unorderable result
        # values never get compared, and return [] for empty input instead
        # of the IndexError that list(zip(*[]))[1] used to raise.
        return [value for _, value in sorted(collected, key=lambda p: p[0])]
    return collected


def parallel_run(target, params_list, sort_result=False, use_thread=True, **kwargs):
    """Dispatch target over params_list, serially or in parallel.

    The degree of parallelism is the module-level global 'job' (set by the
    '-j/--job' command-line option).  job <= 1 means a plain serial loop.

    Args:
        target: callable invoked as target(*params).
        params_list: iterable of argument sequences.
        sort_result: keep results in input order (thread mode only; the
            process pool always preserves order).
        use_thread: True for threads (I/O-bound work such as HTTP HEAD
            requests), False for a process pool.

    Returns:
        list of target's return values.
    """
    # Treat an unset/falsy job the same as 1: previously a falsy job fell
    # through to the parallel branch and produced Pool(None)/range(None).
    if not job or job <= 1:
        return [target(*params) for params in params_list]

    if use_thread:
        return parallel_in_thread(target, params_list, sort_result, **kwargs)
    return parallel_in_process(target, params_list, **kwargs)
def urls_size(urls, faker=False, headers=None):
    """Return the total size in bytes of all the given URLs.

    The per-URL size lookups are fanned out through parallel_run(),
    honouring the global 'job' worker count set by the '-j/--job' option.

    Args:
        urls: iterable of URL strings.
        faker: forwarded to url_size() (use fake request headers).
        headers: extra HTTP headers forwarded to url_size().

    Returns:
        int: the sum of the individual content lengths.
    """
    # Use None as the default instead of a shared mutable {} (classic
    # mutable-default-argument pitfall); behavior is unchanged.
    if headers is None:
        headers = {}
    # Positional argument triples matching url_size(url, faker, headers).
    params = [[url, faker, headers] for url in urls]
    return sum(parallel_run(url_size, params))