Encoding handling fixes:

1. Introduce strings.safe_chars and strings.safe_print as ways to ensure
that a string is encodable using the specified encoding.  Unsafe characters
are replaced with '?'.  safe_print delegates to print and satisfies
the same interface, so it can be used as a drop-in override for print
in any file.

2. Move get_filename to fs, since that's where it belongs (fs-related
filename handling).  Move appending of ID, part number, and extension
(when applicable) to get_filename, to avoid accidental truncation.

3. Remove common.tr, since the print override supersedes it.

4. Refactor of log module to work with changes (use print with different
files instead of direct writes to stdout, stderr).

5. Modify other files to accommodate the changes (remove calls to tr)

6. Random cleanup I found:

a. Some changes to the implementation of download_urls and
download_urls_chunked (is the latter even used?).
b. sina_download_by_id?
c. ffmpeg_convert_ts_to_mkv tries to convert multiple input files onto
the same output file, overwriting its own output each time?
d. Add @staticmethod decorators (the IDE complains otherwise).

7. Tests for the new encoding handling.
This commit is contained in:
henryptung 2015-01-10 22:13:09 -08:00
parent 1b55b01b04
commit 79fd1255cb
16 changed files with 152 additions and 130 deletions

View File

@ -11,7 +11,8 @@ from urllib import request, parse
from .version import __version__ from .version import __version__
from .util import log from .util import log
from .util.strings import get_filename, unescape_html from .util.strings import unescape_html, safe_print as print
from .util.fs import get_filename
dry_run = False dry_run = False
force = False force = False
@ -27,18 +28,6 @@ fake_headers = {
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:13.0) Gecko/20100101 Firefox/13.0' 'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:13.0) Gecko/20100101 Firefox/13.0'
} }
if sys.stdout.isatty():
default_encoding = sys.stdout.encoding.lower()
else:
default_encoding = locale.getpreferredencoding().lower()
def tr(s):
if default_encoding == 'utf-8':
return s
else:
return s
#return str(s.encode('utf-8'))[2:-1]
# DEPRECATED in favor of match1() # DEPRECATED in favor of match1()
def r1(pattern, text): def r1(pattern, text):
m = re.search(pattern, text) m = re.search(pattern, text)
@ -272,7 +261,7 @@ def url_save(url, filepath, bar, refer = None, is_part = False, faker = False):
if not is_part: if not is_part:
if bar: if bar:
bar.done() bar.done()
print('Skipping %s: file already exists' % tr(os.path.basename(filepath))) print('Skipping %s: file already exists' % os.path.basename(filepath))
else: else:
if bar: if bar:
bar.update_received(file_size) bar.update_received(file_size)
@ -281,7 +270,7 @@ def url_save(url, filepath, bar, refer = None, is_part = False, faker = False):
if not is_part: if not is_part:
if bar: if bar:
bar.done() bar.done()
print('Overwriting %s' % tr(os.path.basename(filepath)), '...') print('Overwriting %s' % os.path.basename(filepath), '...')
elif not os.path.exists(os.path.dirname(filepath)): elif not os.path.exists(os.path.dirname(filepath)):
os.mkdir(os.path.dirname(filepath)) os.mkdir(os.path.dirname(filepath))
@ -348,7 +337,7 @@ def url_save_chunked(url, filepath, bar, refer = None, is_part = False, faker =
if not is_part: if not is_part:
if bar: if bar:
bar.done() bar.done()
print('Skipping %s: file already exists' % tr(os.path.basename(filepath))) print('Skipping %s: file already exists' % os.path.basename(filepath))
else: else:
if bar: if bar:
bar.update_received(os.path.getsize(filepath)) bar.update_received(os.path.getsize(filepath))
@ -357,7 +346,7 @@ def url_save_chunked(url, filepath, bar, refer = None, is_part = False, faker =
if not is_part: if not is_part:
if bar: if bar:
bar.done() bar.done()
print('Overwriting %s' % tr(os.path.basename(filepath)), '...') print('Overwriting %s' % os.path.basename(filepath), '...')
elif not os.path.exists(os.path.dirname(filepath)): elif not os.path.exists(os.path.dirname(filepath)):
os.mkdir(os.path.dirname(filepath)) os.mkdir(os.path.dirname(filepath))
@ -490,13 +479,10 @@ def download_urls(urls, title, ext, total_size, output_dir='.', refer=None, merg
total_size = urls_size(urls) total_size = urls_size(urls)
except: except:
import traceback import traceback
import sys
traceback.print_exc(file = sys.stdout) traceback.print_exc(file = sys.stdout)
pass pass
title = tr(get_filename(title)) filename = get_filename(title, ext)
filename = '%s.%s' % (title, ext)
filepath = os.path.join(output_dir, filename) filepath = os.path.join(output_dir, filename)
if total_size: if total_size:
if not force and os.path.exists(filepath) and os.path.getsize(filepath) >= total_size * 0.9: if not force and os.path.exists(filepath) and os.path.getsize(filepath) >= total_size * 0.9:
@ -507,35 +493,32 @@ def download_urls(urls, title, ext, total_size, output_dir='.', refer=None, merg
else: else:
bar = PiecesProgressBar(total_size, len(urls)) bar = PiecesProgressBar(total_size, len(urls))
print('Downloading %s ...' % filename)
if len(urls) == 1: if len(urls) == 1:
url = urls[0] url = urls[0]
print('Downloading %s ...' % tr(filename))
url_save(url, filepath, bar, refer = refer, faker = faker) url_save(url, filepath, bar, refer = refer, faker = faker)
bar.done() bar.done()
else: else:
parts = [] parts = []
print('Downloading %s.%s ...' % (tr(title), ext))
for i, url in enumerate(urls): for i, url in enumerate(urls):
filename = '%s[%02d].%s' % (title, i, ext) part_filepath = os.path.join(output_dir, get_filename(title, ext, part=i))
filepath = os.path.join(output_dir, filename) parts.append(part_filepath)
parts.append(filepath) #print('Downloading %s [%s/%s]...' % (filename, i + 1, len(urls)))
#print 'Downloading %s [%s/%s]...' % (tr(filename), i + 1, len(urls))
bar.update_piece(i + 1) bar.update_piece(i + 1)
url_save(url, filepath, bar, refer = refer, is_part = True, faker = faker) url_save(url, part_filepath, bar, refer = refer, is_part = True, faker = faker)
bar.done() bar.done()
from .processor import ffmpeg
if not merge: if not merge:
print() print()
return return
if ext in ['flv', 'f4v']: if ext in ['flv', 'f4v']:
try: try:
from .processor.ffmpeg import has_ffmpeg_installed if ffmpeg.has_ffmpeg_installed():
if has_ffmpeg_installed(): ffmpeg.ffmpeg_concat_flv_to_mp4(parts, filepath)
from .processor.ffmpeg import ffmpeg_concat_flv_to_mp4
ffmpeg_concat_flv_to_mp4(parts, os.path.join(output_dir, title + '.mp4'))
else: else:
from .processor.join_flv import concat_flv from .processor.join_flv import concat_flv
concat_flv(parts, os.path.join(output_dir, title + '.flv')) concat_flv(parts, filepath)
except: except:
raise raise
else: else:
@ -544,13 +527,11 @@ def download_urls(urls, title, ext, total_size, output_dir='.', refer=None, merg
elif ext == 'mp4': elif ext == 'mp4':
try: try:
from .processor.ffmpeg import has_ffmpeg_installed if ffmpeg.has_ffmpeg_installed():
if has_ffmpeg_installed(): ffmpeg.ffmpeg_concat_mp4_to_mp4(parts, filepath)
from .processor.ffmpeg import ffmpeg_concat_mp4_to_mp4
ffmpeg_concat_mp4_to_mp4(parts, os.path.join(output_dir, title + '.mp4'))
else: else:
from .processor.join_mp4 import concat_mp4 from .processor.join_mp4 import concat_mp4
concat_mp4(parts, os.path.join(output_dir, title + '.mp4')) concat_mp4(parts, filepath)
except: except:
raise raise
else: else:
@ -574,68 +555,59 @@ def download_urls_chunked(urls, title, ext, total_size, output_dir='.', refer=No
assert ext in ('ts') assert ext in ('ts')
title = tr(get_filename(title)) filename = get_filename(title, '.mkv')
filename = '%s.%s' % (title, 'ts')
filepath = os.path.join(output_dir, filename) filepath = os.path.join(output_dir, filename)
if total_size: if total_size:
if not force and os.path.exists(filepath[:-3] + '.mkv'): if not force and os.path.exists(filepath):
print('Skipping %s: file already exists' % filepath[:-3] + '.mkv') print('Skipping %s: file already exists' % filepath)
print() print()
return return
bar = SimpleProgressBar(total_size, len(urls)) bar = SimpleProgressBar(total_size, len(urls))
else: else:
bar = PiecesProgressBar(total_size, len(urls)) bar = PiecesProgressBar(total_size, len(urls))
print('Downloading %s ...' % filename)
if len(urls) == 1: if len(urls) == 1:
parts = [] temp_filepath = os.path.join(output_dir, get_filename(title, ext))
url = urls[0] url = urls[0]
print('Downloading %s ...' % tr(filename)) url_save_chunked(url, temp_filepath, bar, refer = refer, faker = faker)
filepath = os.path.join(output_dir, filename)
parts.append(filepath)
url_save_chunked(url, filepath, bar, refer = refer, faker = faker)
bar.done() bar.done()
from .processor import ffmpeg
if not merge: if not merge:
print() print()
return return
if ext == 'ts': if ext == 'ts':
from .processor.ffmpeg import has_ffmpeg_installed if ffmpeg.has_ffmpeg_installed():
if has_ffmpeg_installed(): if ffmpeg.ffmpeg_convert_ts_to_mkv(temp_filepath, filepath):
from .processor.ffmpeg import ffmpeg_convert_ts_to_mkv os.remove(temp_filepath)
if ffmpeg_convert_ts_to_mkv(parts, os.path.join(output_dir, title + '.mkv')):
for part in parts:
os.remove(part)
else: else:
os.remove(os.path.join(output_dir, title + '.mkv')) os.remove(filepath)
else: else:
print('No ffmpeg is found. Conversion aborted.') print('No ffmpeg is found. Conversion aborted.')
else: else:
print("Can't convert %s files" % ext) print("Can't convert %s files" % ext)
else: else:
parts = [] parts = []
print('Downloading %s.%s ...' % (tr(title), ext))
for i, url in enumerate(urls): for i, url in enumerate(urls):
filename = '%s[%02d].%s' % (title, i, ext) part_filepath = os.path.join(output_dir, get_filename(title, ext, part=i))
filepath = os.path.join(output_dir, filename) parts.append(part_filepath)
parts.append(filepath) #print('Downloading %s [%s/%s]...' % (filename, i + 1, len(urls)))
#print 'Downloading %s [%s/%s]...' % (tr(filename), i + 1, len(urls))
bar.update_piece(i + 1) bar.update_piece(i + 1)
url_save_chunked(url, filepath, bar, refer = refer, is_part = True, faker = faker) url_save_chunked(url, part_filepath, bar, refer = refer, is_part = True, faker = faker)
bar.done() bar.done()
from .processor import ffmpeg
if not merge: if not merge:
print() print()
return return
if ext == 'ts': if ext == 'ts':
from .processor.ffmpeg import has_ffmpeg_installed if ffmpeg.has_ffmpeg_installed():
if has_ffmpeg_installed(): if ffmpeg.ffmpeg_concat_ts_to_mkv(parts, filepath):
from .processor.ffmpeg import ffmpeg_concat_ts_to_mkv
if ffmpeg_concat_ts_to_mkv(parts, os.path.join(output_dir, title + '.mkv')):
for part in parts: for part in parts:
os.remove(part) os.remove(part)
else: else:
os.remove(os.path.join(output_dir, title + '.mkv')) os.remove(filepath)
else: else:
print('No ffmpeg is found. Merging aborted.') print('No ffmpeg is found. Merging aborted.')
else: else:
@ -717,7 +689,7 @@ def print_info(site_info, title, type, size):
type_info = "Unknown type (%s)" % type type_info = "Unknown type (%s)" % type
print("Video Site:", site_info) print("Video Site:", site_info)
print("Title: ", unescape_html(tr(title))) print("Title: ", unescape_html(title))
print("Type: ", type_info) print("Type: ", type_info)
print("Size: ", round(size / 1048576, 2), "MiB (" + str(size) + " Bytes)") print("Size: ", round(size / 1048576, 2), "MiB (" + str(size) + " Bytes)")
print() print()

View File

@ -2,6 +2,7 @@
from .common import match1, download_urls, parse_host, set_proxy, unset_proxy from .common import match1, download_urls, parse_host, set_proxy, unset_proxy
from .util import log from .util import log
from .util.strings import safe_print as print
class Extractor(): class Extractor():
def __init__(self, *args): def __init__(self, *args):

View File

@ -40,11 +40,11 @@ def acfun_download_by_vid(vid, title=None, output_dir='.', merge=True, info_only
raise NotImplementedError(sourceType) raise NotImplementedError(sourceType)
if not info_only: if not info_only:
title = get_filename(title) filename = get_filename(title, '.cmt.json', id=vid)
try: try:
print('Downloading %s ...\n' % (title + '.cmt.json')) print('Downloading %s ...\n' % filename)
cmt = get_srt_json(vid) cmt = get_srt_json(vid)
with open(os.path.join(output_dir, title + '.cmt.json'), 'w') as x: with open(os.path.join(output_dir, filename), 'w') as x:
x.write(cmt) x.write(cmt)
# print('Downloading %s ...\n' % (title + '.cmt_lock.json')) # print('Downloading %s ...\n' % (title + '.cmt_lock.json'))
# cmt = get_srt_lock_json(danmakuId) # cmt = get_srt_lock_json(danmakuId)

View File

@ -150,7 +150,7 @@ def bilibili_download(url, output_dir='.', merge=True, info_only=False):
bilibili_download_by_cids(cids, title, output_dir=output_dir, merge=merge, info_only=info_only) bilibili_download_by_cids(cids, title, output_dir=output_dir, merge=merge, info_only=info_only)
elif t == 'vid': elif t == 'vid':
sina_download_by_id(id, title, output_dir = output_dir, merge = merge, info_only = info_only) sina_download_by_vid(id, title, output_dir = output_dir, merge = merge, info_only = info_only)
elif t == 'ykid': elif t == 'ykid':
youku_download_by_vid(id, title=title, output_dir = output_dir, merge = merge, info_only = info_only) youku_download_by_vid(id, title=title, output_dir = output_dir, merge = merge, info_only = info_only)
elif t == 'uid': elif t == 'uid':
@ -159,10 +159,10 @@ def bilibili_download(url, output_dir='.', merge=True, info_only=False):
raise NotImplementedError(flashvars) raise NotImplementedError(flashvars)
if not info_only: if not info_only:
title = get_filename(title) filename = get_filename(title, '.cmt.xml', id=id)
print('Downloading %s ...\n' % (title + '.cmt.xml')) print('Downloading %s ...\n' % filename)
xml = get_srt_xml(id) xml = get_srt_xml(id)
with open(os.path.join(output_dir, title + '.cmt.xml'), 'w', encoding='utf-8') as x: with open(os.path.join(output_dir, filename), 'w', encoding='utf-8') as x:
x.write(xml) x.write(xml)
site_info = "bilibili.com" site_info = "bilibili.com"

View File

@ -107,12 +107,12 @@ def download_url_chunked(url, title, ext, size, output_dir = '.', refer = None,
filepath = os.path.join(output_dir, filename) filepath = os.path.join(output_dir, filename)
if not force and os.path.exists(filepath): if not force and os.path.exists(filepath):
print('Skipping %s: file already exists' % tr(filepath)) print('Skipping %s: file already exists' % filepath)
print() print()
return return
bar = DummyProgressBar() bar = DummyProgressBar()
print('Downloading %s ...' % tr(filename)) print('Downloading %s ...' % filename)
url_save_chunked(url, filepath, bar, refer = refer, faker = faker) url_save_chunked(url, filepath, bar, refer = refer, faker = faker)
bar.done() bar.done()

View File

@ -14,12 +14,12 @@ def parse_size(size):
else: else:
return 0 return 0
def dongting_download_lyric(lrc_url, file_name, output_dir): def dongting_download_lyric(lrc_url, basename, sid, output_dir):
j = get_html(lrc_url) j = get_html(lrc_url)
info = json.loads(j) info = json.loads(j)
lrc = j['data']['lrc'] lrc = info['data']['lrc']
filename = get_filename(file_name) filename = get_filename(basename, '.lrc', id=sid)
with open(output_dir + "/" + filename + '.lrc', 'w', encoding='utf-8') as x: with open(os.path.join(output_dir, filename), 'w', encoding='utf-8') as x:
x.write(lrc) x.write(lrc)
def dongting_download_song(sid, output_dir = '.', merge = True, info_only = False): def dongting_download_song(sid, output_dir = '.', merge = True, info_only = False):
@ -35,13 +35,13 @@ def dongting_download_song(sid, output_dir = '.', merge = True, info_only = Fals
print_info(site_info, song_title, ext, size) print_info(site_info, song_title, ext, size)
if not info_only: if not info_only:
file_name = "%s - %s - %s" % (song_title, album_name, artist) basename = "%s - %s - %s" % (song_title, album_name, artist)
download_urls([url], file_name, ext, size, output_dir, merge = merge) download_urls([url], basename, ext, size, output_dir, merge = merge)
lrc_url = ('http://lp.music.ttpod.com/lrc/down?' lrc_url = ('http://lp.music.ttpod.com/lrc/down?'
'lrcid=&artist=%s&title=%s') % ( 'lrcid=&artist=%s&title=%s') % (
parse.quote(artist), parse.quote(song_title)) parse.quote(artist), parse.quote(song_title))
try: try:
dongting_download_lyric(lrc_url, file_name, output_dir) dongting_download_lyric(lrc_url, basename, output_dir)
except: except:
pass pass

View File

@ -27,11 +27,11 @@ def location_dec(str):
out += char out += char
return parse.unquote(out).replace("^", "0") return parse.unquote(out).replace("^", "0")
def xiami_download_lyric(lrc_url, file_name, output_dir): def xiami_download_lyric(lrc_url, basename, sid, output_dir):
lrc = get_html(lrc_url, faker = True) lrc = get_html(lrc_url, faker = True)
filename = get_filename(file_name) filename = get_filename(basename, '.lrc', id=sid)
if len(lrc) > 0: if len(lrc) > 0:
with open(output_dir + "/" + filename + '.lrc', 'w', encoding='utf-8') as x: with open(os.path.join(output_dir, filename), 'w', encoding='utf-8') as x:
x.write(lrc) x.write(lrc)
def xiami_download_pic(pic_url, file_name, output_dir): def xiami_download_pic(pic_url, file_name, output_dir):
@ -61,10 +61,10 @@ def xiami_download_song(sid, output_dir = '.', merge = True, info_only = False):
print_info(site_info, song_title, ext, size) print_info(site_info, song_title, ext, size)
if not info_only: if not info_only:
file_name = "%s - %s - %s" % (song_title, album_name, artist) basename = "%s - %s - %s" % (song_title, album_name, artist)
download_urls([url], file_name, ext, size, output_dir, merge = merge, faker = True) download_urls([url], basename, ext, size, output_dir, merge = merge, faker = True)
try: try:
xiami_download_lyric(lrc_url, file_name, output_dir) xiami_download_lyric(lrc_url, basename, output_dir)
except: except:
pass pass

View File

@ -19,6 +19,7 @@ class Youku(VideoExtractor):
{'id': '3gphd', 'container': '3gp', 'video_profile': '高清3GP'}, {'id': '3gphd', 'container': '3gp', 'video_profile': '高清3GP'},
] ]
@staticmethod
def generate_ep(vid, ep): def generate_ep(vid, ep):
f_code_1 = 'becaf9be' f_code_1 = 'becaf9be'
f_code_2 = 'bf7e5f01' f_code_2 = 'bf7e5f01'
@ -49,9 +50,11 @@ class Youku(VideoExtractor):
new_ep = trans_e(f_code_2, '%s_%s_%s' % (sid, vid, token)) new_ep = trans_e(f_code_2, '%s_%s_%s' % (sid, vid, token))
return base64.b64encode(bytes(new_ep, 'latin')), sid, token return base64.b64encode(bytes(new_ep, 'latin')), sid, token
@staticmethod
def parse_m3u8(m3u8): def parse_m3u8(m3u8):
return re.findall(r'(http://[^?]+)\?ts_start=0', m3u8) return re.findall(r'(http://[^?]+)\?ts_start=0', m3u8)
@staticmethod
def get_vid_from_url(url): def get_vid_from_url(url):
"""Extracts video ID from URL. """Extracts video ID from URL.
""" """
@ -59,6 +62,7 @@ class Youku(VideoExtractor):
match1(url, r'player\.youku\.com/player\.php/sid/([a-zA-Z0-9=]+)/v\.swf') or \ match1(url, r'player\.youku\.com/player\.php/sid/([a-zA-Z0-9=]+)/v\.swf') or \
match1(url, r'loader\.swf\?VideoIDS=([a-zA-Z0-9=]+)') match1(url, r'loader\.swf\?VideoIDS=([a-zA-Z0-9=]+)')
@staticmethod
def get_playlist_id_from_url(url): def get_playlist_id_from_url(url):
"""Extracts playlist ID from URL. """Extracts playlist ID from URL.
""" """

View File

@ -23,13 +23,12 @@ FFMPEG, FFMPEG_VERSION = get_usable_ffmpeg('ffmpeg') or get_usable_ffmpeg('avcon
def has_ffmpeg_installed(): def has_ffmpeg_installed():
return FFMPEG is not None return FFMPEG is not None
def ffmpeg_convert_ts_to_mkv(file, output='output.mkv'):
    """Convert a single TS file to MKV by invoking ffmpeg.

    Args:
        file: Path of the input file.
        output: Path of the file to write; an existing file is overwritten
            because ffmpeg is run with ``-y``.

    Returns:
        True if the input exists and ffmpeg exited with status 0,
        False otherwise.  Callers use this value to decide whether to
        delete the input (success) or the output (failure), so it must
        reflect the real outcome rather than always being None.
    """
    if not os.path.isfile(file):
        return False

    params = [FFMPEG, '-y', '-i', file, output]
    # subprocess.call returns the child's exit status; 0 means success.
    return subprocess.call(params) == 0

View File

@ -2,6 +2,7 @@
import os.path import os.path
import subprocess import subprocess
from ..util.strings import safe_print as print
def get_usable_rtmpdump(cmd): def get_usable_rtmpdump(cmd):
try: try:

View File

@ -1,6 +1,8 @@
#!/usr/bin/env python #!/usr/bin/env python
import platform import platform
import sys
from .strings import safe_chars
def legitimize(text, os=platform.system()): def legitimize(text, os=platform.system()):
"""Converts a string to a valid filename. """Converts a string to a valid filename.
@ -41,5 +43,13 @@ def legitimize(text, os=platform.system()):
if text.startswith("."): if text.startswith("."):
text = text[1:] text = text[1:]
text = text[:82] # Trim to 82 Unicode characters long
return text return text
def get_filename(basename, ext, id=None, part=None, encoding=sys.getfilesystemencoding(), **kwargs):
    """Build a filename from a title, an extension and optional ID / part number.

    Args:
        basename: Title used as the stem of the filename.
        ext: Extension, given with or without a leading dot
            ('mkv' and '.mkv' both yield 'name.mkv').
        id: Optional identifier.  It is appended as ' - <id>' only when
            making ``basename`` encodable changed it.
        part: Optional part index, appended to the stem as '[NN]'.
        encoding: Target encoding; characters it cannot represent are
            replaced via safe_chars().
        **kwargs: Passed through to legitimize().

    Returns:
        The result of legitimize() applied to '<stem>.<ext>'.
    """
    safe_basename = safe_chars(basename, encoding=encoding)

    suffix = ''
    if safe_basename != basename and id is not None:
        suffix = safe_chars(' - %s' % id, encoding=encoding)

    # Keep the stem within 82 characters, but take the room from the title
    # and never from the ID suffix, so the ID is not truncated away.
    safe_basename = safe_basename[:max(82 - len(suffix), 0)] + suffix

    if part is not None:
        safe_basename = '%s[%02d]' % (safe_basename, part)

    # Callers pass both 'ext' and '.ext'; drop one leading dot so the
    # join below never produces 'name..ext'.
    if ext.startswith('.'):
        ext = ext[1:]

    return legitimize('%s.%s' % (safe_basename, ext), **kwargs)

View File

@ -2,6 +2,7 @@
# This file is Python 2 compliant. # This file is Python 2 compliant.
from .. import __name__ as library_name from .. import __name__ as library_name
from .strings import safe_print as print
import os, sys import os, sys
@ -62,15 +63,15 @@ def sprint(text, *colors):
def println(text, *colors): def println(text, *colors):
"""Print text to standard output.""" """Print text to standard output."""
sys.stdout.write(sprint(text, *colors) + "\n") print(sprint(text, *colors), file=sys.stdout)
def print_err(text, *colors): def print_err(text, *colors):
"""Print text to standard error.""" """Print text to standard error."""
sys.stderr.write(sprint(text, *colors) + "\n") print(sprint(text, *colors), file=sys.stderr)
def print_log(text, *colors): def print_log(text, *colors):
"""Print a log message to standard error.""" """Print a log message to standard error."""
sys.stderr.write(sprint("{}: {}".format(library_name, text), *colors) + "\n") print_err("{}: {}".format(library_name, text), *colors)
def i(message): def i(message):
"""Print a normal log message.""" """Print a normal log message."""

View File

@ -1,25 +1,29 @@
try: try:
# py 3.4 # py 3.4
from html import unescape as unescape_html from html import unescape as unescape_html
except ImportError: except ImportError:
import re import re
from html.entities import entitydefs from html.entities import entitydefs
def unescape_html(string): def unescape_html(string):
'''HTML entity decode''' '''HTML entity decode'''
string = re.sub(r'&#[^;]+;', _sharp2uni, string) string = re.sub(r'&#[^;]+;', _sharp2uni, string)
string = re.sub(r'&[^;]+;', lambda m: entitydefs[m.group(0)[1:-1]], string) string = re.sub(r'&[^;]+;', lambda m: entitydefs[m.group(0)[1:-1]], string)
return string return string
def _sharp2uni(m): def _sharp2uni(m):
'''&#...; ==> unicode''' '''&#...; ==> unicode'''
s = m.group(0)[2:].rstrip(';') s = m.group(0)[2:].rstrip(';')
if s.startswith('x'): if s.startswith('x'):
return chr(int('0'+s, 16)) return chr(int('0'+s, 16))
else: else:
return chr(int(s)) return chr(int(s))
from .fs import legitimize import sys
def safe_chars(s, encoding=sys.getdefaultencoding()):
    """Return ``s`` with characters not encodable in ``encoding`` replaced.

    The string is encoded with the 'replace' error handler and decoded
    again, so every unencodable character becomes '?'.

    Args:
        s: The string to sanitise.
        encoding: Target encoding name.  A falsy value (None or '') falls
            back to sys.getdefaultencoding().

    Returns:
        A string that can be encoded with ``encoding`` without errors.
    """
    if not encoding:
        encoding = sys.getdefaultencoding()
    return s.encode(encoding, 'replace').decode(encoding)


def safe_print(*objects, file=None, **kwargs):
    """Drop-in replacement for print() that never fails on unencodable text.

    Each object is converted with str() and passed through safe_chars()
    using the target stream's encoding before being handed to print().

    Args:
        *objects: Values to print.
        file: Target stream.  None (the default, as for the built-in
            print) means sys.stdout looked up at call time, so later
            redirection of sys.stdout is honoured.
        **kwargs: Remaining print() keyword arguments (sep, end, flush).
    """
    if file is None:
        file = sys.stdout
    # In-memory streams such as io.StringIO report encoding=None (or have
    # no such attribute); safe_chars() then uses the default encoding.
    encoding = getattr(file, 'encoding', None)
    safe_strs = [safe_chars(str(obj), encoding=encoding) for obj in objects]
    print(*safe_strs, file=file, **kwargs)

View File

@ -1,11 +0,0 @@
#!/usr/bin/env python
import unittest
from you_get.util.fs import *
class TestUtil(unittest.TestCase):
def test_legitimize(self):
self.assertEqual(legitimize("1*2", os="Linux"), "1*2")
self.assertEqual(legitimize("1*2", os="Darwin"), "1*2")
self.assertEqual(legitimize("1*2", os="Windows"), "1-2")

28
tests/util/test_fs.py Normal file
View File

@ -0,0 +1,28 @@
#!/usr/bin/env python
import unittest
from you_get.util.fs import *
class TestFs(unittest.TestCase):
    """Tests for filename sanitising and assembly in you_get.util.fs."""

    def test_legitimize(self):
        """'*' is kept on Linux and Darwin and becomes '-' on Windows."""
        cases = (
            ("Linux", "1*2"),
            ("Darwin", "1*2"),
            ("Windows", "1-2"),
        )
        for system, expected in cases:
            self.assertEqual(legitimize("1*2", os=system), expected)

    def test_get_filename_simple(self):
        """Stem and extension are joined with a single dot."""
        actual = get_filename('name', 'ext', os='Linux', encoding='utf-8')
        self.assertEqual('name.ext', actual)

    def test_get_filename_parts(self):
        """The part number is rendered as a two-digit bracketed suffix."""
        cases = (
            ('name[02].ext', 'Linux'),
            ('name(02).ext', 'Windows'),
        )
        for expected, system in cases:
            actual = get_filename('name', 'ext', part=2, os=system, encoding='utf-8')
            self.assertEqual(expected, actual)

    def test_get_filename_encoding_error(self):
        """Characters the target encoding cannot represent are replaced."""
        cases = (
            ('name\u20AC.ext', 'Linux', 'utf-8'),
            ('name\u20AC.ext', 'Windows', 'utf-8'),
            ('name?.ext', 'Linux', 'ascii'),
            ('name-.ext', 'Windows', 'ascii'),
        )
        for expected, system, codec in cases:
            actual = get_filename('name\u20AC', 'ext', os=system, encoding=codec)
            self.assertEqual(expected, actual)

    def test_get_filename_id(self):
        """The ID is appended only when the name had to be altered."""
        cases = (
            ('name\u20AC.ext', 'utf-8'),
            ('name? - hi.ext', 'ascii'),
        )
        for expected, codec in cases:
            actual = get_filename('name\u20AC', 'ext', os='Linux', id='hi', encoding=codec)
            self.assertEqual(expected, actual)

View File

@ -0,0 +1,13 @@
#!/usr/bin/env python
import unittest
from you_get.util.strings import *
class TestStrings(unittest.TestCase):
    """Tests for the encoding helpers in you_get.util.strings."""

    def test_safe_chars_simple(self):
        """Strings that are already encodable come back unchanged."""
        for text in ('', 'abc'):
            self.assertEqual(text, safe_chars(text, encoding='utf-8'))

    def test_safe_chars_replace(self):
        """An unencodable character is replaced with '?'."""
        sanitised = safe_chars('a\u20ACc', encoding='ascii')
        self.assertEqual('a?c', sanitised)