First commit

This commit is contained in:
2025-12-19 12:58:58 +01:00
parent cba66667f3
commit 87865e2c6d
26 changed files with 3343 additions and 0 deletions
View File
+99
View File
@@ -0,0 +1,99 @@
"""
App Registration Service
Handles operations related to Azure AD app registrations.
"""
from msgraph import GraphServiceClient
from typing import List, Dict
class AppRegistrationService:
"""Service for managing app registrations via Microsoft Graph."""
def __init__(self, graph_client: GraphServiceClient):
"""
Initialize the app registration service.
Args:
graph_client: Authenticated Graph service client
"""
self.graph_client = graph_client
async def list_applications(self) -> List[Dict[str, str]]:
"""
Get all app registrations from Azure AD.
Returns:
List[Dict]: List of app registrations with id, app_id, and display_name
Raises:
Exception: If the API call fails
"""
try:
apps = []
# Get all applications
result = await self.graph_client.applications.get()
if result and result.value:
for app in result.value:
apps.append({
'id': app.id, # Object ID
'app_id': app.app_id, # Application (client) ID
'display_name': app.display_name
})
# Handle pagination if there are more than 100 apps
while result and result.odata_next_link:
# Continue fetching next page
from kiota_abstractions.base_request_configuration import RequestConfiguration
request_config = RequestConfiguration()
request_config.url = result.odata_next_link
result = await self.graph_client.applications.get(request_configuration=request_config)
if result and result.value:
for app in result.value:
apps.append({
'id': app.id,
'app_id': app.app_id,
'display_name': app.display_name
})
# Sort alphabetically by display name
apps.sort(key=lambda x: x['display_name'].lower())
return apps
except Exception as e:
raise Exception(f"Failed to list applications: {str(e)}")
async def get_application(self, app_object_id: str) -> Dict[str, any]:
"""
Get a specific app registration by its object ID.
Args:
app_object_id: The object ID of the app registration
Returns:
Dict: App registration details
Raises:
Exception: If the API call fails
"""
try:
app = await self.graph_client.applications.by_application_id(app_object_id).get()
if app:
return {
'id': app.id,
'app_id': app.app_id,
'display_name': app.display_name,
'password_credentials': app.password_credentials
}
return None
except Exception as e:
raise Exception(f"Failed to get application: {str(e)}")
+187
View File
@@ -0,0 +1,187 @@
"""
Key Vault Service
Handles operations related to Azure Key Vault.
"""
from azure.identity import InteractiveBrowserCredential
from azure.keyvault.secrets import SecretClient
from azure.mgmt.keyvault import KeyVaultManagementClient
from datetime import datetime
from typing import List, Dict
import re
class KeyVaultService:
"""Service for managing Key Vault operations."""
def __init__(self, credential: InteractiveBrowserCredential, subscription_id: str):
"""
Initialize the Key Vault service.
Args:
credential: Authenticated credential
subscription_id: Azure subscription ID
"""
self.credential = credential
self.subscription_id = subscription_id
self.mgmt_client = KeyVaultManagementClient(credential, subscription_id)
def list_keyvaults(self, resource_group: str = None) -> List[Dict[str, str]]:
"""
List all Key Vaults in the subscription.
Args:
resource_group: Optional resource group to filter by
Returns:
List[Dict]: List of Key Vaults with name, id, location, and resource_group
Raises:
Exception: If the API call fails
"""
try:
vaults = []
if resource_group:
# Get vaults from specific resource group
vault_list = self.mgmt_client.vaults.list_by_resource_group(resource_group)
else:
# Get all vaults in subscription
vault_list = self.mgmt_client.vaults.list_by_subscription()
for vault in vault_list:
# Extract resource group from vault ID
# Format: /subscriptions/{sub-id}/resourceGroups/{rg-name}/providers/...
rg_name = vault.id.split('/')[4] if len(vault.id.split('/')) > 4 else ''
vaults.append({
'name': vault.name,
'id': vault.id,
'location': vault.location,
'resource_group': rg_name
})
# Sort by name
vaults.sort(key=lambda x: x['name'].lower())
return vaults
except Exception as e:
raise Exception(f"Failed to list Key Vaults: {str(e)}")
def store_secret(
self,
vault_name: str,
secret_name: str,
secret_value: str,
description: str,
secret_id: str,
expires: datetime
) -> str:
"""
Store a secret in Key Vault with tags.
Args:
vault_name: Name of the Key Vault
secret_name: Name for the secret (will be sanitized)
secret_value: The secret value to store
description: Description tag
secret_id: Secret ID tag (from app registration)
expires: Expiration date
Returns:
str: The sanitized secret name
Raises:
Exception: If the operation fails
"""
try:
# Sanitize the secret name
sanitized_name = self._sanitize_name(secret_name)
# Construct vault URL
vault_url = f"https://{vault_name}.vault.azure.net"
# Create secret client
secret_client = SecretClient(vault_url=vault_url, credential=self.credential)
# Create tags
tags = {
'Description': description,
'SecretId': secret_id
}
# Set the secret
secret = secret_client.set_secret(
name=sanitized_name,
value=secret_value,
tags=tags,
expires_on=expires
)
return secret.name
except Exception as e:
raise Exception(f"Failed to store secret in Key Vault: {str(e)}")
def _sanitize_name(self, name: str) -> str:
"""
Sanitize a name for use in Key Vault.
Key Vault secret names can only contain alphanumeric characters and hyphens.
Args:
name: The name to sanitize
Returns:
str: Sanitized name
"""
if not name:
return name
# Replace any non-alphanumeric character (except hyphens) with hyphen
sanitized = re.sub(r'[^0-9a-zA-Z-]', '-', name)
# Remove consecutive hyphens
sanitized = re.sub(r'-+', '-', sanitized)
# Remove leading/trailing hyphens
sanitized = sanitized.strip('-')
return sanitized
def get_secret(self, vault_name: str, secret_name: str) -> Dict[str, any]:
"""
Retrieve a secret from Key Vault.
Args:
vault_name: Name of the Key Vault
secret_name: Name of the secret
Returns:
Dict: Secret properties including value and tags
Raises:
Exception: If the operation fails
"""
try:
# Construct vault URL
vault_url = f"https://{vault_name}.vault.azure.net"
# Create secret client
secret_client = SecretClient(vault_url=vault_url, credential=self.credential)
# Get the secret
secret = secret_client.get_secret(secret_name)
return {
'name': secret.name,
'value': secret.value,
'tags': secret.properties.tags,
'expires_on': secret.properties.expires_on,
'created_on': secret.properties.created_on
}
except Exception as e:
raise Exception(f"Failed to retrieve secret from Key Vault: {str(e)}")
+155
View File
@@ -0,0 +1,155 @@
"""
Secret Service
Handles creation and removal of app registration secrets.
"""
from msgraph import GraphServiceClient
from msgraph.generated.models.password_credential import PasswordCredential
from msgraph.generated.applications.item.add_password.add_password_post_request_body import AddPasswordPostRequestBody
from msgraph.generated.applications.item.remove_password.remove_password_post_request_body import RemovePasswordPostRequestBody
from datetime import datetime, timedelta
from typing import Dict, List, Any
import config
class SecretService:
"""Service for managing app registration secrets."""
def __init__(self, graph_client: GraphServiceClient):
"""
Initialize the secret service.
Args:
graph_client: Authenticated Graph service client
"""
self.graph_client = graph_client
async def create_secret(
self,
app_object_id: str,
description: str,
years: int = None
) -> Dict[str, Any]:
"""
Create a new secret for an app registration.
Args:
app_object_id: The object ID of the app registration
description: Display name/description for the secret
years: Number of years until expiration (defaults to config.APP_SECRET_EXPIRATION_YEARS)
Returns:
Dict: Contains secret_text, key_id, and end_datetime
Raises:
Exception: If the API call fails
"""
try:
if years is None:
years = config.APP_SECRET_EXPIRATION_YEARS
# Calculate expiration date
end_date = datetime.now() + timedelta(days=365 * years)
# Create password credential
password_cred = PasswordCredential()
password_cred.display_name = description
password_cred.end_date_time = end_date
# Create request body
request_body = AddPasswordPostRequestBody()
request_body.password_credential = password_cred
# Call Graph API to add password
result = await self.graph_client.applications.by_application_id(
app_object_id
).add_password.post(request_body)
if result:
return {
'secret_text': result.secret_text,
'key_id': str(result.key_id),
'end_datetime': result.end_date_time,
'display_name': result.display_name
}
raise Exception("No result returned from add_password")
except Exception as e:
raise Exception(f"Failed to create secret: {str(e)}")
async def remove_old_secrets(
self,
app_object_id: str,
keep_key_id: str
) -> int:
"""
Remove all secrets except the specified one.
Args:
app_object_id: The object ID of the app registration
keep_key_id: The key ID to keep (newly created secret)
Returns:
int: Number of secrets removed
Raises:
Exception: If the API call fails
"""
try:
removed_count = 0
# Get the app with its password credentials
app = await self.graph_client.applications.by_application_id(app_object_id).get()
if app and app.password_credentials:
for cred in app.password_credentials:
# Remove if it's not the one we want to keep
if str(cred.key_id) != str(keep_key_id):
# Create request body
request_body = RemovePasswordPostRequestBody()
request_body.key_id = cred.key_id
# Call Graph API to remove password
await self.graph_client.applications.by_application_id(
app_object_id
).remove_password.post(request_body)
removed_count += 1
return removed_count
except Exception as e:
raise Exception(f"Failed to remove old secrets: {str(e)}")
async def list_secrets(self, app_object_id: str) -> List[Dict[str, Any]]:
"""
List all secrets for an app registration (metadata only, not the secret values).
Args:
app_object_id: The object ID of the app registration
Returns:
List[Dict]: List of secret metadata
Raises:
Exception: If the API call fails
"""
try:
app = await self.graph_client.applications.by_application_id(app_object_id).get()
secrets = []
if app and app.password_credentials:
for cred in app.password_credentials:
secrets.append({
'key_id': str(cred.key_id),
'display_name': cred.display_name,
'start_datetime': cred.start_date_time,
'end_datetime': cred.end_date_time
})
return secrets
except Exception as e:
raise Exception(f"Failed to list secrets: {str(e)}")