-
Notifications
You must be signed in to change notification settings - Fork 3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
1 changed file
with
138 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,6 @@ | ||
import functools | ||
import sys | ||
from typing import Any, Iterable, List, Optional, Tuple | ||
from typing import Any, Dict, Iterable, List, Optional, Tuple | ||
|
||
import pytest | ||
|
||
|
@@ -334,3 +334,140 @@ def test_broken_keyring_disables_keyring(monkeypatch: pytest.MonkeyPatch) -> Non | |
url, allow_netrc=False, allow_keyring=True | ||
) == (None, None) | ||
assert keyring_broken._call_count == 1 | ||
|
||
|
||
class KeyringSubprocessResult(KeyringModuleV1): | ||
"""Represents the subprocess call to keyring""" | ||
|
||
returncode = 0 # Default to zero retcode | ||
|
||
def __call__( | ||
self, | ||
cmd: List[str], | ||
*, | ||
env: Dict[str, str], | ||
stdin: Optional[Any] = None, | ||
capture_output: Optional[bool] = None, | ||
input: Optional[bytes] = None, | ||
) -> Any: | ||
if cmd[1] == "get": | ||
assert stdin == -3 # subprocess.DEVNULL | ||
assert capture_output is True | ||
assert env["PYTHONIOENCODING"] == "utf-8" | ||
|
||
password = self.get_password(*cmd[2:]) | ||
if password is None: | ||
self.returncode = 1 | ||
else: | ||
self.stdout = password.encode("utf-8") + b"\n" | ||
|
||
if cmd[1] == "set": | ||
assert stdin is None | ||
assert capture_output is None | ||
assert env["PYTHONIOENCODING"] == "utf-8" | ||
assert input is not None | ||
|
||
self.set_password(cmd[2], cmd[3], input.decode("utf-8").strip("\n")) | ||
|
||
return self | ||
|
||
def check_returncode(self) -> None: | ||
if self.returncode: | ||
raise Exception() | ||
|
||
|
||
@pytest.mark.parametrize( | ||
"url, expect", | ||
( | ||
("http://example.com/path1", (None, None)), | ||
# path1 URLs will be resolved by netloc | ||
("http://[email protected]/path1", ("user", "user!netloc")), | ||
("http://[email protected]/path1", ("user2", "user2!netloc")), | ||
# path2 URLs will be resolved by index URL | ||
("http://example.com/path2/path3", (None, None)), | ||
("http://[email protected]/path2/path3", ("foo", "foo!url")), | ||
), | ||
) | ||
def test_keyring_cli_get_password( | ||
monkeypatch: pytest.MonkeyPatch, | ||
url: str, | ||
expect: Tuple[Optional[str], Optional[str]], | ||
) -> None: | ||
monkeypatch.setattr(pip._internal.network.auth.shutil, "which", lambda x: "keyring") | ||
monkeypatch.setattr( | ||
pip._internal.network.auth.subprocess, "run", KeyringSubprocessResult() | ||
) | ||
auth = MultiDomainBasicAuth(index_urls=["http://example.com/path2"]) | ||
|
||
actual = auth._get_new_credentials(url, allow_netrc=False, allow_keyring=True) | ||
assert actual == expect | ||
|
||
|
||
@pytest.mark.parametrize( | ||
"response_status, creds, expect_save", | ||
( | ||
(403, ("user", "pass", True), False), | ||
( | ||
200, | ||
("user", "pass", True), | ||
True, | ||
), | ||
( | ||
200, | ||
("user", "pass", False), | ||
False, | ||
), | ||
), | ||
) | ||
def test_keyring_cli_set_password( | ||
monkeypatch: pytest.MonkeyPatch, | ||
response_status: int, | ||
creds: Tuple[str, str, bool], | ||
expect_save: bool, | ||
) -> None: | ||
monkeypatch.setattr(pip._internal.network.auth.shutil, "which", lambda x: "keyring") | ||
keyring = KeyringSubprocessResult() | ||
monkeypatch.setattr(pip._internal.network.auth.subprocess, "run", keyring) | ||
auth = MultiDomainBasicAuth(prompting=True) | ||
monkeypatch.setattr(auth, "_get_url_and_credentials", lambda u: (u, None, None)) | ||
monkeypatch.setattr(auth, "_prompt_for_password", lambda *a: creds) | ||
if creds[2]: | ||
# when _prompt_for_password indicates to save, we should save | ||
def should_save_password_to_keyring(*a: Any) -> bool: | ||
return True | ||
|
||
else: | ||
# when _prompt_for_password indicates not to save, we should | ||
# never call this function | ||
def should_save_password_to_keyring(*a: Any) -> bool: | ||
assert False, "_should_save_password_to_keyring should not be called" | ||
|
||
monkeypatch.setattr( | ||
auth, "_should_save_password_to_keyring", should_save_password_to_keyring | ||
) | ||
|
||
req = MockRequest("https://example.com") | ||
resp = MockResponse(b"") | ||
resp.url = req.url | ||
connection = MockConnection() | ||
|
||
def _send(sent_req: MockRequest, **kwargs: Any) -> MockResponse: | ||
assert sent_req is req | ||
assert "Authorization" in sent_req.headers | ||
r = MockResponse(b"") | ||
r.status_code = response_status | ||
return r | ||
|
||
# https://github.com/python/mypy/issues/2427 | ||
connection._send = _send # type: ignore[assignment] | ||
|
||
resp.request = req | ||
resp.status_code = 401 | ||
resp.connection = connection | ||
|
||
auth.handle_401(resp) | ||
|
||
if expect_save: | ||
assert keyring.saved_passwords == [("example.com", creds[0], creds[1])] | ||
else: | ||
assert keyring.saved_passwords == [] |