diff --git a/src/python/tests/integration/test_web/test_handler/test_controller.py b/src/python/tests/integration/test_web/test_handler/test_controller.py index 25d06f63..fc0a73af 100644 --- a/src/python/tests/integration/test_web/test_handler/test_controller.py +++ b/src/python/tests/integration/test_web/test_handler/test_controller.py @@ -20,11 +20,11 @@ def side_effect(cmd: Controller.Command): self.assertEqual(Controller.Command.Action.QUEUE, command.action) self.assertEqual("test1", command.filename) - uri = quote(quote("/value/with/slashes", safe=""), safe="") + uri = quote(quote("value/with/slashes", safe=""), safe="") print(self.test_app.get("/server/command/queue/" + uri)) command = self.controller.queue_command.call_args[0][0] self.assertEqual(Controller.Command.Action.QUEUE, command.action) - self.assertEqual("/value/with/slashes", command.filename) + self.assertEqual("value/with/slashes", command.filename) uri = quote(quote(" value with spaces", safe=""), safe="") print(self.test_app.get("/server/command/queue/" + uri)) @@ -56,11 +56,11 @@ def side_effect(cmd: Controller.Command): self.assertEqual(Controller.Command.Action.STOP, command.action) self.assertEqual("test1", command.filename) - uri = quote(quote("/value/with/slashes", safe=""), safe="") + uri = quote(quote("value/with/slashes", safe=""), safe="") print(self.test_app.get("/server/command/stop/" + uri)) command = self.controller.queue_command.call_args[0][0] self.assertEqual(Controller.Command.Action.STOP, command.action) - self.assertEqual("/value/with/slashes", command.filename) + self.assertEqual("value/with/slashes", command.filename) uri = quote(quote(" value with spaces", safe=""), safe="") print(self.test_app.get("/server/command/stop/" + uri)) @@ -92,11 +92,11 @@ def side_effect(cmd: Controller.Command): self.assertEqual(Controller.Command.Action.EXTRACT, command.action) self.assertEqual("test1", command.filename) - uri = quote(quote("/value/with/slashes", safe=""), safe="") + uri = quote(quote("value/with/slashes", safe=""), safe="") print(self.test_app.get("/server/command/extract/" + uri)) command = self.controller.queue_command.call_args[0][0] self.assertEqual(Controller.Command.Action.EXTRACT, command.action) - self.assertEqual("/value/with/slashes", command.filename) + self.assertEqual("value/with/slashes", command.filename) uri = quote(quote(" value with spaces", safe=""), safe="") print(self.test_app.get("/server/command/extract/" + uri)) @@ -128,11 +128,11 @@ def side_effect(cmd: Controller.Command): self.assertEqual(Controller.Command.Action.DELETE_LOCAL, command.action) self.assertEqual("test1", command.filename) - uri = quote(quote("/value/with/slashes", safe=""), safe="") + uri = quote(quote("value/with/slashes", safe=""), safe="") print(self.test_app.get("/server/command/delete_local/" + uri)) command = self.controller.queue_command.call_args[0][0] self.assertEqual(Controller.Command.Action.DELETE_LOCAL, command.action) - self.assertEqual("/value/with/slashes", command.filename) + self.assertEqual("value/with/slashes", command.filename) uri = quote(quote(" value with spaces", safe=""), safe="") print(self.test_app.get("/server/command/delete_local/" + uri)) @@ -164,11 +164,11 @@ def side_effect(cmd: Controller.Command): self.assertEqual(Controller.Command.Action.DELETE_REMOTE, command.action) self.assertEqual("test1", command.filename) - uri = quote(quote("/value/with/slashes", safe=""), safe="") + uri = quote(quote("value/with/slashes", safe=""), safe="") print(self.test_app.get("/server/command/delete_remote/" + uri)) command = self.controller.queue_command.call_args[0][0] self.assertEqual(Controller.Command.Action.DELETE_REMOTE, command.action) - self.assertEqual("/value/with/slashes", command.filename) + self.assertEqual("value/with/slashes", command.filename) uri = quote(quote(" value with spaces", safe=""), safe="") print(self.test_app.get("/server/command/delete_remote/" + uri)) @@ -187,3 +187,54 @@ def side_effect(cmd: Controller.Command): command = self.controller.queue_command.call_args[0][0] self.assertEqual(Controller.Command.Action.DELETE_REMOTE, command.action) self.assertEqual('value"with"doublequote', command.filename) + + +class TestControllerHandlerValidation(BaseTestWebApp): + """Tests for filename validation in controller handler.""" + + def test_control_char_newline_rejected(self): + uri = quote(quote("file\nname", safe=""), safe="") + resp = self.test_app.get("/server/command/queue/" + uri, expect_errors=True) + self.assertEqual(400, resp.status_int) + self.controller.queue_command.assert_not_called() + + def test_control_char_carriage_return_rejected(self): + uri = quote(quote("file\rname", safe=""), safe="") + resp = self.test_app.get("/server/command/queue/" + uri, expect_errors=True) + self.assertEqual(400, resp.status_int) + self.controller.queue_command.assert_not_called() + + def test_control_char_tab_rejected(self): + uri = quote(quote("file\tname", safe=""), safe="") + resp = self.test_app.get("/server/command/queue/" + uri, expect_errors=True) + self.assertEqual(400, resp.status_int) + self.controller.queue_command.assert_not_called() + + def test_control_char_soh_rejected(self): + uri = quote(quote("file\x01name", safe=""), safe="") + resp = self.test_app.get("/server/command/queue/" + uri, expect_errors=True) + self.assertEqual(400, resp.status_int) + self.controller.queue_command.assert_not_called() + + def test_control_char_del_rejected(self): + uri = quote(quote("file\x7fname", safe=""), safe="") + resp = self.test_app.get("/server/command/queue/" + uri, expect_errors=True) + self.assertEqual(400, resp.status_int) + self.controller.queue_command.assert_not_called() + + def test_normal_filename_accepted(self): + def side_effect(cmd: Controller.Command): + cmd.callbacks[0].on_success() + + self.controller.queue_command = MagicMock() + self.controller.queue_command.side_effect = side_effect + + resp = self.test_app.get("/server/command/queue/normal_file.txt") + self.assertEqual(200, resp.status_int) + command = self.controller.queue_command.call_args[0][0] + self.assertEqual("normal_file.txt", command.filename) + + def test_path_traversal_rejected(self): + uri = quote(quote("../etc/passwd", safe=""), safe="") + resp = self.test_app.get("/server/command/queue/" + uri, expect_errors=True) + self.assertEqual(400, resp.status_int) diff --git a/src/python/web/handler/controller.py b/src/python/web/handler/controller.py index 3d21abc3..3b5dbb69 100644 --- a/src/python/web/handler/controller.py +++ b/src/python/web/handler/controller.py @@ -22,6 +22,9 @@ def _validate_filename(file_name: str) -> bool: # Reject embedded null bytes (path truncation attack) if "\x00" in file_name: return False + # Reject control characters (0x00-0x1F, 0x7F) + if any(ord(c) < 0x20 or ord(c) == 0x7F for c in file_name): + return False # Reject absolute paths if os.path.isabs(file_name): return False