|
19 | 19 | import json
|
20 | 20 | import os
|
21 | 21 | import time
|
| 22 | +import unittest.mock |
22 | 23 |
|
23 | 24 | from google.auth import crypt
|
24 | 25 | from google.auth import jwt
|
|
36 | 37 | from tests import testutils
|
37 | 38 |
|
38 | 39 |
|
| 40 | +MOCK_CURRENT_TIME = 1500000000 |
| 41 | +MOCK_CURRENT_TIME_UTC = datetime.datetime.fromtimestamp( |
| 42 | +MOCK_CURRENT_TIME, tz=datetime.timezone.utc) |
39 | 43 | MOCK_UID = 'user1'
|
40 | 44 | MOCK_CREDENTIAL = credentials.Certificate(
|
41 | 45 | testutils.resource_filename('service_account.json'))
|
@@ -105,16 +109,17 @@ def verify_custom_token(custom_token, expected_claims, tenant_id=None):
|
105 | 109 | for key, value in expected_claims.items():
|
106 | 110 | assert value == token['claims'][key]
|
107 | 111 |
|
108 |
| -def _get_id_token(payload_overrides=None, header_overrides=None): |
| 112 | +def _get_id_token(payload_overrides=None, header_overrides=None, current_time=MOCK_CURRENT_TIME): |
109 | 113 | signer = crypt.RSASigner.from_string(MOCK_PRIVATE_KEY)
|
110 | 114 | headers = {
|
111 | 115 | 'kid': 'mock-key-id-1'
|
112 | 116 | }
|
| 117 | +now = int(current_time if current_time is not None else time.time()) |
113 | 118 | payload = {
|
114 | 119 | 'aud': MOCK_CREDENTIAL.project_id,
|
115 | 120 | 'iss': 'https://securetoken.google.com/' + MOCK_CREDENTIAL.project_id,
|
116 |
| -'iat': int(time.time()) - 100, |
117 |
| -'exp': int(time.time()) + 3600, |
| 121 | +'iat': now - 100, |
| 122 | +'exp': now + 3600, |
118 | 123 | 'sub': '1234567890',
|
119 | 124 | 'admin': True,
|
120 | 125 | 'firebase': {
|
@@ -127,12 +132,13 @@ def _get_id_token(payload_overrides=None, header_overrides=None):
|
127 | 132 | payload = _merge_jwt_claims(payload, payload_overrides)
|
128 | 133 | return jwt.encode(signer, payload, header=headers)
|
129 | 134 |
|
130 |
| -def _get_session_cookie(payload_overrides=None, header_overrides=None): |
| 135 | +def _get_session_cookie( |
| 136 | +payload_overrides=None, header_overrides=None, current_time=MOCK_CURRENT_TIME): |
131 | 137 | payload_overrides = payload_overrides or {}
|
132 | 138 | if 'iss' not in payload_overrides:
|
133 | 139 | payload_overrides['iss'] = 'https://session.firebase.google.com/{0}'.format(
|
134 | 140 | MOCK_CREDENTIAL.project_id)
|
135 |
| -return _get_id_token(payload_overrides, header_overrides) |
| 141 | +return _get_id_token(payload_overrides, header_overrides, current_time=current_time) |
136 | 142 |
|
137 | 143 | def _instrument_user_manager(app, status, payload):
|
138 | 144 | client = auth._get_client(app)
|
@@ -205,7 +211,7 @@ def env_var_app(request):
|
205 | 211 | @pytest.fixture(scope='module')
|
206 | 212 | def revoked_tokens():
|
207 | 213 | mock_user = json.loads(testutils.resource('get_user.json'))
|
208 |
| -mock_user['users'][0]['validSince'] = str(int(time.time())+100) |
| 214 | +mock_user['users'][0]['validSince'] = str(MOCK_CURRENT_TIME + 100) |
209 | 215 | return json.dumps(mock_user)
|
210 | 216 |
|
211 | 217 | @pytest.fixture(scope='module')
|
@@ -218,7 +224,7 @@ def user_disabled():
|
218 | 224 | def user_disabled_and_revoked():
|
219 | 225 | mock_user = json.loads(testutils.resource('get_user.json'))
|
220 | 226 | mock_user['users'][0]['disabled'] = True
|
221 |
| -mock_user['users'][0]['validSince'] = str(int(time.time())+100) |
| 227 | +mock_user['users'][0]['validSince'] = str(MOCK_CURRENT_TIME + 100) |
222 | 228 | return json.dumps(mock_user)
|
223 | 229 |
|
224 | 230 |
|
@@ -420,6 +426,17 @@ def test_unexpected_response(self, user_mgt_app):
|
420 | 426 |
|
421 | 427 | class TestVerifyIdToken:
|
422 | 428 |
|
| 429 | +def setup_method(self): |
| 430 | +self.time_ = unittest.mock.('time.time', return_value=MOCK_CURRENT_TIME) |
| 431 | +self.time_.start() |
| 432 | +self.utcnow_ = unittest.mock.( |
| 433 | +'google.auth.jwt._helpers.utcnow', return_value=MOCK_CURRENT_TIME_UTC) |
| 434 | +self.utcnow_.start() |
| 435 | + |
| 436 | +def teardown_method(self): |
| 437 | +self.time_.stop() |
| 438 | +self.utcnow_.stop() |
| 439 | + |
423 | 440 | valid_tokens = {
|
424 | 441 | 'BinaryToken': TEST_ID_TOKEN,
|
425 | 442 | 'TextToken': TEST_ID_TOKEN.decode('utf-8'),
|
@@ -435,14 +452,14 @@ class TestVerifyIdToken:
|
435 | 452 | 'EmptySubject': _get_id_token({'sub': ''}),
|
436 | 453 | 'IntSubject': _get_id_token({'sub': 10}),
|
437 | 454 | 'LongStrSubject': _get_id_token({'sub': 'a' * 129}),
|
438 |
| -'FutureToken': _get_id_token({'iat': int(time.time()) + 1000}), |
| 455 | +'FutureToken': _get_id_token({'iat': MOCK_CURRENT_TIME + 1000}), |
439 | 456 | 'ExpiredToken': _get_id_token({
|
440 |
| -'iat': int(time.time()) - 10000, |
441 |
| -'exp': int(time.time()) - 3600 |
| 457 | +'iat': MOCK_CURRENT_TIME - 10000, |
| 458 | +'exp': MOCK_CURRENT_TIME - 3600 |
442 | 459 | }),
|
443 | 460 | 'ExpiredTokenShort': _get_id_token({
|
444 |
| -'iat': int(time.time()) - 10000, |
445 |
| -'exp': int(time.time()) - 30 |
| 461 | +'iat': MOCK_CURRENT_TIME - 10000, |
| 462 | +'exp': MOCK_CURRENT_TIME - 30 |
446 | 463 | }),
|
447 | 464 | 'BadFormatToken': 'foobar'
|
448 | 465 | }
|
@@ -618,6 +635,17 @@ def test_certificate_request_failure(self, user_mgt_app):
|
618 | 635 |
|
619 | 636 | class TestVerifySessionCookie:
|
620 | 637 |
|
| 638 | +def setup_method(self): |
| 639 | +self.time_ = unittest.mock.('time.time', return_value=MOCK_CURRENT_TIME) |
| 640 | +self.time_.start() |
| 641 | +self.utcnow_ = unittest.mock.( |
| 642 | +'google.auth.jwt._helpers.utcnow', return_value=MOCK_CURRENT_TIME_UTC) |
| 643 | +self.utcnow_.start() |
| 644 | + |
| 645 | +def teardown_method(self): |
| 646 | +self.time_.stop() |
| 647 | +self.utcnow_.stop() |
| 648 | + |
621 | 649 | valid_cookies = {
|
622 | 650 | 'BinaryCookie': TEST_SESSION_COOKIE,
|
623 | 651 | 'TextCookie': TEST_SESSION_COOKIE.decode('utf-8'),
|
@@ -633,14 +661,14 @@ class TestVerifySessionCookie:
|
633 | 661 | 'EmptySubject': _get_session_cookie({'sub': ''}),
|
634 | 662 | 'IntSubject': _get_session_cookie({'sub': 10}),
|
635 | 663 | 'LongStrSubject': _get_session_cookie({'sub': 'a' * 129}),
|
636 |
| -'FutureCookie': _get_session_cookie({'iat': int(time.time()) + 1000}), |
| 664 | +'FutureCookie': _get_session_cookie({'iat': MOCK_CURRENT_TIME + 1000}), |
637 | 665 | 'ExpiredCookie': _get_session_cookie({
|
638 |
| -'iat': int(time.time()) - 10000, |
639 |
| -'exp': int(time.time()) - 3600 |
| 666 | +'iat': MOCK_CURRENT_TIME - 10000, |
| 667 | +'exp': MOCK_CURRENT_TIME - 3600 |
640 | 668 | }),
|
641 | 669 | 'ExpiredCookieShort': _get_session_cookie({
|
642 |
| -'iat': int(time.time()) - 10000, |
643 |
| -'exp': int(time.time()) - 30 |
| 670 | +'iat': MOCK_CURRENT_TIME - 10000, |
| 671 | +'exp': MOCK_CURRENT_TIME - 30 |
644 | 672 | }),
|
645 | 673 | 'BadFormatCookie': 'foobar',
|
646 | 674 | 'IDToken': TEST_ID_TOKEN,
|
@@ -792,6 +820,17 @@ def test_certificate_request_failure(self, user_mgt_app):
|
792 | 820 |
|
793 | 821 | class TestCertificateCaching:
|
794 | 822 |
|
| 823 | +def setup_method(self): |
| 824 | +self.time_ = unittest.mock.('time.time', return_value=MOCK_CURRENT_TIME) |
| 825 | +self.time_.start() |
| 826 | +self.utcnow_ = unittest.mock.( |
| 827 | +'google.auth.jwt._helpers.utcnow', return_value=MOCK_CURRENT_TIME_UTC) |
| 828 | +self.utcnow_.start() |
| 829 | + |
| 830 | +def teardown_method(self): |
| 831 | +self.time_.stop() |
| 832 | +self.utcnow_.stop() |
| 833 | + |
795 | 834 | def test_certificate_caching(self, user_mgt_app, httpserver):
|
796 | 835 | httpserver.serve_content(MOCK_PUBLIC_CERTS, 200, headers={'Cache-Control': 'max-age=3600'})
|
797 | 836 | verifier = _token_gen.TokenVerifier(user_mgt_app)
|
@@ -810,6 +849,18 @@ def test_certificate_caching(self, user_mgt_app, httpserver):
|
810 | 849 |
|
811 | 850 | class TestCertificateFetchTimeout:
|
812 | 851 |
|
| 852 | +def setup_method(self): |
| 853 | +self.time_ = unittest.mock.('time.time', return_value=MOCK_CURRENT_TIME) |
| 854 | +self.time_.start() |
| 855 | +self.utcnow_ = unittest.mock.( |
| 856 | +'google.auth.jwt._helpers.utcnow', return_value=MOCK_CURRENT_TIME_UTC) |
| 857 | +self.utcnow_.start() |
| 858 | + |
| 859 | +def teardown_method(self): |
| 860 | +self.time_.stop() |
| 861 | +self.utcnow_.stop() |
| 862 | +testutils.cleanup_apps() |
| 863 | + |
813 | 864 | timeout_configs = [
|
814 | 865 | ({'httpTimeout': 4}, 4),
|
815 | 866 | ({'httpTimeout': None}, None),
|
@@ -852,6 +903,3 @@ def _instrument_session(self, app):
|
852 | 903 | recorder = []
|
853 | 904 | request.session.mount('https://', testutils.MockAdapter(MOCK_PUBLIC_CERTS, 200, recorder))
|
854 | 905 | return recorder
|
855 |
| - |
856 |
| -def teardown_method(self): |
857 |
| -testutils.cleanup_apps() |
|
0 commit comments