# File: I-SecretUpdate/services/keyvault_service.py
# Last modified: 2025-12-19 12:58:58 +01:00
# Size: 188 lines, 5.5 KiB (Python)

"""
Key Vault Service
Handles operations related to Azure Key Vault.
"""
import re
from datetime import datetime
from typing import Any, Dict, List, Optional

from azure.identity import InteractiveBrowserCredential
from azure.keyvault.secrets import SecretClient
from azure.mgmt.keyvault import KeyVaultManagementClient


class KeyVaultService:
    """Service for managing Key Vault operations.

    Uses a management client to enumerate vaults in a subscription and
    per-vault secret clients to write and read secrets, all authenticated
    with the credential supplied at construction time.
    """

    def __init__(self, credential: "InteractiveBrowserCredential", subscription_id: str):
        """
        Initialize the Key Vault service.

        Args:
            credential: Authenticated credential
            subscription_id: Azure subscription ID
        """
        self.credential = credential
        self.subscription_id = subscription_id
        self.mgmt_client = KeyVaultManagementClient(credential, subscription_id)

    def _secret_client(self, vault_name: str) -> "SecretClient":
        """Build a secret client for https://{vault_name}.vault.azure.net."""
        vault_url = f"https://{vault_name}.vault.azure.net"
        return SecretClient(vault_url=vault_url, credential=self.credential)

    def list_keyvaults(self, resource_group: Optional[str] = None) -> List[Dict[str, str]]:
        """
        List all Key Vaults in the subscription.

        Args:
            resource_group: Optional resource group to filter by. When falsy,
                every vault in the subscription is listed.

        Returns:
            List[Dict]: Key Vaults with 'name', 'id', 'location' and
            'resource_group' keys, sorted case-insensitively by name.

        Raises:
            Exception: If the API call fails (the original error is chained
                as the cause).
        """
        try:
            if resource_group:
                # Get vaults from specific resource group
                vault_list = self.mgmt_client.vaults.list_by_resource_group(resource_group)
            else:
                # Get all vaults in subscription
                vault_list = self.mgmt_client.vaults.list_by_subscription()

            vaults = []
            for vault in vault_list:
                # Extract resource group from vault ID
                # Format: /subscriptions/{sub-id}/resourceGroups/{rg-name}/providers/...
                id_parts = vault.id.split('/')
                rg_name = id_parts[4] if len(id_parts) > 4 else ''
                vaults.append({
                    'name': vault.name,
                    'id': vault.id,
                    'location': vault.location,
                    'resource_group': rg_name,
                })

            # Sort by name, ignoring case
            vaults.sort(key=lambda entry: entry['name'].lower())
            return vaults
        except Exception as e:
            raise Exception(f"Failed to list Key Vaults: {e}") from e

    def store_secret(
        self,
        vault_name: str,
        secret_name: str,
        secret_value: str,
        description: str,
        secret_id: str,
        expires: datetime
    ) -> str:
        """
        Store a secret in Key Vault with tags.

        Args:
            vault_name: Name of the Key Vault
            secret_name: Name for the secret (will be sanitized)
            secret_value: The secret value to store
            description: Description tag
            secret_id: Secret ID tag (from app registration)
            expires: Expiration date

        Returns:
            str: The name of the stored secret as returned by Key Vault
            (the sanitized name that was submitted).

        Raises:
            Exception: If the operation fails, including when the name is
                empty after sanitization (the original error is chained as
                the cause).
        """
        try:
            # Sanitize the secret name
            sanitized_name = self._sanitize_name(secret_name)
            if not sanitized_name:
                # Fail before calling the service: an empty name cannot
                # identify a secret and would only yield an obscure error.
                raise ValueError(
                    f"Secret name {secret_name!r} contains no valid characters "
                    "after sanitization"
                )

            secret_client = self._secret_client(vault_name)

            # Create tags
            tags = {
                'Description': description,
                'SecretId': secret_id,
            }

            # Set the secret
            secret = secret_client.set_secret(
                name=sanitized_name,
                value=secret_value,
                tags=tags,
                expires_on=expires,
            )
            return secret.name
        except Exception as e:
            raise Exception(f"Failed to store secret in Key Vault: {e}") from e

    def _sanitize_name(self, name: str) -> str:
        """
        Sanitize a name for use in Key Vault.

        Key Vault secret names can only contain alphanumeric characters and hyphens.

        Args:
            name: The name to sanitize

        Returns:
            str: Sanitized name. Falsy input (empty string or None) is
            returned unchanged; a name made up only of invalid characters
            sanitizes to an empty string.
        """
        if not name:
            return name
        # Each run of characters outside 0-9, a-z, A-Z (hyphens included)
        # becomes a single hyphen, so consecutive hyphens never appear.
        sanitized = re.sub(r'[^0-9a-zA-Z]+', '-', name)
        # Remove leading/trailing hyphens
        return sanitized.strip('-')

    def get_secret(self, vault_name: str, secret_name: str) -> Dict[str, Any]:
        """
        Retrieve a secret from Key Vault.

        Args:
            vault_name: Name of the Key Vault
            secret_name: Name of the secret (used as given, not sanitized)

        Returns:
            Dict: Secret properties with 'name', 'value', 'tags',
            'expires_on' and 'created_on' keys.

        Raises:
            Exception: If the operation fails (the original error is chained
                as the cause).
        """
        try:
            secret_client = self._secret_client(vault_name)

            # Get the secret
            secret = secret_client.get_secret(secret_name)
            properties = secret.properties
            return {
                'name': secret.name,
                'value': secret.value,
                'tags': properties.tags,
                'expires_on': properties.expires_on,
                'created_on': properties.created_on,
            }
        except Exception as e:
            raise Exception(f"Failed to retrieve secret from Key Vault: {e}") from e