refactor: improve code readability and structure in various components

- Refactored the `mini_app_short_name` assignment in `config.py` for better clarity.
- Enhanced the `app_config_js` function in `app.py` to improve formatting of the JavaScript response body.
- Added per-chat locks in `group_duty_pin.py` to prevent concurrent refreshes, improving message handling.
- Updated `_schedule_next_update` to include optional jitter for scheduling, enhancing performance during high-load scenarios.
- Cleaned up test files by removing unused imports and improving test descriptions for clarity.
This commit is contained in:
2026-03-03 17:52:23 +03:00
parent 37218a436a
commit 87e8417675
5 changed files with 142 additions and 63 deletions

View File

@@ -153,7 +153,9 @@ def app_config_js() -> Response:
log_level = _safe_js_string(config.LOG_LEVEL_STR.lower(), _VALID_LOG_LEVELS, "info") log_level = _safe_js_string(config.LOG_LEVEL_STR.lower(), _VALID_LOG_LEVELS, "info")
tz = _safe_tz_string(config.DUTY_DISPLAY_TZ) tz = _safe_tz_string(config.DUTY_DISPLAY_TZ)
tz_js = f'\nwindow.__DT_TZ = "{tz}";' if tz else "\nwindow.__DT_TZ = undefined;" tz_js = f'\nwindow.__DT_TZ = "{tz}";' if tz else "\nwindow.__DT_TZ = undefined;"
body = f'window.__DT_LANG = "{lang}";\nwindow.__DT_LOG_LEVEL = "{log_level}";{tz_js}' body = (
f'window.__DT_LANG = "{lang}";\nwindow.__DT_LOG_LEVEL = "{log_level}";{tz_js}'
)
return Response( return Response(
content=body, content=body,
media_type="application/javascript; charset=utf-8", media_type="application/javascript; charset=utf-8",
@@ -281,7 +283,6 @@ def get_personal_calendar_ical(
) )
webapp_path = config.PROJECT_ROOT / "webapp-next" / "out" webapp_path = config.PROJECT_ROOT / "webapp-next" / "out"
if webapp_path.is_dir(): if webapp_path.is_dir():
app.mount("/app", StaticFiles(directory=str(webapp_path), html=True), name="webapp") app.mount("/app", StaticFiles(directory=str(webapp_path), html=True), name="webapp")

View File

@@ -172,7 +172,9 @@ class Settings:
database_url=database_url, database_url=database_url,
bot_username=bot_username, bot_username=bot_username,
mini_app_base_url=os.getenv("MINI_APP_BASE_URL", "").rstrip("/"), mini_app_base_url=os.getenv("MINI_APP_BASE_URL", "").rstrip("/"),
mini_app_short_name=(os.getenv("MINI_APP_SHORT_NAME", "") or "").strip().strip("/"), mini_app_short_name=(os.getenv("MINI_APP_SHORT_NAME", "") or "")
.strip()
.strip("/"),
http_host=http_host, http_host=http_host,
http_port=http_port, http_port=http_port,
allowed_usernames=allowed, allowed_usernames=allowed,

View File

@@ -2,7 +2,8 @@
import asyncio import asyncio
import logging import logging
from datetime import datetime, timezone import random
from datetime import datetime, timedelta, timezone
from typing import Literal from typing import Literal
import duty_teller.config as config import duty_teller.config as config
@@ -29,6 +30,10 @@ from duty_teller.services.group_duty_pin_service import (
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Per-chat locks to prevent concurrent refresh for the same chat (avoids duplicate messages).
_refresh_locks: dict[int, asyncio.Lock] = {}
_lock_for_refresh_locks = asyncio.Lock()
JOB_NAME_PREFIX = "duty_pin_" JOB_NAME_PREFIX = "duty_pin_"
RETRY_WHEN_NO_DUTY_MINUTES = 15 RETRY_WHEN_NO_DUTY_MINUTES = 15
@@ -124,8 +129,12 @@ def _sync_untrust_group(chat_id: int) -> tuple[bool, int | None]:
async def _schedule_next_update( async def _schedule_next_update(
application, chat_id: int, when_utc: datetime | None application,
chat_id: int,
when_utc: datetime | None,
jitter_seconds: float | None = None,
) -> None: ) -> None:
"""Schedule the next pin refresh job. Optional jitter spreads jobs when scheduling many chats."""
job_queue = application.job_queue job_queue = application.job_queue
if job_queue is None: if job_queue is None:
logger.warning("Job queue not available, cannot schedule pin update") logger.warning("Job queue not available, cannot schedule pin update")
@@ -136,8 +145,10 @@ async def _schedule_next_update(
if when_utc is not None: if when_utc is not None:
now_utc = datetime.now(timezone.utc).replace(tzinfo=None) now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
delay = when_utc - now_utc delay = when_utc - now_utc
if jitter_seconds is not None and jitter_seconds > 0:
delay += timedelta(seconds=random.uniform(0, jitter_seconds))
if delay.total_seconds() < 1: if delay.total_seconds() < 1:
delay = 1 delay = timedelta(seconds=1)
job_queue.run_once( job_queue.run_once(
update_group_pin, update_group_pin,
when=delay, when=delay,
@@ -146,8 +157,6 @@ async def _schedule_next_update(
) )
logger.info("Scheduled pin update for chat_id=%s at %s", chat_id, when_utc) logger.info("Scheduled pin update for chat_id=%s at %s", chat_id, when_utc)
else: else:
from datetime import timedelta
job_queue.run_once( job_queue.run_once(
update_group_pin, update_group_pin,
when=timedelta(minutes=RETRY_WHEN_NO_DUTY_MINUTES), when=timedelta(minutes=RETRY_WHEN_NO_DUTY_MINUTES),
@@ -168,11 +177,13 @@ async def _refresh_pin_for_chat(
Uses single DB session for message_id, text, next_shift_end (consolidated). Uses single DB session for message_id, text, next_shift_end (consolidated).
If the group is no longer trusted, removes pin record, job, and message; returns "untrusted". If the group is no longer trusted, removes pin record, job, and message; returns "untrusted".
Unpin is best-effort (e.g. if user already unpinned we still pin the new message and save state).
Per-chat lock prevents concurrent refresh for the same chat.
Returns: Returns:
"updated" if the message was sent, pinned and saved successfully; "updated" if the message was sent, pinned and saved successfully;
"no_message" if there is no pin record for this chat; "no_message" if there is no pin record for this chat;
"failed" if send_message or permissions failed; "failed" if send_message or pin failed;
"untrusted" if the group was removed from trusted list (pin record and message cleaned up). "untrusted" if the group was removed from trusted list (pin record and message cleaned up).
""" """
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
@@ -205,42 +216,59 @@ async def _refresh_pin_for_chat(
logger.info("No pin record for chat_id=%s, skipping update", chat_id) logger.info("No pin record for chat_id=%s, skipping update", chat_id)
return "no_message" return "no_message"
old_message_id = message_id old_message_id = message_id
async with _lock_for_refresh_locks:
lock = _refresh_locks.setdefault(chat_id, asyncio.Lock())
try: try:
msg = await context.bot.send_message( async with lock:
chat_id=chat_id, try:
text=text, msg = await context.bot.send_message(
reply_markup=_get_contact_button_markup(config.DEFAULT_LANGUAGE), chat_id=chat_id,
) text=text,
except (BadRequest, Forbidden) as e: reply_markup=_get_contact_button_markup(config.DEFAULT_LANGUAGE),
logger.warning( )
"Failed to send duty message for pin refresh chat_id=%s: %s", chat_id, e except (BadRequest, Forbidden) as e:
) logger.warning(
await _schedule_next_update(context.application, chat_id, next_end) "Failed to send duty message for pin refresh chat_id=%s: %s",
return "failed" chat_id,
try: e,
await context.bot.unpin_chat_message(chat_id=chat_id) )
await context.bot.pin_chat_message( await _schedule_next_update(context.application, chat_id, next_end)
chat_id=chat_id, return "failed"
message_id=msg.message_id, try:
disable_notification=not config.DUTY_PIN_NOTIFY, await context.bot.unpin_chat_message(chat_id=chat_id)
) except (BadRequest, Forbidden) as e:
except (BadRequest, Forbidden) as e: logger.debug(
logger.warning("Unpin or pin after refresh failed chat_id=%s: %s", chat_id, e) "Unpin failed (e.g. no pinned message) chat_id=%s: %s", chat_id, e
await _schedule_next_update(context.application, chat_id, next_end) )
return "failed" try:
await loop.run_in_executor(None, _sync_save_pin, chat_id, msg.message_id) await context.bot.pin_chat_message(
if old_message_id is not None: chat_id=chat_id,
try: message_id=msg.message_id,
await context.bot.delete_message(chat_id=chat_id, message_id=old_message_id) disable_notification=not config.DUTY_PIN_NOTIFY,
except (BadRequest, Forbidden) as e: )
logger.warning( except (BadRequest, Forbidden) as e:
"Could not delete old pinned message %s in chat_id=%s: %s", logger.warning("Pin after refresh failed chat_id=%s: %s", chat_id, e)
old_message_id, await _schedule_next_update(context.application, chat_id, next_end)
chat_id, return "failed"
e, await loop.run_in_executor(None, _sync_save_pin, chat_id, msg.message_id)
) if old_message_id is not None:
await _schedule_next_update(context.application, chat_id, next_end) try:
return "updated" await context.bot.delete_message(
chat_id=chat_id, message_id=old_message_id
)
except (BadRequest, Forbidden) as e:
logger.warning(
"Could not delete old pinned message %s in chat_id=%s: %s",
old_message_id,
chat_id,
e,
)
await _schedule_next_update(context.application, chat_id, next_end)
return "updated"
finally:
async with _lock_for_refresh_locks:
_refresh_locks.pop(chat_id, None)
async def update_group_pin(context: ContextTypes.DEFAULT_TYPE) -> None: async def update_group_pin(context: ContextTypes.DEFAULT_TYPE) -> None:
@@ -338,12 +366,15 @@ def _get_all_pin_chat_ids_sync() -> list[int]:
async def restore_group_pin_jobs(application) -> None: async def restore_group_pin_jobs(application) -> None:
"""Restore scheduled pin-update jobs for all chats that have a pinned message (on startup).""" """Restore scheduled pin-update jobs for all chats that have a pinned message (on startup).
Uses jitter (060 s) per chat to avoid thundering herd when many groups share the same shift end.
"""
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
chat_ids = await loop.run_in_executor(None, _get_all_pin_chat_ids_sync) chat_ids = await loop.run_in_executor(None, _get_all_pin_chat_ids_sync)
next_end = await loop.run_in_executor(None, _get_next_shift_end_sync) next_end = await loop.run_in_executor(None, _get_next_shift_end_sync)
for chat_id in chat_ids: for chat_id in chat_ids:
await _schedule_next_update(application, chat_id, next_end) await _schedule_next_update(application, chat_id, next_end, jitter_seconds=60.0)
logger.info("Restored %s group pin jobs", len(chat_ids)) logger.info("Restored %s group pin jobs", len(chat_ids))
@@ -407,6 +438,8 @@ async def pin_duty_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> No
message_id=message_id, message_id=message_id,
disable_notification=True, disable_notification=True,
) )
next_end = await loop.run_in_executor(None, _get_next_shift_end_sync)
await _schedule_next_update(context.application, chat_id, next_end)
await update.message.reply_text(t(lang, "pin_duty.pinned")) await update.message.reply_text(t(lang, "pin_duty.pinned"))
except (BadRequest, Forbidden) as e: except (BadRequest, Forbidden) as e:
logger.warning("pin_duty failed chat_id=%s: %s", chat_id, e) logger.warning("pin_duty failed chat_id=%s: %s", chat_id, e)

View File

@@ -1,7 +1,6 @@
"""Tests for FastAPI app /api/duties.""" """Tests for FastAPI app /api/duties."""
import time import time
from pathlib import Path
from unittest.mock import ANY, patch from unittest.mock import ANY, patch
import pytest import pytest

View File

@@ -93,8 +93,9 @@ def test_get_contact_button_markup_returns_markup_when_username_set():
"""_get_contact_button_markup: BOT_USERNAME set, no short name -> t.me bot link with startapp=duty.""" """_get_contact_button_markup: BOT_USERNAME set, no short name -> t.me bot link with startapp=duty."""
from telegram import InlineKeyboardMarkup from telegram import InlineKeyboardMarkup
with patch.object(config, "BOT_USERNAME", "MyDutyBot"), patch.object( with (
config, "MINI_APP_SHORT_NAME", "" patch.object(config, "BOT_USERNAME", "MyDutyBot"),
patch.object(config, "MINI_APP_SHORT_NAME", ""),
): ):
with patch.object(mod, "t", return_value="View contacts"): with patch.object(mod, "t", return_value="View contacts"):
result = mod._get_contact_button_markup("en") result = mod._get_contact_button_markup("en")
@@ -113,8 +114,9 @@ def test_get_contact_button_markup_with_short_name_uses_direct_miniapp_link():
"""_get_contact_button_markup: MINI_APP_SHORT_NAME set -> direct Mini App URL with startapp=duty.""" """_get_contact_button_markup: MINI_APP_SHORT_NAME set -> direct Mini App URL with startapp=duty."""
from telegram import InlineKeyboardMarkup from telegram import InlineKeyboardMarkup
with patch.object(config, "BOT_USERNAME", "MyDutyBot"), patch.object( with (
config, "MINI_APP_SHORT_NAME", "DutyApp" patch.object(config, "BOT_USERNAME", "MyDutyBot"),
patch.object(config, "MINI_APP_SHORT_NAME", "DutyApp"),
): ):
with patch.object(mod, "t", return_value="View contacts"): with patch.object(mod, "t", return_value="View contacts"):
result = mod._get_contact_button_markup("en") result = mod._get_contact_button_markup("en")
@@ -294,8 +296,8 @@ async def test_update_group_pin_send_raises_no_unpin_pin_schedule_still_called()
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_update_group_pin_repin_raises_still_schedules_next(): async def test_update_group_pin_unpin_raises_pin_succeeds_saves_and_schedules():
"""update_group_pin: send_message ok, unpin or pin raises -> no _sync_save_pin, schedule still called, log.""" """update_group_pin: send_message ok, unpin raises (e.g. no pinned message), pin succeeds -> save_pin and schedule called."""
new_msg = MagicMock() new_msg = MagicMock()
new_msg.message_id = 888 new_msg.message_id = 888
context = MagicMock() context = MagicMock()
@@ -304,9 +306,45 @@ async def test_update_group_pin_repin_raises_still_schedules_next():
context.bot = MagicMock() context.bot = MagicMock()
context.bot.send_message = AsyncMock(return_value=new_msg) context.bot.send_message = AsyncMock(return_value=new_msg)
context.bot.unpin_chat_message = AsyncMock( context.bot.unpin_chat_message = AsyncMock(
side_effect=Forbidden("Not enough rights") side_effect=BadRequest("Chat has no pinned message")
) )
context.bot.pin_chat_message = AsyncMock() context.bot.pin_chat_message = AsyncMock()
context.bot.delete_message = AsyncMock()
context.application = MagicMock()
with patch.object(config, "DUTY_PIN_NOTIFY", True):
with patch.object(mod, "_sync_is_trusted", return_value=True):
with patch.object(
mod, "_sync_get_pin_refresh_data", return_value=(3, "Text", None)
):
with patch.object(
mod, "_schedule_next_update", AsyncMock()
) as mock_schedule:
with patch.object(mod, "_sync_save_pin") as mock_save:
await mod.update_group_pin(context)
context.bot.send_message.assert_called_once_with(
chat_id=222, text="Text", reply_markup=None
)
context.bot.unpin_chat_message.assert_called_once_with(chat_id=222)
context.bot.pin_chat_message.assert_called_once_with(
chat_id=222, message_id=888, disable_notification=False
)
mock_save.assert_called_once_with(222, 888)
mock_schedule.assert_called_once_with(context.application, 222, None)
@pytest.mark.asyncio
async def test_update_group_pin_pin_raises_no_save_still_schedules_next():
"""update_group_pin: send_message ok, unpin ok, pin raises -> no _sync_save_pin, schedule still called, log."""
new_msg = MagicMock()
new_msg.message_id = 888
context = MagicMock()
context.job = MagicMock()
context.job.data = {"chat_id": 222}
context.bot = MagicMock()
context.bot.send_message = AsyncMock(return_value=new_msg)
context.bot.unpin_chat_message = AsyncMock()
context.bot.pin_chat_message = AsyncMock(side_effect=Forbidden("Not enough rights"))
context.application = MagicMock() context.application = MagicMock()
with patch.object(config, "DUTY_PIN_NOTIFY", True): with patch.object(config, "DUTY_PIN_NOTIFY", True):
@@ -325,7 +363,7 @@ async def test_update_group_pin_repin_raises_still_schedules_next():
) )
mock_save.assert_not_called() mock_save.assert_not_called()
mock_logger.warning.assert_called_once() mock_logger.warning.assert_called_once()
assert "Unpin or pin" in mock_logger.warning.call_args[0][0] assert "Pin after refresh failed" in mock_logger.warning.call_args[0][0]
mock_schedule.assert_called_once_with(context.application, 222, None) mock_schedule.assert_called_once_with(context.application, 222, None)
@@ -388,7 +426,7 @@ async def test_pin_duty_cmd_group_only_reply():
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_pin_duty_cmd_group_pins_and_replies_pinned(): async def test_pin_duty_cmd_group_pins_and_replies_pinned():
"""pin_duty_cmd in group with existing pin record -> pin and reply pinned.""" """pin_duty_cmd in group with existing pin record -> pin, schedule next update, reply pinned."""
update = MagicMock() update = MagicMock()
update.message = MagicMock() update.message = MagicMock()
update.message.reply_text = AsyncMock() update.message.reply_text = AsyncMock()
@@ -399,16 +437,22 @@ async def test_pin_duty_cmd_group_pins_and_replies_pinned():
context = MagicMock() context = MagicMock()
context.bot = MagicMock() context.bot = MagicMock()
context.bot.pin_chat_message = AsyncMock() context.bot.pin_chat_message = AsyncMock()
context.application = MagicMock()
with patch("duty_teller.handlers.group_duty_pin.get_lang", return_value="en"): with patch("duty_teller.handlers.group_duty_pin.get_lang", return_value="en"):
with patch.object(mod, "_sync_is_trusted", return_value=True): with patch.object(mod, "_sync_is_trusted", return_value=True):
with patch.object(mod, "_sync_get_message_id", return_value=5): with patch.object(mod, "_sync_get_message_id", return_value=5):
with patch("duty_teller.handlers.group_duty_pin.t") as mock_t: with patch.object(mod, "_get_next_shift_end_sync", return_value=None):
mock_t.return_value = "Pinned" with patch.object(
await mod.pin_duty_cmd(update, context) mod, "_schedule_next_update", AsyncMock()
) as mock_schedule:
with patch("duty_teller.handlers.group_duty_pin.t") as mock_t:
mock_t.return_value = "Pinned"
await mod.pin_duty_cmd(update, context)
context.bot.pin_chat_message.assert_called_once_with( context.bot.pin_chat_message.assert_called_once_with(
chat_id=100, message_id=5, disable_notification=True chat_id=100, message_id=5, disable_notification=True
) )
mock_schedule.assert_called_once_with(context.application, 100, None)
update.message.reply_text.assert_called_once_with("Pinned") update.message.reply_text.assert_called_once_with("Pinned")
@@ -908,8 +952,8 @@ async def test_my_chat_member_handler_bot_removed_deletes_pin_and_jobs():
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_restore_group_pin_jobs_calls_schedule_for_each_chat(): async def test_restore_group_pin_jobs_calls_schedule_for_each_chat_with_jitter():
"""restore_group_pin_jobs: for each chat_id from _get_all_pin_chat_ids_sync, calls _schedule_next_update.""" """restore_group_pin_jobs: for each chat_id calls _schedule_next_update with jitter_seconds=60."""
application = MagicMock() application = MagicMock()
application.job_queue = MagicMock() application.job_queue = MagicMock()
application.job_queue.get_jobs_by_name = MagicMock(return_value=[]) application.job_queue.get_jobs_by_name = MagicMock(return_value=[])
@@ -922,8 +966,8 @@ async def test_restore_group_pin_jobs_calls_schedule_for_each_chat():
) as mock_schedule: ) as mock_schedule:
await mod.restore_group_pin_jobs(application) await mod.restore_group_pin_jobs(application)
assert mock_schedule.call_count == 2 assert mock_schedule.call_count == 2
mock_schedule.assert_any_call(application, 10, None) mock_schedule.assert_any_call(application, 10, None, jitter_seconds=60.0)
mock_schedule.assert_any_call(application, 20, None) mock_schedule.assert_any_call(application, 20, None, jitter_seconds=60.0)
# --- _refresh_pin_for_chat untrusted --- # --- _refresh_pin_for_chat untrusted ---