From d5da265b5f7ebccc901c6d9b2815c0a2da2db5ea Mon Sep 17 00:00:00 2001 From: Nikolay Tatarinov Date: Tue, 24 Feb 2026 14:16:34 +0300 Subject: [PATCH] feat: enhance HTTP handling and configuration - Introduced a new utility function `safe_urlopen` to ensure only allowed URL schemes (http, https) are opened, enhancing security against path traversal vulnerabilities. - Updated the `run.py` and `calendar_ics.py` files to utilize `safe_urlopen` for HTTP requests, improving error handling and security. - Added `HTTP_HOST` configuration to the settings, allowing dynamic binding of the HTTP server host. - Revised the `.env.example` file to include the new `HTTP_HOST` variable with a description. - Enhanced tests for `safe_urlopen` to validate behavior with disallowed URL schemes and ensure proper integration in existing functionality. --- .coverage | Bin 53248 -> 53248 bytes .cursorignore | 1 + .env.example | 1 + docs/configuration.md | 1 + duty_teller/api/calendar_ics.py | 11 ++++-- duty_teller/config.py | 5 +++ duty_teller/run.py | 5 +-- duty_teller/utils/http_client.py | 36 ++++++++++++++++++++ tests/test_calendar_ics.py | 11 ++++++ tests/test_http_client.py | 56 +++++++++++++++++++++++++++++++ tests/test_run.py | 42 +++++++++++++++-------- 11 files changed, 150 insertions(+), 19 deletions(-) create mode 100644 .cursorignore create mode 100644 duty_teller/utils/http_client.py create mode 100644 tests/test_http_client.py diff --git a/.coverage b/.coverage index d5f46e177cf100634b9612a1bdbfa8b7fd5211ab..6586674399b1d9a3d04fd35db2800c73b93dafe6 100644 GIT binary patch delta 1042 zcmZvaZD>Sp(8&WnmQy9XcrM5O* zT|byOHa|G}0%xu>s4d9$LB}Qp9Znkj;MRe#59@?6+eU<05UO>G>#FhY+|xNG_~|*% z|2)tCz>kw3v*pKZ=fs3fm;0!kXb5V8dS2~Tm%%^aXRrk-$`8uN$`*N1J}dVXoz*L3 zX^n0Z_R-f<4a$Mu&Ze#pw?*lEb1R6@;Z!3HB_Zuix}&J-q^O3Pox|m_|&0vI@6iSrqe9b9AG<>lApC-upXr2DM*Ro2R@#*T)W`_UjA~y z2uL25gtXgOOXm!SRL7i$_g-oVR&91$C1-Co)6ui){cN@?{a#0APiMB6Xt^RLb@`w` zej=Z0N3^XZrCvZ~;zFlU#&I4sIqpNK{HT1QY?U9$U*lhK0k>maJ)kzo2jMMv48G>* zb!^Z|8ql6FL@Q-_sckLL>D{(g5T!$JHT_G)DMb<@qXOPp&L^cLlbj4KA_I(JrB>WE3u@Gb_!&H%#%s-JvmN}kS@|nHjzd=g|FgE_#8fpb2y6)yc##) z5O$%z(LMA#nm{G=C8Fpk`T*@kyU|XD2O_tuivU(Te*tQg9qQZ#BFsy)V_;QH;7q*o z@Uf6L%}47!L3SDU({Bd6>wM}Cr=QuFzSuuv-XAZH{CRDK-R)zemEMhjXynI)-nHT_8r-(FjpG_#*qZ7Y9f5 zh{7hXiXXM5s!klbW0r4BUpLK-56ta(@v?ZiroAdiyN4{ogF}rzkI2ID+Wupcp|R}? zQ^#lK_NQMFe!C~p$)VLYozzl!SckPlCv)T$2^8bQH!-ra4shyR+%!5Ghy_C2_=8-0 r0WP|qi`U1+L)%C7ShR+lh>J_TlS{aoONej@(yvC>gSz6(s2=|ZdU-%V delta 925 zcmZ9KSxgf_9L76z*F9inXzBLYwzOMoVm$!sL-j#LF|u1g(HD)HNCGAv1W`m0q6Ope zpaspuXo#jHCXyKMdL-b1`k)5oK}@{C+k`|kUVy0UbYpz*>HB}*|C`zUZMLz^+}LJ5 z!{nP4u7hy-Az2(2&x>ot@xnu)Stt^C{uaNBpTP}s9Z5@OHOGb(vuQ0kTNvUu)K=wG zZz=JUlZEqHA7g6DqH&mnRV&%4Mo5DyvR;~+jr6iA0uOmnGMRi;CEzxuh$aDtH1;r|2#ya0&y2Ov`;h*2HtOiTHJ;h~#67da7kdYxjaMW!~W{OIV`NBSNYjSNU;$wi|{4)QW?2X-mlFfMDK!M zdc&z$m0+PwTlSjnrkIeK8odKdH3(ulMm=PI^Emo%U1~n4_;{M+GuszyX@|EB>gzrY ze$cNK=&`h57gLv3+-N4%Egm3|c*|s`ouSrV2jwK>MO7 (cached_at_timestamp, raw_ics_bytes) @@ -16,11 +18,14 @@ FETCH_TIMEOUT_SECONDS = 15 def _fetch_ics(url: str) -> bytes | None: - """GET url, return response body or None on error.""" + """GET url, return response body or None on error. Only https/http schemes allowed.""" try: req = Request(url, headers={"User-Agent": "DutyTeller/1.0"}) - with urlopen(req, timeout=FETCH_TIMEOUT_SECONDS) as resp: + with safe_urlopen(req, timeout=FETCH_TIMEOUT_SECONDS) as resp: return resp.read() + except ValueError: + log.warning("ICS URL scheme not allowed (only https/http): %s", url) + return None except URLError as e: log.warning("Failed to fetch ICS from %s: %s", url, e) return None diff --git a/duty_teller/config.py b/duty_teller/config.py index bcf0297..d9a4987 100644 --- a/duty_teller/config.py +++ b/duty_teller/config.py @@ -53,6 +53,7 @@ class Settings: bot_token: str database_url: str mini_app_base_url: str + http_host: str http_port: int allowed_usernames: set[str] admin_usernames: set[str] @@ -90,10 +91,13 @@ class Settings: if raw_cors and raw_cors != "*" else ["*"] ) + raw_host = (os.getenv("HTTP_HOST") or "127.0.0.1").strip() + http_host = raw_host if raw_host else "127.0.0.1" return cls( bot_token=bot_token, database_url=os.getenv("DATABASE_URL", "sqlite:///data/duty_teller.db"), mini_app_base_url=os.getenv("MINI_APP_BASE_URL", "").rstrip("/"), + http_host=http_host, http_port=int(os.getenv("HTTP_PORT", "8080")), allowed_usernames=allowed, admin_usernames=admin, @@ -120,6 +124,7 @@ _settings = Settings.from_env() BOT_TOKEN = _settings.bot_token DATABASE_URL = _settings.database_url MINI_APP_BASE_URL = _settings.mini_app_base_url +HTTP_HOST = _settings.http_host HTTP_PORT = _settings.http_port ALLOWED_USERNAMES = _settings.allowed_usernames ADMIN_USERNAMES = _settings.admin_usernames diff --git a/duty_teller/run.py b/duty_teller/run.py index 3966ec5..dcb730f 100644 --- a/duty_teller/run.py +++ b/duty_teller/run.py @@ -11,6 +11,7 @@ from telegram.ext import ApplicationBuilder from duty_teller import config from duty_teller.config import require_bot_token from duty_teller.handlers import group_duty_pin, register_handlers +from duty_teller.utils.http_client import safe_urlopen logging.basicConfig( format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", @@ -39,7 +40,7 @@ def _set_default_menu_button_webapp() -> None: method="POST", ) try: - with urllib.request.urlopen(req, timeout=10) as resp: + with safe_urlopen(req, timeout=10) as resp: if resp.status == 200: logger.info("Default menu button set to Web App: %s", menu_url) else: @@ -54,7 +55,7 @@ def _run_uvicorn(web_app, port: int) -> None: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) server = uvicorn.Server( - uvicorn.Config(web_app, host="0.0.0.0", port=port, log_level="info"), + uvicorn.Config(web_app, host=config.HTTP_HOST, port=port, log_level="info"), ) loop.run_until_complete(server.serve()) diff --git a/duty_teller/utils/http_client.py b/duty_teller/utils/http_client.py new file mode 100644 index 0000000..83532fa --- /dev/null +++ b/duty_teller/utils/http_client.py @@ -0,0 +1,36 @@ +"""Safe HTTP client: open URLs only for allowed schemes (e.g. https, http) to avoid path traversal (CWE-22).""" + +from urllib.request import Request, urlopen +from urllib.parse import urlparse + + +def safe_urlopen( + request: Request, + timeout: float = 10, + allowed_schemes: tuple[str, ...] = ("https", "http"), +) -> "urlopen": + """Open URL only if its scheme is in allowed_schemes; otherwise raise ValueError. + + Use this instead of raw urlopen() to satisfy Bandit B310 and prevent opening + file:// or other non-HTTP schemes. + + Args: + request: urllib.request.Request (has .full_url or .get_full_url()). + timeout: Timeout in seconds for the request. + allowed_schemes: Tuple of lowercase scheme names, e.g. ("https", "http"). + + Returns: + Context manager from urlopen (use with 'with'). + + Raises: + ValueError: If the request URL scheme is not in allowed_schemes. + """ + url = request.get_full_url() + parsed = urlparse(url) + scheme = (parsed.scheme or "").lower() + if scheme not in allowed_schemes: + raise ValueError( + f"URL scheme not allowed: {scheme!r} (allowed: {allowed_schemes})" + ) + # Scheme validated above; only https/http reach here (B310). + return urlopen(request, timeout=timeout) # nosec B310 diff --git a/tests/test_calendar_ics.py b/tests/test_calendar_ics.py index 14cb196..45b30dc 100644 --- a/tests/test_calendar_ics.py +++ b/tests/test_calendar_ics.py @@ -129,6 +129,17 @@ class TestGetCalendarEvents: def test_empty_url_returns_empty(self): assert mod.get_calendar_events("", "2025-01-01", "2025-01-31") == [] + def test_disallowed_url_scheme_returns_empty(self): + """get_calendar_events: file:// or ftp:// URL does not call urlopen, returns [].""" + result = mod.get_calendar_events( + "file:///etc/passwd", "2025-01-01", "2025-01-31" + ) + assert result == [] + result = mod.get_calendar_events( + "ftp://example.com/cal.ics", "2025-01-01", "2025-01-31" + ) + assert result == [] + def test_from_after_to_returns_empty(self): assert ( mod.get_calendar_events( diff --git a/tests/test_http_client.py b/tests/test_http_client.py new file mode 100644 index 0000000..a05b891 --- /dev/null +++ b/tests/test_http_client.py @@ -0,0 +1,56 @@ +"""Tests for duty_teller.utils.http_client (safe_urlopen scheme check).""" + +from unittest.mock import MagicMock, patch + +import pytest +from urllib.request import Request + +from duty_teller.utils.http_client import safe_urlopen + + +def test_safe_urlopen_rejects_file_scheme(): + """safe_urlopen: file:// URL raises ValueError and does not call urlopen.""" + req = Request("file:///etc/passwd") + with patch("duty_teller.utils.http_client.urlopen") as mock_urlopen: + with pytest.raises(ValueError, match="scheme not allowed"): + with safe_urlopen(req, timeout=5): + pass + mock_urlopen.assert_not_called() + + +def test_safe_urlopen_rejects_ftp_scheme(): + """safe_urlopen: ftp:// URL raises ValueError and does not call urlopen.""" + req = Request("ftp://example.com/file.txt") + with patch("duty_teller.utils.http_client.urlopen") as mock_urlopen: + with pytest.raises(ValueError, match="scheme not allowed"): + with safe_urlopen(req, timeout=5): + pass + mock_urlopen.assert_not_called() + + +def test_safe_urlopen_allows_https(): + """safe_urlopen: https:// URL is allowed and urlopen is called.""" + req = Request("https://api.telegram.org/bot123/setChatMenuButton") + mock_resp = MagicMock() + mock_resp.__enter__ = MagicMock(return_value=mock_resp) + mock_resp.__exit__ = MagicMock(return_value=False) + with patch( + "duty_teller.utils.http_client.urlopen", return_value=mock_resp + ) as mock_urlopen: + with safe_urlopen(req, timeout=10) as resp: + assert resp is mock_resp + mock_urlopen.assert_called_once_with(req, timeout=10) + + +def test_safe_urlopen_allows_http_when_in_allowed_schemes(): + """safe_urlopen: http:// URL is allowed when allowed_schemes includes http.""" + req = Request("http://localhost:8080/health") + mock_resp = MagicMock() + mock_resp.__enter__ = MagicMock(return_value=mock_resp) + mock_resp.__exit__ = MagicMock(return_value=False) + with patch( + "duty_teller.utils.http_client.urlopen", return_value=mock_resp + ) as mock_urlopen: + with safe_urlopen(req, timeout=5, allowed_schemes=("https", "http")) as resp: + assert resp is mock_resp + mock_urlopen.assert_called_once_with(req, timeout=5) diff --git a/tests/test_run.py b/tests/test_run.py index f711d10..1b9d2f7 100644 --- a/tests/test_run.py +++ b/tests/test_run.py @@ -76,32 +76,32 @@ def test_set_default_menu_button_skips_when_not_https(): def test_set_default_menu_button_calls_telegram_api(): - """_set_default_menu_button_webapp: when URL is https, calls Telegram API (mocked).""" + """_set_default_menu_button_webapp: when URL is https, calls safe_urlopen (Telegram API mocked).""" with patch("duty_teller.run.config") as mock_cfg: mock_cfg.MINI_APP_BASE_URL = "https://example.com/" mock_cfg.BOT_TOKEN = "123:ABC" - with patch("urllib.request.urlopen") as mock_urlopen: + with patch("duty_teller.run.safe_urlopen") as mock_safe_urlopen: mock_resp = MagicMock() mock_resp.status = 200 mock_resp.__enter__ = MagicMock(return_value=mock_resp) mock_resp.__exit__ = MagicMock(return_value=False) - mock_urlopen.return_value = mock_resp + mock_safe_urlopen.return_value = mock_resp _set_default_menu_button_webapp() - mock_urlopen.assert_called_once() - req = mock_urlopen.call_args[0][0] - assert "setChatMenuButton" in req.full_url - assert "123:ABC" in req.full_url + mock_safe_urlopen.assert_called_once() + req = mock_safe_urlopen.call_args[0][0] + assert "setChatMenuButton" in req.get_full_url() + assert "123:ABC" in req.get_full_url() def test_set_default_menu_button_handles_urlopen_error(): - """_set_default_menu_button_webapp: on urlopen error, logs and does not raise.""" + """_set_default_menu_button_webapp: on safe_urlopen error, logs and does not raise.""" with patch("duty_teller.run.config") as mock_cfg: mock_cfg.MINI_APP_BASE_URL = "https://example.com/" mock_cfg.BOT_TOKEN = "123:ABC" - with patch("urllib.request.urlopen") as mock_urlopen: - mock_urlopen.side_effect = OSError("network error") + with patch("duty_teller.run.safe_urlopen") as mock_safe_urlopen: + mock_safe_urlopen.side_effect = OSError("network error") _set_default_menu_button_webapp() - mock_urlopen.assert_called_once() + mock_safe_urlopen.assert_called_once() def test_set_default_menu_button_handles_non_200_response(): @@ -109,11 +109,25 @@ def test_set_default_menu_button_handles_non_200_response(): with patch("duty_teller.run.config") as mock_cfg: mock_cfg.MINI_APP_BASE_URL = "https://example.com/" mock_cfg.BOT_TOKEN = "123:ABC" - with patch("urllib.request.urlopen") as mock_urlopen: + with patch("duty_teller.run.safe_urlopen") as mock_safe_urlopen: mock_resp = MagicMock() mock_resp.status = 400 mock_resp.__enter__ = MagicMock(return_value=mock_resp) mock_resp.__exit__ = MagicMock(return_value=False) - mock_urlopen.return_value = mock_resp + mock_safe_urlopen.return_value = mock_resp _set_default_menu_button_webapp() - mock_urlopen.assert_called_once() + mock_safe_urlopen.assert_called_once() + + +def test_run_uvicorn_uses_config_host(): + """_run_uvicorn: passes config.HTTP_HOST to uvicorn.Config.""" + with patch("uvicorn.Server") as mock_server_class: + mock_server = MagicMock() + mock_server.serve = MagicMock(return_value=asyncio.sleep(0)) + mock_server_class.return_value = mock_server + with patch("duty_teller.run.config") as mock_cfg: + mock_cfg.HTTP_HOST = "127.0.0.1" + _run_uvicorn(MagicMock(), 8080) + call_kwargs = mock_server_class.call_args[0][0] + assert call_kwargs.host == "127.0.0.1" + assert call_kwargs.port == 8080