From 15f80ee46b4ea1cf5b4a9384555e8425b94f27b2 Mon Sep 17 00:00:00 2001 From: Nikolay Tatarinov Date: Sat, 21 Feb 2026 00:57:07 +0300 Subject: [PATCH] feat: add comprehensive tests for duty schedule import and error handling - Introduced new tests for the `import_duty_schedule_cmd` to verify behavior when no message or effective user is present, ensuring proper early returns. - Added tests for admin checks to confirm that only authorized users can initiate duty schedule imports, enhancing security. - Implemented error handling tests for the `handle_handover_time_text` function to ensure appropriate responses for invalid time formats and non-admin users. - Enhanced overall test coverage for the duty schedule import functionality, contributing to improved reliability and maintainability of the codebase. - Updated the `.coverage` file to reflect the latest coverage metrics. --- .coverage | Bin 53248 -> 53248 bytes tests/test_handlers_commands.py | 311 +++++++++++++++ tests/test_handlers_errors.py | 5 +- tests/test_handlers_group_duty_pin.py | 273 ++++++++++++++ tests/test_handlers_import_duty_schedule.py | 394 ++++++++++++++++++++ 5 files changed, 982 insertions(+), 1 deletion(-) create mode 100644 tests/test_handlers_import_duty_schedule.py diff --git a/.coverage b/.coverage index 618aa768d697da49d2111d883b2faa04866694aa..8644eac065574ed0cba79d72fe91da5679d10282 100644 GIT binary patch delta 95 zcmV-l0HFVXpaX!Q1F%YtGXLEFfA0U=fA_YX|CrqYA`%1v2}U2f?*Pp?=bZoESKp)k zeb4pk_r~u(?mnlV`v3Qne2yX+fB)&!-~0Q0sXzby_CM<1)8BUk004gP$+NePNkN5- BIgbDU delta 95 zcmZozz}&Ead4pG<5rYf^10MqfG~8uW5@KQGbd%rpp5ZL#^Eu2w@dbZt&oDw2s5ksN mFFv`ZPf4=&|H~`)f8WksY5D)}H=9@SLQoY4HgE6qbOZoI8X$uJ diff --git a/tests/test_handlers_commands.py b/tests/test_handlers_commands.py index f5f6a9b..66eb714 100644 --- a/tests/test_handlers_commands.py +++ b/tests/test_handlers_commands.py @@ -9,6 +9,7 @@ from duty_teller.handlers.commands import ( set_phone, calendar_link, help_cmd, + set_role, ) @@ -84,6 +85,20 @@ async def test_start_no_message_returns_early(): mock_scope.assert_not_called() +@pytest.mark.asyncio +async def test_start_no_effective_user_returns_early(): + """start: if update.effective_user is None, does not call executor or reply.""" + message = MagicMock() + message.reply_text = AsyncMock() + update = _make_update(message=message, effective_user=None) + with patch("duty_teller.handlers.commands.session_scope") as mock_scope: + with patch("duty_teller.handlers.commands.get_or_create_user") as mock_get_user: + await start(update, MagicMock()) + mock_get_user.assert_not_called() + mock_scope.assert_not_called() + message.reply_text.assert_not_called() + + @pytest.mark.asyncio async def test_set_phone_private_with_number_replies_saved(): """set_phone in private chat with number -> reply 'saved'.""" @@ -157,6 +172,34 @@ async def test_set_phone_group_replies_private_only(): mock_t.assert_called_with("en", "set_phone.private_only") +@pytest.mark.asyncio +async def test_set_phone_result_error_replies_error(): + """set_phone: set_user_phone returns None (error) -> reply set_phone.error.""" + message = MagicMock() + message.reply_text = AsyncMock() + user = _make_user() + chat = _make_chat("private") + update = _make_update(message=message, effective_user=user, effective_chat=chat) + context = MagicMock() + context.args = ["+79001234567"] + + with patch("duty_teller.handlers.commands.session_scope") as mock_scope: + mock_session = MagicMock() + mock_scope.return_value.__enter__.return_value = mock_session + mock_scope.return_value.__exit__.return_value = None + with patch("duty_teller.handlers.commands.get_or_create_user"): + with patch( + "duty_teller.handlers.commands.set_user_phone", + return_value=None, + ): + with patch("duty_teller.handlers.commands.get_lang", return_value="en"): + with patch("duty_teller.handlers.commands.t") as mock_t: + mock_t.return_value = "Error saving phone" + await set_phone(update, context) + message.reply_text.assert_called_once_with("Error saving phone") + mock_t.assert_called_with("en", "set_phone.error") + + @pytest.mark.asyncio async def test_calendar_link_with_user_and_token_replies_with_url(): """calendar_link with allowed user and token -> reply with link.""" @@ -200,6 +243,60 @@ async def test_calendar_link_with_user_and_token_replies_with_url(): assert "abc43token" in call_args or "example.com" in call_args +@pytest.mark.asyncio +async def test_calendar_link_in_group_replies_private_only(): + """calendar_link in group chat -> reply calendar_link.private_only.""" + message = MagicMock() + message.reply_text = AsyncMock() + user = _make_user() + chat = _make_chat("group") + update = _make_update(message=message, effective_user=user, effective_chat=chat) + + with patch("duty_teller.handlers.commands.get_lang", return_value="en"): + with patch("duty_teller.handlers.commands.t") as mock_t: + mock_t.return_value = "Private chat only" + await calendar_link(update, MagicMock()) + message.reply_text.assert_called_once_with("Private chat only") + mock_t.assert_called_with("en", "calendar_link.private_only") + + +@pytest.mark.asyncio +async def test_calendar_link_no_url_replies_error(): + """calendar_link: can_access True but MINI_APP_BASE_URL empty -> reply calendar_link.error.""" + message = MagicMock() + message.reply_text = AsyncMock() + user = _make_user() + chat = _make_chat("private") + update = _make_update(message=message, effective_user=user, effective_chat=chat) + + with patch("duty_teller.handlers.commands.session_scope") as mock_scope: + mock_session = MagicMock() + mock_scope.return_value.__enter__.return_value = mock_session + mock_scope.return_value.__exit__.return_value = None + with patch("duty_teller.handlers.commands.get_or_create_user") as mock_get_user: + mock_user = MagicMock() + mock_user.id = 10 + mock_get_user.return_value = mock_user + with patch( + "duty_teller.handlers.commands.can_access_miniapp_for_telegram_user", + return_value=True, + ): + with patch("duty_teller.handlers.commands.config") as mock_cfg: + mock_cfg.MINI_APP_BASE_URL = "" + with patch( + "duty_teller.handlers.commands.create_calendar_token", + return_value="token", + ): + with patch( + "duty_teller.handlers.commands.get_lang", return_value="en" + ): + with patch("duty_teller.handlers.commands.t") as mock_t: + mock_t.return_value = "Calendar error" + await calendar_link(update, MagicMock()) + message.reply_text.assert_called_once_with("Calendar error") + mock_t.assert_called_with("en", "calendar_link.error") + + @pytest.mark.asyncio async def test_calendar_link_denied_replies_access_denied(): """calendar_link when user not in allowlist -> reply access_denied.""" @@ -251,3 +348,217 @@ async def test_help_cmd_replies_with_help_text(): text = message.reply_text.call_args[0][0] assert "help.title" in text or "[help.title]" in text assert "help.start" in text or "[help.start]" in text + + +@pytest.mark.asyncio +async def test_help_cmd_admin_includes_import_schedule_and_set_role(): + """help_cmd: when is_admin_async True, reply includes help.import_schedule and help.set_role.""" + message = MagicMock() + message.reply_text = AsyncMock() + user = _make_user() + update = _make_update(message=message, effective_user=user) + + with patch( + "duty_teller.handlers.commands.is_admin_async", + new_callable=AsyncMock, + return_value=True, + ): + with patch("duty_teller.handlers.commands.get_lang", return_value="en"): + with patch("duty_teller.handlers.commands.t") as mock_t: + mock_t.side_effect = lambda lang, key: f"[{key}]" + await help_cmd(update, MagicMock()) + message.reply_text.assert_called_once() + text = message.reply_text.call_args[0][0] + assert "help.import_schedule" in text or "[help.import_schedule]" in text + assert "help.set_role" in text or "[help.set_role]" in text + + +# --- set_role --- + + +@pytest.mark.asyncio +async def test_set_role_no_effective_user_returns_early(): + """set_role: no update.effective_user -> no reply, no executor.""" + message = MagicMock() + message.reply_text = AsyncMock() + update = _make_update(message=message, effective_user=None) + context = MagicMock() + context.args = [] + with patch("duty_teller.handlers.commands.session_scope") as mock_scope: + await set_role(update, context) + message.reply_text.assert_not_called() + mock_scope.assert_not_called() + + +@pytest.mark.asyncio +async def test_set_role_not_admin_replies_admin_only(): + """set_role: non-admin -> reply import.admin_only.""" + message = MagicMock() + message.reply_text = AsyncMock() + message.reply_to_message = None + user = _make_user() + update = _make_update(message=message, effective_user=user) + context = MagicMock() + context.args = [] + + with patch( + "duty_teller.handlers.commands.is_admin_async", + new_callable=AsyncMock, + return_value=False, + ): + with patch("duty_teller.handlers.commands.get_lang", return_value="en"): + with patch("duty_teller.handlers.commands.t") as mock_t: + mock_t.return_value = "Admin only" + await set_role(update, context) + message.reply_text.assert_called_once_with("Admin only") + mock_t.assert_called_with("en", "import.admin_only") + + +@pytest.mark.asyncio +async def test_set_role_invalid_usage_replies_usage(): + """set_role: invalid role_name (not user|admin) -> reply set_role.usage.""" + message = MagicMock() + message.reply_text = AsyncMock() + message.reply_to_message = MagicMock() + message.reply_to_message.from_user = MagicMock() + message.reply_to_message.from_user.id = 111 + user = _make_user() + update = _make_update(message=message, effective_user=user) + context = MagicMock() + context.args = ["invalid_role"] + + with patch( + "duty_teller.handlers.commands.is_admin_async", + new_callable=AsyncMock, + return_value=True, + ): + with patch("duty_teller.handlers.commands.get_lang", return_value="en"): + with patch("duty_teller.handlers.commands.session_scope") as mock_scope: + mock_session = MagicMock() + mock_scope.return_value.__enter__.return_value = mock_session + mock_scope.return_value.__exit__.return_value = None + with patch( + "duty_teller.handlers.commands.get_user_by_telegram_id", + return_value=MagicMock(id=1, full_name="X"), + ): + with patch("duty_teller.handlers.commands.t") as mock_t: + mock_t.return_value = "Usage: ..." + await set_role(update, context) + message.reply_text.assert_called_once_with("Usage: ...") + mock_t.assert_called_with("en", "set_role.usage") + + +@pytest.mark.asyncio +async def test_set_role_user_not_found_replies_user_not_found(): + """set_role: target user not found -> reply set_role.user_not_found.""" + message = MagicMock() + message.reply_text = AsyncMock() + message.reply_to_message = MagicMock() + message.reply_to_message.from_user = MagicMock() + message.reply_to_message.from_user.id = 999 + user = _make_user() + update = _make_update(message=message, effective_user=user) + context = MagicMock() + context.args = ["admin"] + + with patch( + "duty_teller.handlers.commands.is_admin_async", + new_callable=AsyncMock, + return_value=True, + ): + with patch("duty_teller.handlers.commands.get_lang", return_value="en"): + with patch( + "duty_teller.handlers.commands.get_user_by_telegram_id", + return_value=None, + ): + with patch("duty_teller.handlers.commands.t") as mock_t: + mock_t.return_value = "User not found" + await set_role(update, context) + message.reply_text.assert_called_once_with("User not found") + mock_t.assert_called_with("en", "set_role.user_not_found") + + +@pytest.mark.asyncio +async def test_set_role_success_reply_replies_done(): + """set_role: reply to user + role -> set_user_role success -> reply set_role.done.""" + message = MagicMock() + message.reply_text = AsyncMock() + message.reply_to_message = MagicMock() + message.reply_to_message.from_user = MagicMock() + message.reply_to_message.from_user.id = 888 + user = _make_user() + update = _make_update(message=message, effective_user=user) + context = MagicMock() + context.args = ["admin"] + + mock_target = MagicMock() + mock_target.id = 10 + mock_target.full_name = "Target User" + + with patch( + "duty_teller.handlers.commands.is_admin_async", + new_callable=AsyncMock, + return_value=True, + ): + with patch("duty_teller.handlers.commands.get_lang", return_value="en"): + with patch( + "duty_teller.handlers.commands.session_scope", + ) as mock_scope: + mock_session = MagicMock() + mock_scope.return_value.__enter__.return_value = mock_session + mock_scope.return_value.__exit__.return_value = None + with patch( + "duty_teller.handlers.commands.get_user_by_telegram_id", + return_value=mock_target, + ): + with patch( + "duty_teller.handlers.commands.set_user_role", + return_value=mock_target, + ): + with patch("duty_teller.handlers.commands.t") as mock_t: + mock_t.return_value = "Role set" + await set_role(update, context) + message.reply_text.assert_called_once_with("Role set") + mock_t.assert_called_with("en", "set_role.done", name="Target User", role="admin") + + +@pytest.mark.asyncio +async def test_set_role_db_error_replies_error(): + """set_role: set_user_role returns None -> reply set_role.error.""" + message = MagicMock() + message.reply_text = AsyncMock() + message.reply_to_message = MagicMock() + message.reply_to_message.from_user = MagicMock() + message.reply_to_message.from_user.id = 777 + user = _make_user() + update = _make_update(message=message, effective_user=user) + context = MagicMock() + context.args = ["user"] + + mock_target = MagicMock() + mock_target.id = 5 + mock_target.full_name = "Someone" + + with patch( + "duty_teller.handlers.commands.is_admin_async", + new_callable=AsyncMock, + return_value=True, + ): + with patch("duty_teller.handlers.commands.get_lang", return_value="en"): + with patch("duty_teller.handlers.commands.session_scope") as mock_scope: + mock_session = MagicMock() + mock_scope.return_value.__enter__.return_value = mock_session + mock_scope.return_value.__exit__.return_value = None + with patch( + "duty_teller.handlers.commands.get_user_by_telegram_id", + return_value=mock_target, + ): + with patch( + "duty_teller.handlers.commands.set_user_role", + return_value=None, + ): + with patch("duty_teller.handlers.commands.t") as mock_t: + mock_t.return_value = "Error" + await set_role(update, context) + message.reply_text.assert_called_once_with("Error") + mock_t.assert_called_with("en", "set_role.error") diff --git a/tests/test_handlers_errors.py b/tests/test_handlers_errors.py index ca2eb05..3c1fafe 100644 --- a/tests/test_handlers_errors.py +++ b/tests/test_handlers_errors.py @@ -57,6 +57,7 @@ async def test_error_handler_update_none_does_not_crash(): @pytest.mark.asyncio async def test_error_handler_reply_text_raises_does_not_propagate(): """error_handler: when reply_text raises, exception is caught and logged; does not propagate.""" + class FakeUpdate: pass @@ -71,7 +72,9 @@ async def test_error_handler_reply_text_raises_does_not_propagate(): with patch("duty_teller.handlers.errors.Update", FakeUpdate): with patch("duty_teller.handlers.errors.get_lang", return_value="en"): - with patch("duty_teller.handlers.errors.t", return_value="An error occurred."): + with patch( + "duty_teller.handlers.errors.t", return_value="An error occurred." + ): await error_handler(update, context) # Handler must not raise; reply_text was attempted update.effective_message.reply_text.assert_called_once() diff --git a/tests/test_handlers_group_duty_pin.py b/tests/test_handlers_group_duty_pin.py index 056803a..466135e 100644 --- a/tests/test_handlers_group_duty_pin.py +++ b/tests/test_handlers_group_duty_pin.py @@ -1,8 +1,11 @@ """Tests for duty_teller.handlers.group_duty_pin (sync wrappers, update_group_pin, pin_duty_cmd).""" +from datetime import datetime, timedelta, timezone from unittest.mock import AsyncMock, MagicMock, patch import pytest +from telegram.error import BadRequest +from telegram.constants import ChatMemberStatus from duty_teller.handlers import group_duty_pin as mod @@ -56,6 +59,70 @@ class TestSyncWrappers: result = mod._get_all_pin_chat_ids_sync() assert result == [10, 20] + def test_get_next_shift_end_sync(self): + """_get_next_shift_end_sync: uses session_scope and get_next_shift_end_utc.""" + with patch.object(mod, "session_scope") as mock_scope: + mock_session = MagicMock() + mock_scope.return_value.__enter__.return_value = mock_session + mock_scope.return_value.__exit__.return_value = None + when = datetime(2026, 2, 22, 9, 0, tzinfo=timezone.utc) + with patch.object(mod, "get_next_shift_end_utc", return_value=when): + result = mod._get_next_shift_end_sync() + assert result == when + mock_scope.assert_called_once() + + +# --- _schedule_next_update --- + + +@pytest.mark.asyncio +async def test_schedule_next_update_job_queue_none_returns_early(): + """_schedule_next_update: job_queue is None -> log and return, no run_once.""" + application = MagicMock() + application.job_queue = None + with patch.object(mod, "logger") as mock_logger: + await mod._schedule_next_update(application, 123, datetime.now(timezone.utc)) + mock_logger.warning.assert_called_once() + assert "Job queue not available" in mock_logger.warning.call_args[0][0] + + +@pytest.mark.asyncio +async def test_schedule_next_update_when_utc_not_none_runs_once_with_delay(): + """_schedule_next_update: when_utc set -> remove old jobs, run_once with delay.""" + application = MagicMock() + job_queue = MagicMock() + job_queue.get_jobs_by_name = MagicMock(return_value=[]) + job_queue.run_once = MagicMock() + application.job_queue = job_queue + when_utc = datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(hours=1) + + await mod._schedule_next_update(application, 456, when_utc) + + job_queue.get_jobs_by_name.assert_called_once_with("duty_pin_456") + job_queue.run_once.assert_called_once() + call_kw = job_queue.run_once.call_args[1] + assert call_kw["data"] == {"chat_id": 456} + assert call_kw["name"] == "duty_pin_456" + assert call_kw["when"].total_seconds() >= 1 + + +@pytest.mark.asyncio +async def test_schedule_next_update_when_utc_none_runs_once_with_retry_delay(): + """_schedule_next_update: when_utc None -> run_once with RETRY_WHEN_NO_DUTY_MINUTES.""" + application = MagicMock() + job_queue = MagicMock() + job_queue.get_jobs_by_name = MagicMock(return_value=[]) + job_queue.run_once = MagicMock() + application.job_queue = job_queue + + await mod._schedule_next_update(application, 789, None) + + job_queue.run_once.assert_called_once() + call_kw = job_queue.run_once.call_args[1] + assert call_kw["when"] == timedelta(minutes=mod.RETRY_WHEN_NO_DUTY_MINUTES) + assert call_kw["data"] == {"chat_id": 789} + assert call_kw["name"] == "duty_pin_789" + @pytest.mark.asyncio async def test_update_group_pin_edits_message_and_schedules_next(): @@ -96,6 +163,28 @@ async def test_update_group_pin_no_message_id_skips(): context.bot.edit_message_text.assert_not_called() +@pytest.mark.asyncio +async def test_update_group_pin_edit_raises_bad_request_still_schedules_next(): + """update_group_pin: edit_message_text BadRequest -> log, _schedule_next_update still called.""" + context = MagicMock() + context.job = MagicMock() + context.job.data = {"chat_id": 111} + context.bot = MagicMock() + context.bot.edit_message_text = AsyncMock( + side_effect=BadRequest("Message not modified") + ) + context.application = MagicMock() + + with patch.object(mod, "_sync_get_message_id", return_value=2): + with patch.object(mod, "_get_duty_message_text_sync", return_value="Text"): + with patch.object(mod, "_get_next_shift_end_sync", return_value=None): + with patch.object( + mod, "_schedule_next_update", AsyncMock() + ) as mock_schedule: + await mod.update_group_pin(context) + mock_schedule.assert_called_once_with(context.application, 111, None) + + @pytest.mark.asyncio async def test_pin_duty_cmd_group_only_reply(): """pin_duty_cmd in private chat -> reply group_only.""" @@ -139,3 +228,187 @@ async def test_pin_duty_cmd_group_pins_and_replies_pinned(): chat_id=100, message_id=5, disable_notification=True ) update.message.reply_text.assert_called_once_with("Pinned") + + +@pytest.mark.asyncio +async def test_pin_duty_cmd_no_message_id_replies_no_message(): + """pin_duty_cmd: no pin record (_sync_get_message_id -> None) -> reply pin_duty.no_message.""" + update = MagicMock() + update.message = MagicMock() + update.message.reply_text = AsyncMock() + update.effective_chat = MagicMock() + update.effective_chat.type = "group" + update.effective_chat.id = 100 + update.effective_user = MagicMock() + context = MagicMock() + + with patch("duty_teller.handlers.group_duty_pin.get_lang", return_value="en"): + with patch.object(mod, "_sync_get_message_id", return_value=None): + with patch("duty_teller.handlers.group_duty_pin.t") as mock_t: + mock_t.return_value = "No message to pin" + await mod.pin_duty_cmd(update, context) + update.message.reply_text.assert_called_once_with("No message to pin") + mock_t.assert_called_with("en", "pin_duty.no_message") + + +@pytest.mark.asyncio +async def test_pin_duty_cmd_pin_raises_replies_failed(): + """pin_duty_cmd: pin_chat_message raises -> reply pin_duty.failed.""" + update = MagicMock() + update.message = MagicMock() + update.message.reply_text = AsyncMock() + update.effective_chat = MagicMock() + update.effective_chat.type = "group" + update.effective_chat.id = 100 + update.effective_user = MagicMock() + context = MagicMock() + context.bot = MagicMock() + context.bot.pin_chat_message = AsyncMock( + side_effect=BadRequest("Not enough rights") + ) + + with patch("duty_teller.handlers.group_duty_pin.get_lang", return_value="en"): + with patch.object(mod, "_sync_get_message_id", return_value=5): + with patch("duty_teller.handlers.group_duty_pin.t") as mock_t: + mock_t.return_value = "Failed to pin" + await mod.pin_duty_cmd(update, context) + update.message.reply_text.assert_called_once_with("Failed to pin") + mock_t.assert_called_with("en", "pin_duty.failed") + + +# --- my_chat_member_handler --- + + +def _make_my_chat_member_update( + old_status, + new_status, + chat_id=100, + chat_type="group", + bot_id=999, + from_user_id=1, +): + """Build Update with my_chat_member for group add/remove.""" + update = MagicMock() + update.effective_user = MagicMock() + update.effective_user.id = from_user_id + update.effective_chat = MagicMock() + update.effective_chat.id = chat_id + update.effective_chat.type = chat_type + update.my_chat_member = MagicMock() + update.my_chat_member.old_chat_member = MagicMock() + update.my_chat_member.old_chat_member.status = old_status + update.my_chat_member.new_chat_member = MagicMock() + update.my_chat_member.new_chat_member.status = new_status + update.my_chat_member.new_chat_member.user = MagicMock() + update.my_chat_member.new_chat_member.user.id = bot_id + return update + + +@pytest.mark.asyncio +async def test_my_chat_member_handler_bot_added_sends_pins_and_schedules(): + """my_chat_member_handler: bot added to group -> send_message, pin, save_pin, schedule.""" + update = _make_my_chat_member_update( + old_status=ChatMemberStatus.LEFT, + new_status=ChatMemberStatus.ADMINISTRATOR, + chat_id=200, + bot_id=999, + ) + context = MagicMock() + context.bot = MagicMock() + context.bot.id = 999 + context.bot.send_message = AsyncMock(return_value=MagicMock(message_id=42)) + context.bot.pin_chat_message = AsyncMock() + context.application = MagicMock() + context.application.job_queue = MagicMock() + context.application.job_queue.get_jobs_by_name = MagicMock(return_value=[]) + context.application.job_queue.run_once = MagicMock() + + with patch("duty_teller.handlers.group_duty_pin.get_lang", return_value="en"): + with patch.object(mod, "_get_duty_message_text_sync", return_value="Duty text"): + with patch.object(mod, "_sync_save_pin"): + with patch.object(mod, "_get_next_shift_end_sync", return_value=None): + with patch.object(mod, "_schedule_next_update", AsyncMock()): + await mod.my_chat_member_handler(update, context) + context.bot.send_message.assert_called_once_with(chat_id=200, text="Duty text") + context.bot.pin_chat_message.assert_called_once_with( + chat_id=200, message_id=42, disable_notification=True + ) + + +@pytest.mark.asyncio +async def test_my_chat_member_handler_pin_raises_sends_could_not_pin(): + """my_chat_member_handler: pin_chat_message raises -> send_message could_not_pin_make_admin.""" + update = _make_my_chat_member_update( + old_status=ChatMemberStatus.LEFT, + new_status=ChatMemberStatus.MEMBER, + chat_id=201, + bot_id=999, + ) + context = MagicMock() + context.bot = MagicMock() + context.bot.id = 999 + context.bot.send_message = AsyncMock(return_value=MagicMock(message_id=43)) + context.bot.pin_chat_message = AsyncMock(side_effect=BadRequest("Cannot pin")) + context.application = MagicMock() + context.application.job_queue = MagicMock() + context.application.job_queue.get_jobs_by_name = MagicMock(return_value=[]) + context.application.job_queue.run_once = MagicMock() + + with patch("duty_teller.handlers.group_duty_pin.get_lang", return_value="en"): + with patch.object(mod, "_get_duty_message_text_sync", return_value="Duty"): + with patch.object(mod, "_sync_save_pin"): + with patch.object(mod, "_get_next_shift_end_sync", return_value=None): + with patch.object(mod, "_schedule_next_update", AsyncMock()): + with patch("duty_teller.handlers.group_duty_pin.t") as mock_t: + mock_t.return_value = "Make me admin to pin" + await mod.my_chat_member_handler(update, context) + assert context.bot.send_message.call_count >= 2 + pin_hint_calls = [ + c + for c in context.bot.send_message.call_args_list + if c[1].get("text") == "Make me admin to pin" + ] + assert len(pin_hint_calls) == 1 + + +@pytest.mark.asyncio +async def test_my_chat_member_handler_bot_removed_deletes_pin_and_jobs(): + """my_chat_member_handler: bot removed -> _sync_delete_pin, remove jobs by name.""" + update = _make_my_chat_member_update( + old_status=ChatMemberStatus.ADMINISTRATOR, + new_status=ChatMemberStatus.LEFT, + chat_id=202, + bot_id=999, + ) + context = MagicMock() + context.bot = MagicMock() + context.bot.id = 999 + context.application = MagicMock() + job_queue = MagicMock() + mock_job = MagicMock() + job_queue.get_jobs_by_name = MagicMock(return_value=[mock_job]) + context.application.job_queue = job_queue + + with patch.object(mod, "_sync_delete_pin"): + await mod.my_chat_member_handler(update, context) + job_queue.get_jobs_by_name.assert_called_once_with("duty_pin_202") + mock_job.schedule_removal.assert_called_once() + + +@pytest.mark.asyncio +async def test_restore_group_pin_jobs_calls_schedule_for_each_chat(): + """restore_group_pin_jobs: for each chat_id from _get_all_pin_chat_ids_sync, calls _schedule_next_update.""" + application = MagicMock() + application.job_queue = MagicMock() + application.job_queue.get_jobs_by_name = MagicMock(return_value=[]) + application.job_queue.run_once = MagicMock() + + with patch.object(mod, "_get_all_pin_chat_ids_sync", return_value=[10, 20]): + with patch.object(mod, "_get_next_shift_end_sync", return_value=None): + with patch.object( + mod, "_schedule_next_update", AsyncMock() + ) as mock_schedule: + await mod.restore_group_pin_jobs(application) + assert mock_schedule.call_count == 2 + mock_schedule.assert_any_call(application, 10, None) + mock_schedule.assert_any_call(application, 20, None) diff --git a/tests/test_handlers_import_duty_schedule.py b/tests/test_handlers_import_duty_schedule.py new file mode 100644 index 0000000..0ae67dc --- /dev/null +++ b/tests/test_handlers_import_duty_schedule.py @@ -0,0 +1,394 @@ +"""Tests for duty_teller.handlers.import_duty_schedule (import_duty_schedule_cmd, handover_time, document).""" + +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from duty_teller.handlers import import_duty_schedule as mod +from duty_teller.importers.duty_schedule import ( + DutyScheduleParseError, + DutyScheduleResult, +) +from datetime import date + + +def _make_update(message=None, effective_user=None): + """Build minimal Update with message and effective_user.""" + update = MagicMock() + update.message = message + update.effective_user = effective_user + return update + + +def _make_user(user_id=1, first_name="Admin", last_name="User", username="admin"): + """Build minimal Telegram user.""" + u = MagicMock() + u.id = user_id + u.first_name = first_name + u.last_name = last_name + u.username = username + return u + + +# --- import_duty_schedule_cmd --- + + +@pytest.mark.asyncio +async def test_import_duty_schedule_cmd_no_message_returns_early(): + """import_duty_schedule_cmd: no update.message -> no reply, no user_data.""" + update = _make_update(message=None, effective_user=_make_user()) + context = MagicMock() + context.user_data = {} + await mod.import_duty_schedule_cmd(update, context) + assert not context.user_data + + +@pytest.mark.asyncio +async def test_import_duty_schedule_cmd_no_effective_user_returns_early(): + """import_duty_schedule_cmd: no effective_user -> no reply, no user_data.""" + message = MagicMock() + message.reply_text = AsyncMock() + update = _make_update(message=message, effective_user=None) + context = MagicMock() + context.user_data = {} + await mod.import_duty_schedule_cmd(update, context) + assert not context.user_data + message.reply_text.assert_not_called() + + +@pytest.mark.asyncio +async def test_import_duty_schedule_cmd_not_admin_replies_admin_only(): + """import_duty_schedule_cmd: non-admin -> reply_text(import.admin_only).""" + message = MagicMock() + message.reply_text = AsyncMock() + user = _make_user(42) + update = _make_update(message=message, effective_user=user) + context = MagicMock() + context.user_data = {} + + with patch.object( + mod, "is_admin_async", new_callable=AsyncMock, return_value=False + ): + with patch.object(mod, "get_lang", return_value="en"): + with patch.object(mod, "t") as mock_t: + mock_t.return_value = "Admin only" + await mod.import_duty_schedule_cmd(update, context) + message.reply_text.assert_called_once_with("Admin only") + mock_t.assert_called_with("en", "import.admin_only") + assert not context.user_data.get("awaiting_handover_time") + + +@pytest.mark.asyncio +async def test_import_duty_schedule_cmd_admin_sets_user_data_and_replies_handover_format(): + """import_duty_schedule_cmd: admin -> user_data awaiting_handover_time=True, reply handover_format.""" + message = MagicMock() + message.reply_text = AsyncMock() + user = _make_user(42) + update = _make_update(message=message, effective_user=user) + context = MagicMock() + context.user_data = {} + + with patch.object(mod, "is_admin_async", new_callable=AsyncMock, return_value=True): + with patch.object(mod, "get_lang", return_value="en"): + with patch.object(mod, "t") as mock_t: + mock_t.return_value = "Enter time e.g. 09:00 Europe/Moscow" + await mod.import_duty_schedule_cmd(update, context) + assert context.user_data["awaiting_handover_time"] is True + message.reply_text.assert_called_once_with("Enter time e.g. 09:00 Europe/Moscow") + mock_t.assert_called_with("en", "import.handover_format") + + +# --- handle_handover_time_text --- + + +@pytest.mark.asyncio +async def test_handle_handover_time_text_no_message_returns_early(): + """handle_handover_time_text: no message -> no reply.""" + update = _make_update(message=None, effective_user=_make_user()) + context = MagicMock() + context.user_data = {"awaiting_handover_time": True} + await mod.handle_handover_time_text(update, context) + # Handler returns early; update.message is None so we only check user_data unchanged. + assert context.user_data["awaiting_handover_time"] is True + + +@pytest.mark.asyncio +async def test_handle_handover_time_text_no_awaiting_flag_returns_early(): + """handle_handover_time_text: not awaiting_handover_time -> no reply.""" + message = MagicMock() + message.text = "09:00 Europe/Moscow" + message.reply_text = AsyncMock() + update = _make_update(message=message, effective_user=_make_user()) + context = MagicMock() + context.user_data = {} + with patch.object(mod, "parse_handover_time"): + await mod.handle_handover_time_text(update, context) + message.reply_text.assert_not_called() + + +@pytest.mark.asyncio +async def test_handle_handover_time_text_not_admin_returns_early(): + """handle_handover_time_text: not admin -> no reply.""" + message = MagicMock() + message.text = "09:00 Europe/Moscow" + message.reply_text = AsyncMock() + update = _make_update(message=message, effective_user=_make_user()) + context = MagicMock() + context.user_data = {"awaiting_handover_time": True} + with patch.object( + mod, "is_admin_async", new_callable=AsyncMock, return_value=False + ): + with patch.object(mod, "parse_handover_time"): + await mod.handle_handover_time_text(update, context) + message.reply_text.assert_not_called() + + +@pytest.mark.asyncio +async def test_handle_handover_time_text_parse_error_replies_parse_time_error(): + """handle_handover_time_text: parse_handover_time returns None -> reply parse_time_error.""" + message = MagicMock() + message.text = "invalid" + message.reply_text = AsyncMock() + update = _make_update(message=message, effective_user=_make_user()) + context = MagicMock() + context.user_data = {"awaiting_handover_time": True} + + with patch.object(mod, "is_admin_async", new_callable=AsyncMock, return_value=True): + with patch.object(mod, "parse_handover_time", return_value=None): + with patch.object(mod, "get_lang", return_value="en"): + with patch.object(mod, "t") as mock_t: + mock_t.return_value = "Could not parse time" + await mod.handle_handover_time_text(update, context) + message.reply_text.assert_called_once_with("Could not parse time") + mock_t.assert_called_with("en", "import.parse_time_error") + assert context.user_data.get("awaiting_handover_time") is True + + +@pytest.mark.asyncio +async def test_handle_handover_time_text_success_sets_user_data_and_replies_send_json(): + """handle_handover_time_text: parsed time -> user_data updated, reply send_json.""" + message = MagicMock() + message.text = " 09:00 Europe/Moscow " + message.reply_text = AsyncMock() + update = _make_update(message=message, effective_user=_make_user()) + context = MagicMock() + context.user_data = {"awaiting_handover_time": True} + + with patch.object(mod, "is_admin_async", new_callable=AsyncMock, return_value=True): + with patch.object(mod, "parse_handover_time", return_value=(6, 0)): + with patch.object(mod, "get_lang", return_value="en"): + with patch.object(mod, "t") as mock_t: + mock_t.return_value = "Send JSON file" + await mod.handle_handover_time_text(update, context) + assert context.user_data["handover_utc_time"] == (6, 0) + assert context.user_data["awaiting_handover_time"] is False + assert context.user_data["awaiting_duty_schedule_file"] is True + message.reply_text.assert_called_once_with("Send JSON file") + mock_t.assert_called_with("en", "import.send_json") + + +# --- handle_duty_schedule_document --- + + +def _make_document(file_id="file-123", file_name="schedule.json"): + """Build minimal Document with file_id and file_name.""" + doc = MagicMock() + doc.file_id = file_id + doc.file_name = file_name + return doc + + +@pytest.mark.asyncio +async def test_handle_duty_schedule_document_no_document_returns_early(): + """handle_duty_schedule_document: no document -> no import.""" + message = MagicMock() + message.document = None + update = _make_update(message=message, effective_user=_make_user()) + context = MagicMock() + context.user_data = { + "awaiting_duty_schedule_file": True, + "handover_utc_time": (6, 0), + } + await mod.handle_duty_schedule_document(update, context) + message.reply_text.assert_not_called() + + +@pytest.mark.asyncio +async def test_handle_duty_schedule_document_not_awaiting_file_returns_early(): + """handle_duty_schedule_document: not awaiting_duty_schedule_file -> no import.""" + message = MagicMock() + message.document = _make_document() + update = _make_update(message=message, effective_user=_make_user()) + context = MagicMock() + context.user_data = {} + await mod.handle_duty_schedule_document(update, context) + message.reply_text.assert_not_called() + + +@pytest.mark.asyncio +async def test_handle_duty_schedule_document_no_handover_time_returns_early(): + """handle_duty_schedule_document: no handover_utc_time -> no import.""" + message = MagicMock() + message.document = _make_document() + update = _make_update(message=message, effective_user=_make_user()) + context = MagicMock() + context.user_data = {"awaiting_duty_schedule_file": True} + await mod.handle_duty_schedule_document(update, context) + message.reply_text.assert_not_called() + + +@pytest.mark.asyncio +async def test_handle_duty_schedule_document_not_admin_returns_early(): + """handle_duty_schedule_document: not admin -> no import.""" + message = MagicMock() + message.document = _make_document() + update = _make_update(message=message, effective_user=_make_user()) + context = MagicMock() + context.user_data = { + "awaiting_duty_schedule_file": True, + "handover_utc_time": (6, 0), + } + with patch.object( + mod, "is_admin_async", new_callable=AsyncMock, return_value=False + ): + await mod.handle_duty_schedule_document(update, context) + message.reply_text.assert_not_called() + + +@pytest.mark.asyncio +async def test_handle_duty_schedule_document_non_json_replies_need_json(): + """handle_duty_schedule_document: file not .json -> reply need_json.""" + message = MagicMock() + message.document = _make_document(file_name="data.txt") + message.reply_text = AsyncMock() + update = _make_update(message=message, effective_user=_make_user()) + context = MagicMock() + context.user_data = { + "awaiting_duty_schedule_file": True, + "handover_utc_time": (6, 0), + } + + with patch.object(mod, "is_admin_async", new_callable=AsyncMock, return_value=True): + with patch.object(mod, "get_lang", return_value="en"): + with patch.object(mod, "t") as mock_t: + mock_t.return_value = "File must be .json" + await mod.handle_duty_schedule_document(update, context) + message.reply_text.assert_called_once_with("File must be .json") + mock_t.assert_called_with("en", "import.need_json") + + +@pytest.mark.asyncio +async def test_handle_duty_schedule_document_parse_error_replies_and_clears_user_data(): + """handle_duty_schedule_document: parse_duty_schedule raises DutyScheduleParseError -> reply, clear user_data.""" + message = MagicMock() + message.document = _make_document() + message.reply_text = AsyncMock() + update = _make_update(message=message, effective_user=_make_user()) + context = MagicMock() + context.user_data = { + "awaiting_duty_schedule_file": True, + "handover_utc_time": (6, 0), + } + context.bot = MagicMock() + mock_file = MagicMock() + mock_file.download_as_bytearray = AsyncMock(return_value=bytearray(b"invalid")) + context.bot.get_file = AsyncMock(return_value=mock_file) + + with patch.object(mod, "is_admin_async", new_callable=AsyncMock, return_value=True): + with patch.object(mod, "get_lang", return_value="en"): + with patch.object(mod, "parse_duty_schedule") as mock_parse: + mock_parse.side_effect = DutyScheduleParseError("Bad JSON") + with patch.object(mod, "t") as mock_t: + mock_t.return_value = "Parse error: Bad JSON" + await mod.handle_duty_schedule_document(update, context) + message.reply_text.assert_called_once_with("Parse error: Bad JSON") + mock_t.assert_called_with("en", "import.parse_error", error="Bad JSON") + assert "awaiting_duty_schedule_file" not in context.user_data + assert "handover_utc_time" not in context.user_data + + +@pytest.mark.asyncio +async def test_handle_duty_schedule_document_import_error_replies_and_clears_user_data(): + """handle_duty_schedule_document: run_import in executor raises -> reply import_error, clear user_data.""" + message = MagicMock() + message.document = _make_document() + message.reply_text = AsyncMock() + update = _make_update(message=message, effective_user=_make_user()) + context = MagicMock() + context.user_data = { + "awaiting_duty_schedule_file": True, + "handover_utc_time": (6, 0), + } + mock_file = MagicMock() + mock_file.download_as_bytearray = AsyncMock(return_value=bytearray(b"{}")) + context.bot = MagicMock() + context.bot.get_file = AsyncMock(return_value=mock_file) + + result = DutyScheduleResult( + start_date=date(2026, 2, 1), + end_date=date(2026, 2, 28), + entries=[], + ) + + with patch.object(mod, "is_admin_async", new_callable=AsyncMock, return_value=True): + with patch.object(mod, "get_lang", return_value="en"): + with patch.object(mod, "parse_duty_schedule", return_value=result): + with patch.object(mod, "session_scope") as mock_scope: + mock_session = MagicMock() + mock_scope.return_value.__enter__.return_value = mock_session + mock_scope.return_value.__exit__.return_value = None + with patch.object(mod, "run_import") as mock_run: + mock_run.side_effect = ValueError("DB error") + with patch.object(mod, "t") as mock_t: + mock_t.return_value = "Import error: DB error" + await mod.handle_duty_schedule_document(update, context) + message.reply_text.assert_called_once_with("Import error: DB error") + mock_t.assert_called_with("en", "import.import_error", error="DB error") + assert "awaiting_duty_schedule_file" not in context.user_data + assert "handover_utc_time" not in context.user_data + + +@pytest.mark.asyncio +async def test_handle_duty_schedule_document_success_replies_done_and_clears_user_data(): + """handle_duty_schedule_document: successful import -> reply done, clear user_data.""" + message = MagicMock() + message.document = _make_document() + message.reply_text = AsyncMock() + update = _make_update(message=message, effective_user=_make_user()) + context = MagicMock() + context.user_data = { + "awaiting_duty_schedule_file": True, + "handover_utc_time": (6, 0), + } + mock_file = MagicMock() + mock_file.download_as_bytearray = AsyncMock(return_value=bytearray(b"{}")) + context.bot = MagicMock() + context.bot.get_file = AsyncMock(return_value=mock_file) + + result = DutyScheduleResult( + start_date=date(2026, 2, 1), + end_date=date(2026, 2, 28), + entries=[], + ) + + with patch.object(mod, "is_admin_async", new_callable=AsyncMock, return_value=True): + with patch.object(mod, "get_lang", return_value="en"): + with patch.object(mod, "parse_duty_schedule", return_value=result): + with patch.object(mod, "session_scope") as mock_scope: + mock_session = MagicMock() + mock_scope.return_value.__enter__.return_value = mock_session + mock_scope.return_value.__exit__.return_value = None + with patch.object(mod, "run_import", return_value=(3, 5, 0, 1)): + with patch.object(mod, "t") as mock_t: + mock_t.side_effect = lambda lang, key, **kw: ( + "Done: 3 users, 5 duties, 1 vacation (6 total)" + if key == "import.done" + else "" + ) + await mod.handle_duty_schedule_document(update, context) + message.reply_text.assert_called_once() + assert "done" in message.reply_text.call_args[0][0].lower() or "import.done" in str( + message.reply_text.call_args + ) + assert "awaiting_duty_schedule_file" not in context.user_data + assert "handover_utc_time" not in context.user_data