# Source file: I-SecretUpdate/auth/azure_authenticator.py
# 2025-12-22 09:57:49 +01:00 | 210 lines | 7.2 KiB | Python

"""
Azure Authentication Module
Handles authentication to Azure services for Key Vault management.
"""
from azure.identity import InteractiveBrowserCredential
from azure.mgmt.resource import SubscriptionClient
from typing import Optional, List, Dict
class AzureAuthenticator:
    """Handles Azure authentication for Key Vault and resource management.

    Keeps the credential, the tenant ID, the list of visible subscriptions
    and the currently selected subscription ID for one session.
    """

    def __init__(self):
        """
        Initialize the Azure authenticator.

        Nothing is contacted here; every field starts empty until
        authenticate() / set_subscription() are called.
        """
        self.tenant_id: Optional[str] = None
        self.subscription_id: Optional[str] = None
        self.credential: Optional[InteractiveBrowserCredential] = None
        self.subscriptions: List[Dict[str, str]] = []

    @staticmethod
    def _summarize_subscriptions(subscriptions_list) -> List[Dict[str, str]]:
        """
        Convert subscription objects into plain dicts.

        Args:
            subscriptions_list: Iterable of objects exposing
                ``subscription_id`` and ``display_name`` attributes

        Returns:
            List[Dict]: One ``{'id': ..., 'name': ...}`` dict per subscription
        """
        return [
            {
                'id': sub.subscription_id,
                'name': sub.display_name,
            }
            for sub in subscriptions_list
        ]

    def discover_tenant_id(
        self,
    ) -> "tuple[str, InteractiveBrowserCredential, List[Dict[str, str]]]":
        """
        Discover user's tenant ID by authenticating with organizations endpoint.

        The tenant ID is taken from the first subscription returned by the
        subscription listing. Nothing is stored on the instance; pass the
        returned values to authenticate() to reuse them.

        Returns:
            tuple: (tenant_id, credential, subscriptions) - The discovered
                tenant ID, credential to reuse, and subscription list

        Raises:
            Exception: If no subscriptions found or authentication fails
                (the underlying error is attached as ``__cause__``)
        """
        try:
            # Create temporary credential with "organizations" for discovery
            temp_credential = InteractiveBrowserCredential(
                tenant_id="organizations",
                additionally_allowed_tenants=["*"],
            )

            # List subscriptions to extract tenant
            sub_client = SubscriptionClient(temp_credential)
            subscriptions_list = list(sub_client.subscriptions.list())
            if not subscriptions_list:
                raise Exception("No Azure subscriptions found. Please ensure you have access to at least one subscription.")

            # Extract tenant ID from first subscription
            tenant_id = subscriptions_list[0].tenant_id
            print(f"Discovered Tenant ID: {tenant_id}")

            # Return tenant_id, credential, and subscriptions for reuse
            subscriptions = self._summarize_subscriptions(subscriptions_list)
            return tenant_id, temp_credential, subscriptions
        except Exception as e:
            # Chain the original error so its traceback is not lost.
            raise Exception(f"Failed to discover tenant: {str(e)}") from e

    def authenticate(
        self,
        credential: "Optional[InteractiveBrowserCredential]" = None,
        tenant_id: Optional[str] = None,
        subscriptions: Optional[List[Dict[str, str]]] = None,
    ) -> bool:
        """
        Authenticate to Azure. Can reuse a credential or use a specific tenant.

        Args:
            credential: Optional credential to reuse, e.g. the one returned by
                discover_tenant_id() (avoids multiple auth prompts)
            tenant_id: Optional specific tenant ID to use; required when no
                credential is given
            subscriptions: Optional pre-fetched subscriptions list (avoids
                re-listing); an empty list is treated as "not provided"

        Returns:
            bool: True if at least one subscription is available, False if
                the subscription listing came back empty. Note that the
                credential stays stored in the False case, so
                is_authenticated() still reports True.

        Raises:
            Exception: If neither credential nor tenant_id is given, or if
                authentication fails (the underlying error is attached as
                ``__cause__``)
        """
        try:
            if not credential and not tenant_id:
                raise Exception("Either credential or tenant_id must be provided")

            # Honour an explicitly requested tenant on every path, including
            # "credential given, subscriptions not given"; otherwise the
            # tenant of the first listed subscription would silently win.
            if tenant_id:
                self.tenant_id = tenant_id

            if credential:
                # Reuse credential (recommended path to avoid multiple auth prompts)
                self.credential = credential
            else:
                # Create NEW credential with specific tenant_id (avoids redirect)
                self.credential = InteractiveBrowserCredential(
                    tenant_id=tenant_id,
                    additionally_allowed_tenants=["*"],
                )

            if subscriptions:
                # Reuse pre-fetched subscriptions (avoids duplicate API call)
                self.subscriptions = subscriptions
                print(f"Using cached authentication. Found {len(subscriptions)} subscription(s).")
                return True

            # List all subscriptions (fallback for legacy code path)
            sub_client = SubscriptionClient(self.credential)
            subscriptions_list = list(sub_client.subscriptions.list())
            if not subscriptions_list:
                # Do not leave a list from an earlier call behind.
                self.subscriptions = []
                return False

            # Extract tenant ID if not already set
            if not self.tenant_id:
                self.tenant_id = getattr(subscriptions_list[0], 'tenant_id', None)
                if self.tenant_id:
                    print(f"Detected Tenant ID: {self.tenant_id}")

            print(f"Successfully authenticated to Azure. Found {len(subscriptions_list)} subscription(s).")
            # Store subscriptions for later selection
            self.subscriptions = self._summarize_subscriptions(subscriptions_list)
            return True
        except Exception as e:
            # Chain the original error so its traceback is not lost.
            raise Exception(f"Azure authentication failed: {str(e)}") from e

    def get_credential(self) -> "InteractiveBrowserCredential":
        """
        Get the credential object.

        Returns:
            InteractiveBrowserCredential: The credential object

        Raises:
            Exception: If not authenticated
        """
        if not self.credential:
            raise Exception("Not authenticated. Call authenticate() first.")
        return self.credential

    def get_subscriptions(self) -> List[Dict[str, str]]:
        """
        Get the list of available subscriptions.

        Returns:
            List[Dict]: List of subscriptions with 'id' and 'name'
                (empty until authenticate() has stored some)
        """
        return self.subscriptions

    def set_subscription(self, subscription_id: str):
        """
        Set the active subscription ID.

        The value is stored as given; it is not checked against the list
        returned by get_subscriptions().

        Args:
            subscription_id: The subscription ID to use
        """
        self.subscription_id = subscription_id

    def get_subscription_id(self) -> str:
        """
        Get the subscription ID.

        Returns:
            str: The subscription ID

        Raises:
            Exception: If subscription not set
        """
        if not self.subscription_id:
            raise Exception("Subscription not set. Call set_subscription() first.")
        return self.subscription_id

    def get_tenant_id(self) -> str:
        """
        Get the tenant ID.

        Returns:
            str: The tenant ID

        Raises:
            Exception: If no tenant ID has been set or detected yet
        """
        if not self.tenant_id:
            raise Exception("Tenant ID not available. Call authenticate() first.")
        return self.tenant_id

    def is_authenticated(self) -> bool:
        """
        Check if currently authenticated.

        Returns:
            bool: True if a credential is stored, False otherwise
        """
        return self.credential is not None