bazarr/libs/waitress/tests/test_proxy_headers.py

725 lines
25 KiB
Python

import unittest
from waitress.compat import tobytes
class TestProxyHeadersMiddleware(unittest.TestCase):
def _makeOne(self, app, **kw):
from waitress.proxy_headers import proxy_headers_middleware
return proxy_headers_middleware(app, **kw)
def _callFUT(self, app, **kw):
response = DummyResponse()
environ = DummyEnviron(**kw)
def start_response(status, response_headers):
response.status = status
response.headers = response_headers
response.steps = list(app(environ, start_response))
response.body = b"".join(tobytes(s) for s in response.steps)
return response
def test_get_environment_values_w_scheme_override_untrusted(self):
inner = DummyApp()
app = self._makeOne(inner)
response = self._callFUT(
app, headers={"X_FOO": "BAR", "X_FORWARDED_PROTO": "https",}
)
self.assertEqual(response.status, "200 OK")
self.assertEqual(inner.environ["wsgi.url_scheme"], "http")
def test_get_environment_values_w_scheme_override_trusted(self):
inner = DummyApp()
app = self._makeOne(
inner,
trusted_proxy="192.168.1.1",
trusted_proxy_headers={"x-forwarded-proto"},
)
response = self._callFUT(
app,
addr=["192.168.1.1", 8080],
headers={"X_FOO": "BAR", "X_FORWARDED_PROTO": "https",},
)
environ = inner.environ
self.assertEqual(response.status, "200 OK")
self.assertEqual(environ["SERVER_PORT"], "443")
self.assertEqual(environ["SERVER_NAME"], "localhost")
self.assertEqual(environ["REMOTE_ADDR"], "192.168.1.1")
self.assertEqual(environ["HTTP_X_FOO"], "BAR")
def test_get_environment_values_w_bogus_scheme_override(self):
inner = DummyApp()
app = self._makeOne(
inner,
trusted_proxy="192.168.1.1",
trusted_proxy_headers={"x-forwarded-proto"},
)
response = self._callFUT(
app,
addr=["192.168.1.1", 80],
headers={
"X_FOO": "BAR",
"X_FORWARDED_PROTO": "http://p02n3e.com?url=http",
},
)
self.assertEqual(response.status, "400 Bad Request")
self.assertIn(b'Header "X-Forwarded-Proto" malformed', response.body)
def test_get_environment_warning_other_proxy_headers(self):
inner = DummyApp()
logger = DummyLogger()
app = self._makeOne(
inner,
trusted_proxy="192.168.1.1",
trusted_proxy_count=1,
trusted_proxy_headers={"forwarded"},
log_untrusted=True,
logger=logger,
)
response = self._callFUT(
app,
addr=["192.168.1.1", 80],
headers={
"X_FORWARDED_FOR": "[2001:db8::1]",
"FORWARDED": "For=198.51.100.2;host=example.com:8080;proto=https",
},
)
self.assertEqual(response.status, "200 OK")
self.assertEqual(len(logger.logged), 1)
environ = inner.environ
self.assertNotIn("HTTP_X_FORWARDED_FOR", environ)
self.assertEqual(environ["REMOTE_ADDR"], "198.51.100.2")
self.assertEqual(environ["SERVER_NAME"], "example.com")
self.assertEqual(environ["HTTP_HOST"], "example.com:8080")
self.assertEqual(environ["SERVER_PORT"], "8080")
self.assertEqual(environ["wsgi.url_scheme"], "https")
def test_get_environment_contains_all_headers_including_untrusted(self):
inner = DummyApp()
app = self._makeOne(
inner,
trusted_proxy="192.168.1.1",
trusted_proxy_count=1,
trusted_proxy_headers={"x-forwarded-by"},
clear_untrusted=False,
)
headers_orig = {
"X_FORWARDED_FOR": "198.51.100.2",
"X_FORWARDED_BY": "Waitress",
"X_FORWARDED_PROTO": "https",
"X_FORWARDED_HOST": "example.org",
}
response = self._callFUT(
app, addr=["192.168.1.1", 80], headers=headers_orig.copy(),
)
self.assertEqual(response.status, "200 OK")
environ = inner.environ
for k, expected in headers_orig.items():
result = environ["HTTP_%s" % k]
self.assertEqual(result, expected)
def test_get_environment_contains_only_trusted_headers(self):
inner = DummyApp()
app = self._makeOne(
inner,
trusted_proxy="192.168.1.1",
trusted_proxy_count=1,
trusted_proxy_headers={"x-forwarded-by"},
clear_untrusted=True,
)
response = self._callFUT(
app,
addr=["192.168.1.1", 80],
headers={
"X_FORWARDED_FOR": "198.51.100.2",
"X_FORWARDED_BY": "Waitress",
"X_FORWARDED_PROTO": "https",
"X_FORWARDED_HOST": "example.org",
},
)
self.assertEqual(response.status, "200 OK")
environ = inner.environ
self.assertEqual(environ["HTTP_X_FORWARDED_BY"], "Waitress")
self.assertNotIn("HTTP_X_FORWARDED_FOR", environ)
self.assertNotIn("HTTP_X_FORWARDED_PROTO", environ)
self.assertNotIn("HTTP_X_FORWARDED_HOST", environ)
def test_get_environment_clears_headers_if_untrusted_proxy(self):
inner = DummyApp()
app = self._makeOne(
inner,
trusted_proxy="192.168.1.1",
trusted_proxy_count=1,
trusted_proxy_headers={"x-forwarded-by"},
clear_untrusted=True,
)
response = self._callFUT(
app,
addr=["192.168.1.255", 80],
headers={
"X_FORWARDED_FOR": "198.51.100.2",
"X_FORWARDED_BY": "Waitress",
"X_FORWARDED_PROTO": "https",
"X_FORWARDED_HOST": "example.org",
},
)
self.assertEqual(response.status, "200 OK")
environ = inner.environ
self.assertNotIn("HTTP_X_FORWARDED_BY", environ)
self.assertNotIn("HTTP_X_FORWARDED_FOR", environ)
self.assertNotIn("HTTP_X_FORWARDED_PROTO", environ)
self.assertNotIn("HTTP_X_FORWARDED_HOST", environ)
def test_parse_proxy_headers_forwarded_for(self):
inner = DummyApp()
app = self._makeOne(
inner, trusted_proxy="*", trusted_proxy_headers={"x-forwarded-for"},
)
response = self._callFUT(app, headers={"X_FORWARDED_FOR": "192.0.2.1"})
self.assertEqual(response.status, "200 OK")
environ = inner.environ
self.assertEqual(environ["REMOTE_ADDR"], "192.0.2.1")
def test_parse_proxy_headers_forwarded_for_v6_missing_brackets(self):
inner = DummyApp()
app = self._makeOne(
inner, trusted_proxy="*", trusted_proxy_headers={"x-forwarded-for"},
)
response = self._callFUT(app, headers={"X_FORWARDED_FOR": "2001:db8::0"})
self.assertEqual(response.status, "200 OK")
environ = inner.environ
self.assertEqual(environ["REMOTE_ADDR"], "2001:db8::0")
def test_parse_proxy_headers_forwared_for_multiple(self):
inner = DummyApp()
app = self._makeOne(
inner,
trusted_proxy="*",
trusted_proxy_count=2,
trusted_proxy_headers={"x-forwarded-for"},
)
response = self._callFUT(
app, headers={"X_FORWARDED_FOR": "192.0.2.1, 198.51.100.2, 203.0.113.1"}
)
self.assertEqual(response.status, "200 OK")
environ = inner.environ
self.assertEqual(environ["REMOTE_ADDR"], "198.51.100.2")
def test_parse_forwarded_multiple_proxies_trust_only_two(self):
inner = DummyApp()
app = self._makeOne(
inner,
trusted_proxy="*",
trusted_proxy_count=2,
trusted_proxy_headers={"forwarded"},
)
response = self._callFUT(
app,
headers={
"FORWARDED": (
"For=192.0.2.1;host=fake.com, "
"For=198.51.100.2;host=example.com:8080, "
"For=203.0.113.1"
),
},
)
self.assertEqual(response.status, "200 OK")
environ = inner.environ
self.assertEqual(environ["REMOTE_ADDR"], "198.51.100.2")
self.assertEqual(environ["SERVER_NAME"], "example.com")
self.assertEqual(environ["HTTP_HOST"], "example.com:8080")
self.assertEqual(environ["SERVER_PORT"], "8080")
def test_parse_forwarded_multiple_proxies(self):
inner = DummyApp()
app = self._makeOne(
inner,
trusted_proxy="*",
trusted_proxy_count=2,
trusted_proxy_headers={"forwarded"},
)
response = self._callFUT(
app,
headers={
"FORWARDED": (
'for="[2001:db8::1]:3821";host="example.com:8443";proto="https", '
'for=192.0.2.1;host="example.internal:8080"'
),
},
)
self.assertEqual(response.status, "200 OK")
environ = inner.environ
self.assertEqual(environ["REMOTE_ADDR"], "2001:db8::1")
self.assertEqual(environ["REMOTE_PORT"], "3821")
self.assertEqual(environ["SERVER_NAME"], "example.com")
self.assertEqual(environ["HTTP_HOST"], "example.com:8443")
self.assertEqual(environ["SERVER_PORT"], "8443")
self.assertEqual(environ["wsgi.url_scheme"], "https")
def test_parse_forwarded_multiple_proxies_minimal(self):
inner = DummyApp()
app = self._makeOne(
inner,
trusted_proxy="*",
trusted_proxy_count=2,
trusted_proxy_headers={"forwarded"},
)
response = self._callFUT(
app,
headers={
"FORWARDED": (
'for="[2001:db8::1]";proto="https", '
'for=192.0.2.1;host="example.org"'
),
},
)
self.assertEqual(response.status, "200 OK")
environ = inner.environ
self.assertEqual(environ["REMOTE_ADDR"], "2001:db8::1")
self.assertEqual(environ["SERVER_NAME"], "example.org")
self.assertEqual(environ["HTTP_HOST"], "example.org")
self.assertEqual(environ["SERVER_PORT"], "443")
self.assertEqual(environ["wsgi.url_scheme"], "https")
def test_parse_proxy_headers_forwarded_host_with_port(self):
inner = DummyApp()
app = self._makeOne(
inner,
trusted_proxy="*",
trusted_proxy_count=2,
trusted_proxy_headers={
"x-forwarded-for",
"x-forwarded-proto",
"x-forwarded-host",
},
)
response = self._callFUT(
app,
headers={
"X_FORWARDED_FOR": "192.0.2.1, 198.51.100.2, 203.0.113.1",
"X_FORWARDED_PROTO": "http",
"X_FORWARDED_HOST": "example.com:8080",
},
)
self.assertEqual(response.status, "200 OK")
environ = inner.environ
self.assertEqual(environ["REMOTE_ADDR"], "198.51.100.2")
self.assertEqual(environ["SERVER_NAME"], "example.com")
self.assertEqual(environ["HTTP_HOST"], "example.com:8080")
self.assertEqual(environ["SERVER_PORT"], "8080")
def test_parse_proxy_headers_forwarded_host_without_port(self):
inner = DummyApp()
app = self._makeOne(
inner,
trusted_proxy="*",
trusted_proxy_count=2,
trusted_proxy_headers={
"x-forwarded-for",
"x-forwarded-proto",
"x-forwarded-host",
},
)
response = self._callFUT(
app,
headers={
"X_FORWARDED_FOR": "192.0.2.1, 198.51.100.2, 203.0.113.1",
"X_FORWARDED_PROTO": "http",
"X_FORWARDED_HOST": "example.com",
},
)
self.assertEqual(response.status, "200 OK")
environ = inner.environ
self.assertEqual(environ["REMOTE_ADDR"], "198.51.100.2")
self.assertEqual(environ["SERVER_NAME"], "example.com")
self.assertEqual(environ["HTTP_HOST"], "example.com")
self.assertEqual(environ["SERVER_PORT"], "80")
def test_parse_proxy_headers_forwarded_host_with_forwarded_port(self):
inner = DummyApp()
app = self._makeOne(
inner,
trusted_proxy="*",
trusted_proxy_count=2,
trusted_proxy_headers={
"x-forwarded-for",
"x-forwarded-proto",
"x-forwarded-host",
"x-forwarded-port",
},
)
response = self._callFUT(
app,
headers={
"X_FORWARDED_FOR": "192.0.2.1, 198.51.100.2, 203.0.113.1",
"X_FORWARDED_PROTO": "http",
"X_FORWARDED_HOST": "example.com",
"X_FORWARDED_PORT": "8080",
},
)
self.assertEqual(response.status, "200 OK")
environ = inner.environ
self.assertEqual(environ["REMOTE_ADDR"], "198.51.100.2")
self.assertEqual(environ["SERVER_NAME"], "example.com")
self.assertEqual(environ["HTTP_HOST"], "example.com:8080")
self.assertEqual(environ["SERVER_PORT"], "8080")
def test_parse_proxy_headers_forwarded_host_multiple_with_forwarded_port(self):
inner = DummyApp()
app = self._makeOne(
inner,
trusted_proxy="*",
trusted_proxy_count=2,
trusted_proxy_headers={
"x-forwarded-for",
"x-forwarded-proto",
"x-forwarded-host",
"x-forwarded-port",
},
)
response = self._callFUT(
app,
headers={
"X_FORWARDED_FOR": "192.0.2.1, 198.51.100.2, 203.0.113.1",
"X_FORWARDED_PROTO": "http",
"X_FORWARDED_HOST": "example.com, example.org",
"X_FORWARDED_PORT": "8080",
},
)
self.assertEqual(response.status, "200 OK")
environ = inner.environ
self.assertEqual(environ["REMOTE_ADDR"], "198.51.100.2")
self.assertEqual(environ["SERVER_NAME"], "example.com")
self.assertEqual(environ["HTTP_HOST"], "example.com:8080")
self.assertEqual(environ["SERVER_PORT"], "8080")
def test_parse_proxy_headers_forwarded_host_multiple_with_forwarded_port_limit_one_trusted(
self,
):
inner = DummyApp()
app = self._makeOne(
inner,
trusted_proxy="*",
trusted_proxy_count=1,
trusted_proxy_headers={
"x-forwarded-for",
"x-forwarded-proto",
"x-forwarded-host",
"x-forwarded-port",
},
)
response = self._callFUT(
app,
headers={
"X_FORWARDED_FOR": "192.0.2.1, 198.51.100.2, 203.0.113.1",
"X_FORWARDED_PROTO": "http",
"X_FORWARDED_HOST": "example.com, example.org",
"X_FORWARDED_PORT": "8080",
},
)
self.assertEqual(response.status, "200 OK")
environ = inner.environ
self.assertEqual(environ["REMOTE_ADDR"], "203.0.113.1")
self.assertEqual(environ["SERVER_NAME"], "example.org")
self.assertEqual(environ["HTTP_HOST"], "example.org:8080")
self.assertEqual(environ["SERVER_PORT"], "8080")
def test_parse_forwarded(self):
inner = DummyApp()
app = self._makeOne(
inner,
trusted_proxy="*",
trusted_proxy_count=1,
trusted_proxy_headers={"forwarded"},
)
response = self._callFUT(
app,
headers={
"FORWARDED": "For=198.51.100.2:5858;host=example.com:8080;proto=https",
},
)
self.assertEqual(response.status, "200 OK")
environ = inner.environ
self.assertEqual(environ["REMOTE_ADDR"], "198.51.100.2")
self.assertEqual(environ["REMOTE_PORT"], "5858")
self.assertEqual(environ["SERVER_NAME"], "example.com")
self.assertEqual(environ["HTTP_HOST"], "example.com:8080")
self.assertEqual(environ["SERVER_PORT"], "8080")
self.assertEqual(environ["wsgi.url_scheme"], "https")
def test_parse_forwarded_empty_pair(self):
inner = DummyApp()
app = self._makeOne(
inner,
trusted_proxy="*",
trusted_proxy_count=1,
trusted_proxy_headers={"forwarded"},
)
response = self._callFUT(
app, headers={"FORWARDED": "For=198.51.100.2;;proto=https;by=_unused",}
)
self.assertEqual(response.status, "200 OK")
environ = inner.environ
self.assertEqual(environ["REMOTE_ADDR"], "198.51.100.2")
def test_parse_forwarded_pair_token_whitespace(self):
inner = DummyApp()
app = self._makeOne(
inner,
trusted_proxy="*",
trusted_proxy_count=1,
trusted_proxy_headers={"forwarded"},
)
response = self._callFUT(
app, headers={"FORWARDED": "For=198.51.100.2; proto =https",}
)
self.assertEqual(response.status, "400 Bad Request")
self.assertIn(b'Header "Forwarded" malformed', response.body)
def test_parse_forwarded_pair_value_whitespace(self):
inner = DummyApp()
app = self._makeOne(
inner,
trusted_proxy="*",
trusted_proxy_count=1,
trusted_proxy_headers={"forwarded"},
)
response = self._callFUT(
app, headers={"FORWARDED": 'For= "198.51.100.2"; proto =https',}
)
self.assertEqual(response.status, "400 Bad Request")
self.assertIn(b'Header "Forwarded" malformed', response.body)
def test_parse_forwarded_pair_no_equals(self):
inner = DummyApp()
app = self._makeOne(
inner,
trusted_proxy="*",
trusted_proxy_count=1,
trusted_proxy_headers={"forwarded"},
)
response = self._callFUT(app, headers={"FORWARDED": "For"})
self.assertEqual(response.status, "400 Bad Request")
self.assertIn(b'Header "Forwarded" malformed', response.body)
def test_parse_forwarded_warning_unknown_token(self):
inner = DummyApp()
logger = DummyLogger()
app = self._makeOne(
inner,
trusted_proxy="*",
trusted_proxy_count=1,
trusted_proxy_headers={"forwarded"},
logger=logger,
)
response = self._callFUT(
app,
headers={
"FORWARDED": (
"For=198.51.100.2;host=example.com:8080;proto=https;"
'unknown="yolo"'
),
},
)
self.assertEqual(response.status, "200 OK")
self.assertEqual(len(logger.logged), 1)
self.assertIn("Unknown Forwarded token", logger.logged[0])
environ = inner.environ
self.assertEqual(environ["REMOTE_ADDR"], "198.51.100.2")
self.assertEqual(environ["SERVER_NAME"], "example.com")
self.assertEqual(environ["HTTP_HOST"], "example.com:8080")
self.assertEqual(environ["SERVER_PORT"], "8080")
self.assertEqual(environ["wsgi.url_scheme"], "https")
def test_parse_no_valid_proxy_headers(self):
inner = DummyApp()
app = self._makeOne(inner, trusted_proxy="*", trusted_proxy_count=1,)
response = self._callFUT(
app,
headers={
"X_FORWARDED_FOR": "198.51.100.2",
"FORWARDED": "For=198.51.100.2;host=example.com:8080;proto=https",
},
)
self.assertEqual(response.status, "200 OK")
environ = inner.environ
self.assertEqual(environ["REMOTE_ADDR"], "127.0.0.1")
self.assertEqual(environ["SERVER_NAME"], "localhost")
self.assertEqual(environ["HTTP_HOST"], "192.168.1.1:80")
self.assertEqual(environ["SERVER_PORT"], "8080")
self.assertEqual(environ["wsgi.url_scheme"], "http")
def test_parse_multiple_x_forwarded_proto(self):
inner = DummyApp()
logger = DummyLogger()
app = self._makeOne(
inner,
trusted_proxy="*",
trusted_proxy_count=1,
trusted_proxy_headers={"x-forwarded-proto"},
logger=logger,
)
response = self._callFUT(app, headers={"X_FORWARDED_PROTO": "http, https",})
self.assertEqual(response.status, "400 Bad Request")
self.assertIn(b'Header "X-Forwarded-Proto" malformed', response.body)
def test_parse_multiple_x_forwarded_port(self):
inner = DummyApp()
logger = DummyLogger()
app = self._makeOne(
inner,
trusted_proxy="*",
trusted_proxy_count=1,
trusted_proxy_headers={"x-forwarded-port"},
logger=logger,
)
response = self._callFUT(app, headers={"X_FORWARDED_PORT": "443, 80",})
self.assertEqual(response.status, "400 Bad Request")
self.assertIn(b'Header "X-Forwarded-Port" malformed', response.body)
def test_parse_forwarded_port_wrong_proto_port_80(self):
inner = DummyApp()
app = self._makeOne(
inner,
trusted_proxy="*",
trusted_proxy_count=1,
trusted_proxy_headers={
"x-forwarded-port",
"x-forwarded-host",
"x-forwarded-proto",
},
)
response = self._callFUT(
app,
headers={
"X_FORWARDED_PORT": "80",
"X_FORWARDED_PROTO": "https",
"X_FORWARDED_HOST": "example.com",
},
)
self.assertEqual(response.status, "200 OK")
environ = inner.environ
self.assertEqual(environ["SERVER_NAME"], "example.com")
self.assertEqual(environ["HTTP_HOST"], "example.com:80")
self.assertEqual(environ["SERVER_PORT"], "80")
self.assertEqual(environ["wsgi.url_scheme"], "https")
def test_parse_forwarded_port_wrong_proto_port_443(self):
inner = DummyApp()
app = self._makeOne(
inner,
trusted_proxy="*",
trusted_proxy_count=1,
trusted_proxy_headers={
"x-forwarded-port",
"x-forwarded-host",
"x-forwarded-proto",
},
)
response = self._callFUT(
app,
headers={
"X_FORWARDED_PORT": "443",
"X_FORWARDED_PROTO": "http",
"X_FORWARDED_HOST": "example.com",
},
)
self.assertEqual(response.status, "200 OK")
environ = inner.environ
self.assertEqual(environ["SERVER_NAME"], "example.com")
self.assertEqual(environ["HTTP_HOST"], "example.com:443")
self.assertEqual(environ["SERVER_PORT"], "443")
self.assertEqual(environ["wsgi.url_scheme"], "http")
def test_parse_forwarded_for_bad_quote(self):
inner = DummyApp()
app = self._makeOne(
inner,
trusted_proxy="*",
trusted_proxy_count=1,
trusted_proxy_headers={"x-forwarded-for"},
)
response = self._callFUT(app, headers={"X_FORWARDED_FOR": '"foo'})
self.assertEqual(response.status, "400 Bad Request")
self.assertIn(b'Header "X-Forwarded-For" malformed', response.body)
def test_parse_forwarded_host_bad_quote(self):
inner = DummyApp()
app = self._makeOne(
inner,
trusted_proxy="*",
trusted_proxy_count=1,
trusted_proxy_headers={"x-forwarded-host"},
)
response = self._callFUT(app, headers={"X_FORWARDED_HOST": '"foo'})
self.assertEqual(response.status, "400 Bad Request")
self.assertIn(b'Header "X-Forwarded-Host" malformed', response.body)
class DummyLogger(object):
def __init__(self):
self.logged = []
def warning(self, msg, *args):
self.logged.append(msg % args)
class DummyApp(object):
def __call__(self, environ, start_response):
self.environ = environ
start_response("200 OK", [("Content-Type", "text/plain")])
yield "hello"
class DummyResponse(object):
status = None
headers = None
body = None
def DummyEnviron(
addr=("127.0.0.1", 8080), scheme="http", server="localhost", headers=None,
):
environ = {
"REMOTE_ADDR": addr[0],
"REMOTE_HOST": addr[0],
"REMOTE_PORT": addr[1],
"SERVER_PORT": str(addr[1]),
"SERVER_NAME": server,
"wsgi.url_scheme": scheme,
"HTTP_HOST": "192.168.1.1:80",
}
if headers:
environ.update(
{
"HTTP_" + key.upper().replace("-", "_"): value
for key, value in headers.items()
}
)
return environ