Handle Basic Auth `user:pass` in URLs

Fixes https://github.com/ytdl-org/youtube-dl/issues/20258, https://github.com/ytdl-org/youtube-dl/issues/26211
Authored by: hhirtz, pukkandan
This commit is contained in:
Hubert Hirtz 2021-04-19 14:07:45 +02:00 committed by pukkandan
parent f17c702270
commit 5435dcf96e
No known key found for this signature in database
GPG Key ID: 0F00D95A001F4698
2 changed files with 27 additions and 1 deletions

View File

@ -66,6 +66,7 @@ from yt_dlp.utils import (
sanitize_filename, sanitize_filename,
sanitize_path, sanitize_path,
sanitize_url, sanitize_url,
sanitized_Request,
expand_path, expand_path,
prepend_extension, prepend_extension,
replace_extension, replace_extension,
@ -239,6 +240,15 @@ class TestUtil(unittest.TestCase):
self.assertEqual(sanitize_url('rmtps://foo.bar'), 'rtmps://foo.bar') self.assertEqual(sanitize_url('rmtps://foo.bar'), 'rtmps://foo.bar')
self.assertEqual(sanitize_url('https://foo.bar'), 'https://foo.bar') self.assertEqual(sanitize_url('https://foo.bar'), 'https://foo.bar')
def test_extract_basic_auth(self):
auth_header = lambda url: sanitized_Request(url).get_header('Authorization')
self.assertFalse(auth_header('http://foo.bar'))
self.assertFalse(auth_header('http://:foo.bar'))
self.assertEqual(auth_header('http://@foo.bar'), 'Basic Og==')
self.assertEqual(auth_header('http://:pass@foo.bar'), 'Basic OnBhc3M=')
self.assertEqual(auth_header('http://user:@foo.bar'), 'Basic dXNlcjo=')
self.assertEqual(auth_header('http://user:pass@foo.bar'), 'Basic dXNlcjpwYXNz')
def test_expand_path(self): def test_expand_path(self):
def env(var): def env(var):
return '%{0}%'.format(var) if sys.platform == 'win32' else '${0}'.format(var) return '%{0}%'.format(var) if sys.platform == 'win32' else '${0}'.format(var)

View File

@ -2168,8 +2168,24 @@ def sanitize_url(url):
return escape_url(url) return escape_url(url)
def extract_basic_auth(url):
parts = compat_urlparse.urlsplit(url)
if parts.username is None:
return url, None
url = compat_urlparse.urlunsplit(parts._replace(netloc=(
parts.hostname if parts.port is None
else '%s:%d' % (parts.hostname, parts.port))))
auth_payload = base64.b64encode(
('%s:%s' % (parts.username, parts.password or '')).encode('utf-8'))
return url, 'Basic ' + auth_payload.decode('utf-8')
def sanitized_Request(url, *args, **kwargs): def sanitized_Request(url, *args, **kwargs):
return compat_urllib_request.Request(sanitize_url(url), *args, **kwargs) url, auth_header = extract_basic_auth(sanitize_url(url))
if auth_header is not None:
headers = args[1] if len(args) >= 2 else kwargs.setdefault('headers', {})
headers['Authorization'] = auth_header
return compat_urllib_request.Request(url, *args, **kwargs)
def expand_path(s): def expand_path(s):