diff --git a/duty_teller/api/app.py b/duty_teller/api/app.py index ea029b1..5e1d6d8 100644 --- a/duty_teller/api/app.py +++ b/duty_teller/api/app.py @@ -153,7 +153,9 @@ def app_config_js() -> Response: log_level = _safe_js_string(config.LOG_LEVEL_STR.lower(), _VALID_LOG_LEVELS, "info") tz = _safe_tz_string(config.DUTY_DISPLAY_TZ) tz_js = f'\nwindow.__DT_TZ = "{tz}";' if tz else "\nwindow.__DT_TZ = undefined;" - body = f'window.__DT_LANG = "{lang}";\nwindow.__DT_LOG_LEVEL = "{log_level}";{tz_js}' + body = ( + f'window.__DT_LANG = "{lang}";\nwindow.__DT_LOG_LEVEL = "{log_level}";{tz_js}' + ) return Response( content=body, media_type="application/javascript; charset=utf-8", @@ -281,7 +283,6 @@ def get_personal_calendar_ical( ) - webapp_path = config.PROJECT_ROOT / "webapp-next" / "out" if webapp_path.is_dir(): app.mount("/app", StaticFiles(directory=str(webapp_path), html=True), name="webapp") diff --git a/duty_teller/config.py b/duty_teller/config.py index 49fb88e..a6abb30 100644 --- a/duty_teller/config.py +++ b/duty_teller/config.py @@ -172,7 +172,9 @@ class Settings: database_url=database_url, bot_username=bot_username, mini_app_base_url=os.getenv("MINI_APP_BASE_URL", "").rstrip("/"), - mini_app_short_name=(os.getenv("MINI_APP_SHORT_NAME", "") or "").strip().strip("/"), + mini_app_short_name=(os.getenv("MINI_APP_SHORT_NAME", "") or "") + .strip() + .strip("/"), http_host=http_host, http_port=http_port, allowed_usernames=allowed, diff --git a/duty_teller/handlers/group_duty_pin.py b/duty_teller/handlers/group_duty_pin.py index 9177e1c..b0cc50e 100644 --- a/duty_teller/handlers/group_duty_pin.py +++ b/duty_teller/handlers/group_duty_pin.py @@ -2,7 +2,8 @@ import asyncio import logging -from datetime import datetime, timezone +import random +from datetime import datetime, timedelta, timezone from typing import Literal import duty_teller.config as config @@ -29,6 +30,10 @@ from duty_teller.services.group_duty_pin_service import ( logger = logging.getLogger(__name__) +# Per-chat locks to prevent concurrent refresh for the same chat (avoids duplicate messages). +_refresh_locks: dict[int, asyncio.Lock] = {} +_lock_for_refresh_locks = asyncio.Lock() + JOB_NAME_PREFIX = "duty_pin_" RETRY_WHEN_NO_DUTY_MINUTES = 15 @@ -124,8 +129,12 @@ def _sync_untrust_group(chat_id: int) -> tuple[bool, int | None]: async def _schedule_next_update( - application, chat_id: int, when_utc: datetime | None + application, + chat_id: int, + when_utc: datetime | None, + jitter_seconds: float | None = None, ) -> None: + """Schedule the next pin refresh job. Optional jitter spreads jobs when scheduling many chats.""" job_queue = application.job_queue if job_queue is None: logger.warning("Job queue not available, cannot schedule pin update") @@ -136,8 +145,10 @@ async def _schedule_next_update( if when_utc is not None: now_utc = datetime.now(timezone.utc).replace(tzinfo=None) delay = when_utc - now_utc + if jitter_seconds is not None and jitter_seconds > 0: + delay += timedelta(seconds=random.uniform(0, jitter_seconds)) if delay.total_seconds() < 1: - delay = 1 + delay = timedelta(seconds=1) job_queue.run_once( update_group_pin, when=delay, @@ -146,8 +157,6 @@ async def _schedule_next_update( ) logger.info("Scheduled pin update for chat_id=%s at %s", chat_id, when_utc) else: - from datetime import timedelta - job_queue.run_once( update_group_pin, when=timedelta(minutes=RETRY_WHEN_NO_DUTY_MINUTES), @@ -168,11 +177,13 @@ async def _refresh_pin_for_chat( Uses single DB session for message_id, text, next_shift_end (consolidated). If the group is no longer trusted, removes pin record, job, and message; returns "untrusted". + Unpin is best-effort (e.g. if user already unpinned we still pin the new message and save state). + Per-chat lock prevents concurrent refresh for the same chat. Returns: "updated" if the message was sent, pinned and saved successfully; "no_message" if there is no pin record for this chat; - "failed" if send_message or permissions failed; + "failed" if send_message or pin failed; "untrusted" if the group was removed from trusted list (pin record and message cleaned up). """ loop = asyncio.get_running_loop() @@ -205,42 +216,59 @@ async def _refresh_pin_for_chat( logger.info("No pin record for chat_id=%s, skipping update", chat_id) return "no_message" old_message_id = message_id + + async with _lock_for_refresh_locks: + lock = _refresh_locks.setdefault(chat_id, asyncio.Lock()) try: - msg = await context.bot.send_message( - chat_id=chat_id, - text=text, - reply_markup=_get_contact_button_markup(config.DEFAULT_LANGUAGE), - ) - except (BadRequest, Forbidden) as e: - logger.warning( - "Failed to send duty message for pin refresh chat_id=%s: %s", chat_id, e - ) - await _schedule_next_update(context.application, chat_id, next_end) - return "failed" - try: - await context.bot.unpin_chat_message(chat_id=chat_id) - await context.bot.pin_chat_message( - chat_id=chat_id, - message_id=msg.message_id, - disable_notification=not config.DUTY_PIN_NOTIFY, - ) - except (BadRequest, Forbidden) as e: - logger.warning("Unpin or pin after refresh failed chat_id=%s: %s", chat_id, e) - await _schedule_next_update(context.application, chat_id, next_end) - return "failed" - await loop.run_in_executor(None, _sync_save_pin, chat_id, msg.message_id) - if old_message_id is not None: - try: - await context.bot.delete_message(chat_id=chat_id, message_id=old_message_id) - except (BadRequest, Forbidden) as e: - logger.warning( - "Could not delete old pinned message %s in chat_id=%s: %s", - old_message_id, - chat_id, - e, - ) - await _schedule_next_update(context.application, chat_id, next_end) - return "updated" + async with lock: + try: + msg = await context.bot.send_message( + chat_id=chat_id, + text=text, + reply_markup=_get_contact_button_markup(config.DEFAULT_LANGUAGE), + ) + except (BadRequest, Forbidden) as e: + logger.warning( + "Failed to send duty message for pin refresh chat_id=%s: %s", + chat_id, + e, + ) + await _schedule_next_update(context.application, chat_id, next_end) + return "failed" + try: + await context.bot.unpin_chat_message(chat_id=chat_id) + except (BadRequest, Forbidden) as e: + logger.debug( + "Unpin failed (e.g. no pinned message) chat_id=%s: %s", chat_id, e + ) + try: + await context.bot.pin_chat_message( + chat_id=chat_id, + message_id=msg.message_id, + disable_notification=not config.DUTY_PIN_NOTIFY, + ) + except (BadRequest, Forbidden) as e: + logger.warning("Pin after refresh failed chat_id=%s: %s", chat_id, e) + await _schedule_next_update(context.application, chat_id, next_end) + return "failed" + await loop.run_in_executor(None, _sync_save_pin, chat_id, msg.message_id) + if old_message_id is not None: + try: + await context.bot.delete_message( + chat_id=chat_id, message_id=old_message_id + ) + except (BadRequest, Forbidden) as e: + logger.warning( + "Could not delete old pinned message %s in chat_id=%s: %s", + old_message_id, + chat_id, + e, + ) + await _schedule_next_update(context.application, chat_id, next_end) + return "updated" + finally: + async with _lock_for_refresh_locks: + _refresh_locks.pop(chat_id, None) async def update_group_pin(context: ContextTypes.DEFAULT_TYPE) -> None: @@ -338,12 +366,15 @@ def _get_all_pin_chat_ids_sync() -> list[int]: async def restore_group_pin_jobs(application) -> None: - """Restore scheduled pin-update jobs for all chats that have a pinned message (on startup).""" + """Restore scheduled pin-update jobs for all chats that have a pinned message (on startup). + + Uses jitter (0–60 s) per chat to avoid thundering herd when many groups share the same shift end. + """ loop = asyncio.get_running_loop() chat_ids = await loop.run_in_executor(None, _get_all_pin_chat_ids_sync) next_end = await loop.run_in_executor(None, _get_next_shift_end_sync) for chat_id in chat_ids: - await _schedule_next_update(application, chat_id, next_end) + await _schedule_next_update(application, chat_id, next_end, jitter_seconds=60.0) logger.info("Restored %s group pin jobs", len(chat_ids)) @@ -407,6 +438,8 @@ async def pin_duty_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> No message_id=message_id, disable_notification=True, ) + next_end = await loop.run_in_executor(None, _get_next_shift_end_sync) + await _schedule_next_update(context.application, chat_id, next_end) await update.message.reply_text(t(lang, "pin_duty.pinned")) except (BadRequest, Forbidden) as e: logger.warning("pin_duty failed chat_id=%s: %s", chat_id, e) diff --git a/tests/test_app.py b/tests/test_app.py index e786fdf..4d1c8de 100644 --- a/tests/test_app.py +++ b/tests/test_app.py @@ -1,7 +1,6 @@ """Tests for FastAPI app /api/duties.""" import time -from pathlib import Path from unittest.mock import ANY, patch import pytest diff --git a/tests/test_handlers_group_duty_pin.py b/tests/test_handlers_group_duty_pin.py index 0bd6284..b4109af 100644 --- a/tests/test_handlers_group_duty_pin.py +++ b/tests/test_handlers_group_duty_pin.py @@ -93,8 +93,9 @@ def test_get_contact_button_markup_returns_markup_when_username_set(): """_get_contact_button_markup: BOT_USERNAME set, no short name -> t.me bot link with startapp=duty.""" from telegram import InlineKeyboardMarkup - with patch.object(config, "BOT_USERNAME", "MyDutyBot"), patch.object( - config, "MINI_APP_SHORT_NAME", "" + with ( + patch.object(config, "BOT_USERNAME", "MyDutyBot"), + patch.object(config, "MINI_APP_SHORT_NAME", ""), ): with patch.object(mod, "t", return_value="View contacts"): result = mod._get_contact_button_markup("en") @@ -113,8 +114,9 @@ def test_get_contact_button_markup_with_short_name_uses_direct_miniapp_link(): """_get_contact_button_markup: MINI_APP_SHORT_NAME set -> direct Mini App URL with startapp=duty.""" from telegram import InlineKeyboardMarkup - with patch.object(config, "BOT_USERNAME", "MyDutyBot"), patch.object( - config, "MINI_APP_SHORT_NAME", "DutyApp" + with ( + patch.object(config, "BOT_USERNAME", "MyDutyBot"), + patch.object(config, "MINI_APP_SHORT_NAME", "DutyApp"), ): with patch.object(mod, "t", return_value="View contacts"): result = mod._get_contact_button_markup("en") @@ -294,8 +296,8 @@ async def test_update_group_pin_send_raises_no_unpin_pin_schedule_still_called() @pytest.mark.asyncio -async def test_update_group_pin_repin_raises_still_schedules_next(): - """update_group_pin: send_message ok, unpin or pin raises -> no _sync_save_pin, schedule still called, log.""" +async def test_update_group_pin_unpin_raises_pin_succeeds_saves_and_schedules(): + """update_group_pin: send_message ok, unpin raises (e.g. no pinned message), pin succeeds -> save_pin and schedule called.""" new_msg = MagicMock() new_msg.message_id = 888 context = MagicMock() @@ -304,9 +306,45 @@ async def test_update_group_pin_repin_raises_still_schedules_next(): context.bot = MagicMock() context.bot.send_message = AsyncMock(return_value=new_msg) context.bot.unpin_chat_message = AsyncMock( - side_effect=Forbidden("Not enough rights") + side_effect=BadRequest("Chat has no pinned message") ) context.bot.pin_chat_message = AsyncMock() + context.bot.delete_message = AsyncMock() + context.application = MagicMock() + + with patch.object(config, "DUTY_PIN_NOTIFY", True): + with patch.object(mod, "_sync_is_trusted", return_value=True): + with patch.object( + mod, "_sync_get_pin_refresh_data", return_value=(3, "Text", None) + ): + with patch.object( + mod, "_schedule_next_update", AsyncMock() + ) as mock_schedule: + with patch.object(mod, "_sync_save_pin") as mock_save: + await mod.update_group_pin(context) + context.bot.send_message.assert_called_once_with( + chat_id=222, text="Text", reply_markup=None + ) + context.bot.unpin_chat_message.assert_called_once_with(chat_id=222) + context.bot.pin_chat_message.assert_called_once_with( + chat_id=222, message_id=888, disable_notification=False + ) + mock_save.assert_called_once_with(222, 888) + mock_schedule.assert_called_once_with(context.application, 222, None) + + +@pytest.mark.asyncio +async def test_update_group_pin_pin_raises_no_save_still_schedules_next(): + """update_group_pin: send_message ok, unpin ok, pin raises -> no _sync_save_pin, schedule still called, log.""" + new_msg = MagicMock() + new_msg.message_id = 888 + context = MagicMock() + context.job = MagicMock() + context.job.data = {"chat_id": 222} + context.bot = MagicMock() + context.bot.send_message = AsyncMock(return_value=new_msg) + context.bot.unpin_chat_message = AsyncMock() + context.bot.pin_chat_message = AsyncMock(side_effect=Forbidden("Not enough rights")) context.application = MagicMock() with patch.object(config, "DUTY_PIN_NOTIFY", True): @@ -325,7 +363,7 @@ async def test_update_group_pin_repin_raises_still_schedules_next(): ) mock_save.assert_not_called() mock_logger.warning.assert_called_once() - assert "Unpin or pin" in mock_logger.warning.call_args[0][0] + assert "Pin after refresh failed" in mock_logger.warning.call_args[0][0] mock_schedule.assert_called_once_with(context.application, 222, None) @@ -388,7 +426,7 @@ async def test_pin_duty_cmd_group_only_reply(): @pytest.mark.asyncio async def test_pin_duty_cmd_group_pins_and_replies_pinned(): - """pin_duty_cmd in group with existing pin record -> pin and reply pinned.""" + """pin_duty_cmd in group with existing pin record -> pin, schedule next update, reply pinned.""" update = MagicMock() update.message = MagicMock() update.message.reply_text = AsyncMock() @@ -399,16 +437,22 @@ async def test_pin_duty_cmd_group_pins_and_replies_pinned(): context = MagicMock() context.bot = MagicMock() context.bot.pin_chat_message = AsyncMock() + context.application = MagicMock() with patch("duty_teller.handlers.group_duty_pin.get_lang", return_value="en"): with patch.object(mod, "_sync_is_trusted", return_value=True): with patch.object(mod, "_sync_get_message_id", return_value=5): - with patch("duty_teller.handlers.group_duty_pin.t") as mock_t: - mock_t.return_value = "Pinned" - await mod.pin_duty_cmd(update, context) + with patch.object(mod, "_get_next_shift_end_sync", return_value=None): + with patch.object( + mod, "_schedule_next_update", AsyncMock() + ) as mock_schedule: + with patch("duty_teller.handlers.group_duty_pin.t") as mock_t: + mock_t.return_value = "Pinned" + await mod.pin_duty_cmd(update, context) context.bot.pin_chat_message.assert_called_once_with( chat_id=100, message_id=5, disable_notification=True ) + mock_schedule.assert_called_once_with(context.application, 100, None) update.message.reply_text.assert_called_once_with("Pinned") @@ -908,8 +952,8 @@ async def test_my_chat_member_handler_bot_removed_deletes_pin_and_jobs(): @pytest.mark.asyncio -async def test_restore_group_pin_jobs_calls_schedule_for_each_chat(): - """restore_group_pin_jobs: for each chat_id from _get_all_pin_chat_ids_sync, calls _schedule_next_update.""" +async def test_restore_group_pin_jobs_calls_schedule_for_each_chat_with_jitter(): + """restore_group_pin_jobs: for each chat_id calls _schedule_next_update with jitter_seconds=60.""" application = MagicMock() application.job_queue = MagicMock() application.job_queue.get_jobs_by_name = MagicMock(return_value=[]) @@ -922,8 +966,8 @@ async def test_restore_group_pin_jobs_calls_schedule_for_each_chat(): ) as mock_schedule: await mod.restore_group_pin_jobs(application) assert mock_schedule.call_count == 2 - mock_schedule.assert_any_call(application, 10, None) - mock_schedule.assert_any_call(application, 20, None) + mock_schedule.assert_any_call(application, 10, None, jitter_seconds=60.0) + mock_schedule.assert_any_call(application, 20, None, jitter_seconds=60.0) # --- _refresh_pin_for_chat untrusted ---