Merged
Show file tree
Hide file tree
Changes from all commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Failed to load files.
Original file line numberDiff line numberDiff line change
Expand Up@@ -19,6 +19,7 @@
import json
import os
import time
import unittest.mock

from google.auth import crypt
from google.auth import jwt
Expand All@@ -36,6 +37,9 @@
from tests import testutils


MOCK_CURRENT_TIME = 1500000000
MOCK_CURRENT_TIME_UTC = datetime.datetime.fromtimestamp(
MOCK_CURRENT_TIME, tz=datetime.timezone.utc)
MOCK_UID = 'user1'
MOCK_CREDENTIAL = credentials.Certificate(
testutils.resource_filename('service_account.json'))
Expand DownExpand Up@@ -105,16 +109,17 @@ def verify_custom_token(custom_token, expected_claims, tenant_id=None):
for key, value in expected_claims.items():
assert value == token['claims'][key]

def _get_id_token(payload_overrides=None, header_overrides=None):
def _get_id_token(payload_overrides=None, header_overrides=None, current_time=MOCK_CURRENT_TIME):
signer = crypt.RSASigner.from_string(MOCK_PRIVATE_KEY)
headers = {
'kid': 'mock-key-id-1'
}
now = int(current_time if current_time is not None else time.time())
payload = {
'aud': MOCK_CREDENTIAL.project_id,
'iss': 'https://securetoken.google.com/' + MOCK_CREDENTIAL.project_id,
'iat': int(time.time()) - 100,
'exp': int(time.time()) + 3600,
'iat': now - 100,
'exp': now + 3600,
'sub': '1234567890',
'admin': True,
'firebase': {
Expand All@@ -127,12 +132,13 @@ def _get_id_token(payload_overrides=None, header_overrides=None):
payload = _merge_jwt_claims(payload, payload_overrides)
return jwt.encode(signer, payload, header=headers)

def _get_session_cookie(payload_overrides=None, header_overrides=None):
def _get_session_cookie(
payload_overrides=None, header_overrides=None, current_time=MOCK_CURRENT_TIME):
payload_overrides = payload_overrides or {}
if 'iss' not in payload_overrides:
payload_overrides['iss'] = 'https://session.firebase.google.com/{0}'.format(
MOCK_CREDENTIAL.project_id)
return _get_id_token(payload_overrides, header_overrides)
return _get_id_token(payload_overrides, header_overrides, current_time=current_time)

def _instrument_user_manager(app, status, payload):
client = auth._get_client(app)
Expand DownExpand Up@@ -205,7 +211,7 @@ def env_var_app(request):
@pytest.fixture(scope='module')
def revoked_tokens():
mock_user = json.loads(testutils.resource('get_user.json'))
mock_user['users'][0]['validSince'] = str(int(time.time())+100)
mock_user['users'][0]['validSince'] = str(MOCK_CURRENT_TIME + 100)
return json.dumps(mock_user)

@pytest.fixture(scope='module')
Expand All@@ -218,7 +224,7 @@ def user_disabled():
def user_disabled_and_revoked():
mock_user = json.loads(testutils.resource('get_user.json'))
mock_user['users'][0]['disabled'] = True
mock_user['users'][0]['validSince'] = str(int(time.time())+100)
mock_user['users'][0]['validSince'] = str(MOCK_CURRENT_TIME + 100)
return json.dumps(mock_user)


Expand DownExpand Up@@ -420,6 +426,17 @@ def test_unexpected_response(self, user_mgt_app):

class TestVerifyIdToken:

def setup_method(self):
self.time_ = unittest.mock.('time.time', return_value=MOCK_CURRENT_TIME)
self.time_.start()
self.utcnow_ = unittest.mock.(
'google.auth.jwt._helpers.utcnow', return_value=MOCK_CURRENT_TIME_UTC)
self.utcnow_.start()

def teardown_method(self):
self.time_.stop()
self.utcnow_.stop()

valid_tokens = {
'BinaryToken': TEST_ID_TOKEN,
'TextToken': TEST_ID_TOKEN.decode('utf-8'),
Expand All@@ -435,14 +452,14 @@ class TestVerifyIdToken:
'EmptySubject': _get_id_token({'sub': ''}),
'IntSubject': _get_id_token({'sub': 10}),
'LongStrSubject': _get_id_token({'sub': 'a' * 129}),
'FutureToken': _get_id_token({'iat': int(time.time()) + 1000}),
'FutureToken': _get_id_token({'iat': MOCK_CURRENT_TIME + 1000}),
'ExpiredToken': _get_id_token({
'iat': int(time.time()) - 10000,
'exp': int(time.time()) - 3600
'iat': MOCK_CURRENT_TIME - 10000,
'exp': MOCK_CURRENT_TIME - 3600
}),
'ExpiredTokenShort': _get_id_token({
'iat': int(time.time()) - 10000,
'exp': int(time.time()) - 30
'iat': MOCK_CURRENT_TIME - 10000,
'exp': MOCK_CURRENT_TIME - 30
}),
'BadFormatToken': 'foobar'
}
Expand DownExpand Up@@ -618,6 +635,17 @@ def test_certificate_request_failure(self, user_mgt_app):

class TestVerifySessionCookie:

def setup_method(self):
self.time_ = unittest.mock.('time.time', return_value=MOCK_CURRENT_TIME)
self.time_.start()
self.utcnow_ = unittest.mock.(
'google.auth.jwt._helpers.utcnow', return_value=MOCK_CURRENT_TIME_UTC)
self.utcnow_.start()

def teardown_method(self):
self.time_.stop()
self.utcnow_.stop()

valid_cookies = {
'BinaryCookie': TEST_SESSION_COOKIE,
'TextCookie': TEST_SESSION_COOKIE.decode('utf-8'),
Expand All@@ -633,14 +661,14 @@ class TestVerifySessionCookie:
'EmptySubject': _get_session_cookie({'sub': ''}),
'IntSubject': _get_session_cookie({'sub': 10}),
'LongStrSubject': _get_session_cookie({'sub': 'a' * 129}),
'FutureCookie': _get_session_cookie({'iat': int(time.time()) + 1000}),
'FutureCookie': _get_session_cookie({'iat': MOCK_CURRENT_TIME + 1000}),
'ExpiredCookie': _get_session_cookie({
'iat': int(time.time()) - 10000,
'exp': int(time.time()) - 3600
'iat': MOCK_CURRENT_TIME - 10000,
'exp': MOCK_CURRENT_TIME - 3600
}),
'ExpiredCookieShort': _get_session_cookie({
'iat': int(time.time()) - 10000,
'exp': int(time.time()) - 30
'iat': MOCK_CURRENT_TIME - 10000,
'exp': MOCK_CURRENT_TIME - 30
}),
'BadFormatCookie': 'foobar',
'IDToken': TEST_ID_TOKEN,
Expand DownExpand Up@@ -792,6 +820,17 @@ def test_certificate_request_failure(self, user_mgt_app):

class TestCertificateCaching:

def setup_method(self):
self.time_ = unittest.mock.('time.time', return_value=MOCK_CURRENT_TIME)
self.time_.start()
self.utcnow_ = unittest.mock.(
'google.auth.jwt._helpers.utcnow', return_value=MOCK_CURRENT_TIME_UTC)
self.utcnow_.start()

def teardown_method(self):
self.time_.stop()
self.utcnow_.stop()

def test_certificate_caching(self, user_mgt_app, httpserver):
httpserver.serve_content(MOCK_PUBLIC_CERTS, 200, headers={'Cache-Control': 'max-age=3600'})
verifier = _token_gen.TokenVerifier(user_mgt_app)
Expand All@@ -810,6 +849,18 @@ def test_certificate_caching(self, user_mgt_app, httpserver):

class TestCertificateFetchTimeout:

def setup_method(self):
self.time_ = unittest.mock.('time.time', return_value=MOCK_CURRENT_TIME)
self.time_.start()
self.utcnow_ = unittest.mock.(
'google.auth.jwt._helpers.utcnow', return_value=MOCK_CURRENT_TIME_UTC)
self.utcnow_.start()

def teardown_method(self):
self.time_.stop()
self.utcnow_.stop()
testutils.cleanup_apps()

timeout_configs = [
({'httpTimeout': 4}, 4),
({'httpTimeout': None}, None),
Expand DownExpand Up@@ -852,6 +903,3 @@ def _instrument_session(self, app):
recorder = []
request.session.mount('https://', testutils.MockAdapter(MOCK_PUBLIC_CERTS, 200, recorder))
return recorder

def teardown_method(self):
testutils.cleanup_apps()