[youtube] remove dead code

This commit is contained in:
Mort Yao 2024-06-23 14:43:43 +02:00
parent 567d1059fc
commit b0e6f0cadc
No known key found for this signature in database
GPG Key ID: 07DA00CB78203251

View File

@ -182,204 +182,54 @@ class YouTube(VideoExtractor):
if re.search('\Wlist=', self.url) and not kwargs.get('playlist'):
log.w('This video is from a playlist. (use --playlist to download all videos in the playlist.)')
# Get video info
# 'eurl' is a magic parameter that can bypass age restriction
# full form: 'eurl=https%3A%2F%2Fyoutube.googleapis.com%2Fv%2F{VIDEO_ID}'
#video_info = parse.parse_qs(get_content('https://www.youtube.com/get_video_info?video_id={}&eurl=https%3A%2F%2Fy'.format(self.vid)))
#logging.debug('STATUS: %s' % video_info['status'][0])
video_info = {'status': ['ok'], 'use_cipher_signature': 'True'}
# Extract from video page
logging.debug('Extracting from the video page...')
video_page = get_content('https://www.youtube.com/watch?v=%s' % self.vid)
ytplayer_config = None
if 'status' not in video_info:
log.wtf('[Failed] Unknown status.', exit_code=None)
raise
elif video_info['status'] == ['ok']:
if 'use_cipher_signature' not in video_info or video_info['use_cipher_signature'] == ['False']:
# FIXME: this is basically dead code, use_cipher_signature is always true
self.title = parse.unquote_plus(json.loads(video_info["player_response"][0])["videoDetails"]["title"])
# Parse video page (for DASH)
video_page = get_content('https://www.youtube.com/watch?v=%s' % self.vid)
try:
try:
# Complete ytplayer_config
ytplayer_config = json.loads(re.search('ytplayer.config\s*=\s*([^\n]+?});', video_page).group(1))
try:
jsUrl = re.search('([^"]*/base\.js)"', video_page).group(1)
except:
log.wtf('[Failed] Unable to find base.js on the video page')
# FIXME: do we still need this?
jsUrl = jsUrl.replace('\/', '/') # unescape URL (for age-restricted videos)
self.html5player = 'https://www.youtube.com' + jsUrl
logging.debug('Retrieving the player code...')
self.js = get_content(self.html5player).replace('\n', ' ')
# Workaround: get_video_info returns bad s. Why?
if 'url_encoded_fmt_stream_map' not in ytplayer_config['args']:
stream_list = json.loads(ytplayer_config['args']['player_response'])['streamingData']['formats']
else:
stream_list = ytplayer_config['args']['url_encoded_fmt_stream_map'].split(',')
#stream_list = ytplayer_config['args']['adaptive_fmts'].split(',')
logging.debug('Loading ytInitialPlayerResponse...')
ytInitialPlayerResponse = json.loads(re.search('ytInitialPlayerResponse\s*=\s*([^\n]+?});(\n|</script>)', video_page).group(1))
if 'assets' in ytplayer_config:
self.html5player = 'https://www.youtube.com' + ytplayer_config['assets']['js']
elif re.search('([^"]*/base\.js)"', video_page):
self.html5player = 'https://www.youtube.com' + re.search('([^"]*/base\.js)"', video_page).group(1)
self.html5player = self.html5player.replace('\/', '/') # unescape URL
else:
self.html5player = None
# Get the video title
self.title = ytInitialPlayerResponse["videoDetails"]["title"]
except:
# ytplayer_config = {args:{raw_player_response:ytInitialPlayerResponse}}
try: # FIXME: we should extract ytInitialPlayerResponse more reliably
ytInitialPlayerResponse = json.loads(re.search('ytInitialPlayerResponse\s*=\s*([^\n]+?});</script>', video_page).group(1))
except:
ytInitialPlayerResponse = json.loads(re.search('ytInitialPlayerResponse\s*=\s*([^\n]+?});', video_page).group(1))
stream_list = ytInitialPlayerResponse['streamingData']['formats']
#stream_list = ytInitialPlayerResponse['streamingData']['adaptiveFormats']
if re.search('([^"]*/base\.js)"', video_page):
self.html5player = 'https://www.youtube.com' + re.search('([^"]*/base\.js)"', video_page).group(1)
else:
self.html5player = None
except:
if 'url_encoded_fmt_stream_map' not in video_info:
stream_list = json.loads(video_info['player_response'][0])['streamingData']['formats']
else:
stream_list = video_info['url_encoded_fmt_stream_map'][0].split(',')
if re.search('([^"]*/base\.js)"', video_page):
self.html5player = 'https://www.youtube.com' + re.search('([^"]*/base\.js)"', video_page).group(1)
else:
self.html5player = None
else:
# Extract from video page
logging.debug('Extracting from the video page...')
video_page = get_content('https://www.youtube.com/watch?v=%s' % self.vid)
try:
jsUrl = re.search('([^"]*/base\.js)"', video_page).group(1)
except:
log.wtf('[Failed] Unable to find base.js on the video page')
# FIXME: do we still need this?
jsUrl = jsUrl.replace('\/', '/') # unescape URL (for age-restricted videos)
self.html5player = 'https://www.youtube.com' + jsUrl
logging.debug('Retrieving the player code...')
self.js = get_content(self.html5player).replace('\n', ' ')
logging.debug('Loading ytInitialPlayerResponse...')
ytInitialPlayerResponse = json.loads(re.search('ytInitialPlayerResponse\s*=\s*([^\n]+?});(\n|</script>)', video_page).group(1))
# Get the video title
self.title = ytInitialPlayerResponse["videoDetails"]["title"]
stream_list = ytInitialPlayerResponse['streamingData']['formats']
elif video_info['status'] == ['fail']:
# FIXME: this is basically dead code, status is always ok
logging.debug('ERRORCODE: %s' % video_info['errorcode'][0])
if video_info['errorcode'] == ['150']:
# FIXME: still relevant?
if cookies:
# Load necessary cookies into headers (for age-restricted videos)
consent, ssid, hsid, sid = 'YES', '', '', ''
for cookie in cookies:
if cookie.domain.endswith('.youtube.com'):
if cookie.name == 'SSID':
ssid = cookie.value
elif cookie.name == 'HSID':
hsid = cookie.value
elif cookie.name == 'SID':
sid = cookie.value
cookie_str = 'CONSENT=%s; SSID=%s; HSID=%s; SID=%s' % (consent, ssid, hsid, sid)
video_page = get_content('https://www.youtube.com/watch?v=%s' % self.vid,
headers={'Cookie': cookie_str})
else:
video_page = get_content('https://www.youtube.com/watch?v=%s' % self.vid)
try:
ytplayer_config = json.loads(re.search('ytplayer.config\s*=\s*([^\n]+});ytplayer', video_page).group(1))
except:
msg = re.search('class="message">([^<]+)<', video_page).group(1)
log.wtf('[Failed] Got message "%s". Try to login with --cookies.' % msg.strip())
if 'title' in ytplayer_config['args']:
# 150 Restricted from playback on certain sites
# Parse video page instead
self.title = ytplayer_config['args']['title']
self.html5player = 'https://www.youtube.com' + ytplayer_config['assets']['js']
stream_list = ytplayer_config['args']['url_encoded_fmt_stream_map'].split(',')
else:
log.wtf('[Error] The uploader has not made this video available in your country.', exit_code=None)
raise
#self.title = re.search('<meta name="title" content="([^"]+)"', video_page).group(1)
#stream_list = []
elif video_info['errorcode'] == ['100']:
log.wtf('[Failed] This video does not exist.', exit_code=None) #int(video_info['errorcode'][0])
raise
else:
log.wtf('[Failed] %s' % video_info['reason'][0], exit_code=None) #int(video_info['errorcode'][0])
raise
else:
log.wtf('[Failed] Invalid status.', exit_code=None)
raise
# FIXME: YouTube Live
if ytplayer_config and (ytplayer_config['args'].get('livestream') == '1' or ytplayer_config['args'].get('live_playback') == '1'):
if 'hlsvp' in ytplayer_config['args']:
hlsvp = ytplayer_config['args']['hlsvp']
else:
player_response= json.loads(ytplayer_config['args']['player_response'])
log.e('[Failed] %s' % player_response['playabilityStatus']['reason'], exit_code=1)
if 'info_only' in kwargs and kwargs['info_only']:
return
else:
download_url_ffmpeg(hlsvp, self.title, 'mp4')
exit(0)
stream_list = ytInitialPlayerResponse['streamingData']['formats']
for stream in stream_list:
if isinstance(stream, str):
# FIXME: dead code?
metadata = parse.parse_qs(stream)
stream_itag = metadata['itag'][0]
self.streams[stream_itag] = {
'itag': metadata['itag'][0],
'url': metadata['url'][0],
'sig': metadata['sig'][0] if 'sig' in metadata else None,
's': metadata['s'][0] if 's' in metadata else None,
'quality': metadata['quality'][0] if 'quality' in metadata else None,
#'quality': metadata['quality_label'][0] if 'quality_label' in metadata else None,
'type': metadata['type'][0],
'mime': metadata['type'][0].split(';')[0],
'container': mime_to_container(metadata['type'][0].split(';')[0]),
}
if 'signatureCipher' in stream:
logging.debug('Parsing signatureCipher for itag=%s...' % stream['itag'])
qs = parse_qs(stream['signatureCipher'])
#logging.debug(qs)
sp = qs['sp'][0]
sig = self.__class__.s_to_sig(self.js, qs['s'][0])
url = qs['url'][0] + '&{}={}'.format(sp, sig)
elif 'url' in stream:
url = stream['url']
else:
if 'signatureCipher' in stream:
logging.debug('Parsing signatureCipher for itag=%s...' % stream['itag'])
qs = parse_qs(stream['signatureCipher'])
#logging.debug(qs)
sp = qs['sp'][0]
sig = self.__class__.s_to_sig(self.js, qs['s'][0])
url = qs['url'][0] + '&{}={}'.format(sp, sig)
elif 'url' in stream:
url = stream['url']
else:
log.wtf('No signatureCipher or url for itag=%s' % stream['itag'])
url = self.__class__.dethrottle(self.js, url)
log.wtf('No signatureCipher or url for itag=%s' % stream['itag'])
url = self.__class__.dethrottle(self.js, url)
self.streams[str(stream['itag'])] = {
'itag': str(stream['itag']),
'url': url,
'quality': stream['quality'],
'type': stream['mimeType'],
'mime': stream['mimeType'].split(';')[0],
'container': mime_to_container(stream['mimeType'].split(';')[0]),
}
self.streams[str(stream['itag'])] = {
'itag': str(stream['itag']),
'url': url,
'quality': stream['quality'],
'type': stream['mimeType'],
'mime': stream['mimeType'].split(';')[0],
'container': mime_to_container(stream['mimeType'].split(';')[0]),
}
# FIXME: Prepare caption tracks
try:
try:
caption_tracks = json.loads(ytplayer_config['args']['player_response'])['captions']['playerCaptionsTracklistRenderer']['captionTracks']
except:
caption_tracks = ytInitialPlayerResponse['captions']['playerCaptionsTracklistRenderer']['captionTracks']
caption_tracks = ytInitialPlayerResponse['captions']['playerCaptionsTracklistRenderer']['captionTracks']
for ct in caption_tracks:
ttsurl, lang = ct['baseUrl'], ct['languageCode']
@ -408,138 +258,49 @@ class YouTube(VideoExtractor):
self.caption_tracks[lang] = srt
except: pass
# Prepare DASH streams (NOTE: not every video has DASH streams!)
try:
# Prepare DASH streams
if 'adaptiveFormats' in ytInitialPlayerResponse['streamingData']:
streams = ytInitialPlayerResponse['streamingData']['adaptiveFormats']
# FIXME: dead code?
dashmpd = ytplayer_config['args']['dashmpd']
dash_xml = parseString(get_content(dashmpd))
for aset in dash_xml.getElementsByTagName('AdaptationSet'):
mimeType = aset.getAttribute('mimeType')
if mimeType == 'audio/mp4':
rep = aset.getElementsByTagName('Representation')[-1]
burls = rep.getElementsByTagName('BaseURL')
dash_mp4_a_url = burls[0].firstChild.nodeValue
dash_mp4_a_size = burls[0].getAttribute('yt:contentLength')
if not dash_mp4_a_size:
try: dash_mp4_a_size = url_size(dash_mp4_a_url)
except: continue
elif mimeType == 'audio/webm':
rep = aset.getElementsByTagName('Representation')[-1]
burls = rep.getElementsByTagName('BaseURL')
dash_webm_a_url = burls[0].firstChild.nodeValue
dash_webm_a_size = burls[0].getAttribute('yt:contentLength')
if not dash_webm_a_size:
try: dash_webm_a_size = url_size(dash_webm_a_url)
except: continue
elif mimeType == 'video/mp4':
for rep in aset.getElementsByTagName('Representation'):
w = int(rep.getAttribute('width'))
h = int(rep.getAttribute('height'))
itag = rep.getAttribute('id')
burls = rep.getElementsByTagName('BaseURL')
dash_url = burls[0].firstChild.nodeValue
dash_size = burls[0].getAttribute('yt:contentLength')
if not dash_size:
try: dash_size = url_size(dash_url)
except: continue
dash_urls = self.__class__.chunk_by_range(dash_url, int(dash_size))
dash_mp4_a_urls = self.__class__.chunk_by_range(dash_mp4_a_url, int(dash_mp4_a_size))
self.dash_streams[itag] = {
'quality': '%sx%s' % (w, h),
'itag': itag,
'type': mimeType,
'mime': mimeType,
'container': 'mp4',
'src': [dash_urls, dash_mp4_a_urls],
'size': int(dash_size) + int(dash_mp4_a_size)
}
elif mimeType == 'video/webm':
for rep in aset.getElementsByTagName('Representation'):
w = int(rep.getAttribute('width'))
h = int(rep.getAttribute('height'))
itag = rep.getAttribute('id')
burls = rep.getElementsByTagName('BaseURL')
dash_url = burls[0].firstChild.nodeValue
dash_size = burls[0].getAttribute('yt:contentLength')
if not dash_size:
try: dash_size = url_size(dash_url)
except: continue
dash_urls = self.__class__.chunk_by_range(dash_url, int(dash_size))
dash_webm_a_urls = self.__class__.chunk_by_range(dash_webm_a_url, int(dash_webm_a_size))
self.dash_streams[itag] = {
'quality': '%sx%s' % (w, h),
'itag': itag,
'type': mimeType,
'mime': mimeType,
'container': 'webm',
'src': [dash_urls, dash_webm_a_urls],
'size': int(dash_size) + int(dash_webm_a_size)
}
except:
try:
# FIXME: dead code?
# Video info from video page (not always available)
streams = [dict([(i.split('=')[0],
parse.unquote(i.split('=')[1]))
for i in afmt.split('&')])
for afmt in ytplayer_config['args']['adaptive_fmts'].split(',')]
except:
if 'adaptive_fmts' in video_info:
# FIXME: dead code?
streams = [dict([(i.split('=')[0],
parse.unquote(i.split('=')[1]))
for i in afmt.split('&')])
for afmt in video_info['adaptive_fmts'][0].split(',')]
# streams without contentLength got broken urls, just remove them (#2767)
streams = [stream for stream in streams if 'contentLength' in stream]
for stream in streams:
stream['itag'] = str(stream['itag'])
if 'qualityLabel' in stream:
stream['quality_label'] = stream['qualityLabel']
del stream['qualityLabel']
if 'width' in stream:
stream['size'] = '{}x{}'.format(stream['width'], stream['height'])
del stream['width']
del stream['height']
stream['type'] = stream['mimeType']
stream['clen'] = stream['contentLength']
stream['init'] = '{}-{}'.format(
stream['initRange']['start'],
stream['initRange']['end'])
stream['index'] = '{}-{}'.format(
stream['indexRange']['start'],
stream['indexRange']['end'])
del stream['mimeType']
del stream['contentLength']
del stream['initRange']
del stream['indexRange']
if 'signatureCipher' in stream:
logging.debug('Parsing signatureCipher for itag=%s...' % stream['itag'])
qs = parse_qs(stream['signatureCipher'])
#logging.debug(qs)
sp = qs['sp'][0]
sig = self.__class__.s_to_sig(self.js, qs['s'][0])
url = qs['url'][0] + '&ratebypass=yes&{}={}'.format(sp, sig)
elif 'url' in stream:
url = stream['url']
else:
try:
try:
# FIXME: dead code?
streams = json.loads(video_info['player_response'][0])['streamingData']['adaptiveFormats']
except:
streams = ytInitialPlayerResponse['streamingData']['adaptiveFormats']
except: # no DASH stream at all
# FIXME: dead code?
return
# FIXME: dead code?
# streams without contentLength got broken urls, just remove them (#2767)
streams = [stream for stream in streams if 'contentLength' in stream]
for stream in streams:
stream['itag'] = str(stream['itag'])
if 'qualityLabel' in stream:
stream['quality_label'] = stream['qualityLabel']
del stream['qualityLabel']
if 'width' in stream:
stream['size'] = '{}x{}'.format(stream['width'], stream['height'])
del stream['width']
del stream['height']
stream['type'] = stream['mimeType']
stream['clen'] = stream['contentLength']
stream['init'] = '{}-{}'.format(
stream['initRange']['start'],
stream['initRange']['end'])
stream['index'] = '{}-{}'.format(
stream['indexRange']['start'],
stream['indexRange']['end'])
del stream['mimeType']
del stream['contentLength']
del stream['initRange']
del stream['indexRange']
if 'signatureCipher' in stream:
logging.debug('Parsing signatureCipher for itag=%s...' % stream['itag'])
qs = parse_qs(stream['signatureCipher'])
#logging.debug(qs)
sp = qs['sp'][0]
sig = self.__class__.s_to_sig(self.js, qs['s'][0])
url = qs['url'][0] + '&ratebypass=yes&{}={}'.format(sp, sig)
elif 'url' in stream:
url = stream['url']
else:
log.wtf('No signatureCipher or url for itag=%s' % stream['itag'])
url = self.__class__.dethrottle(self.js, url)
stream['url'] = url
log.wtf('No signatureCipher or url for itag=%s' % stream['itag'])
url = self.__class__.dethrottle(self.js, url)
stream['url'] = url
for stream in streams: # audio
if stream['type'].startswith('audio/mp4'):