haumdaucher_de/scripts/manage_secrets.py

218 lines
7.2 KiB
Python
Executable File

#!/usr/bin/env python3
import subprocess
import sys
import json
import os
# --- Configuration ---
# Google Cloud project that every gcloud call in this script targets.
PROJECT_ID = "haumdaucher"

# Secret ID -> operator-facing metadata:
#   desc            short human-readable name used in prompts
#   instructions    numbered steps telling the operator where to find the value
#   validation_hint free-text description of what a valid value looks like
SECRETS = {
    "haumdaucher-oauth-client-id": {
        "desc": "OAuth 2.0 Client ID",
        "instructions": "\n".join((
            "1. Go to https://console.cloud.google.com/apis/credentials?project=haumdaucher",
            "2. Look for 'OAuth 2.0 Client IDs' -> 'Haumdaucher Web' (or create one if missing).",
            "3. Copy the 'Client ID' (looks like: 12345...apps.googleusercontent.com)",
        )),
        "validation_hint": "Must end with .apps.googleusercontent.com",
    },
    "haumdaucher-oauth-client-secret": {
        "desc": "OAuth 2.0 Client Secret",
        "instructions": "\n".join((
            "1. On the same credentials page, click the edit icon (pencil) or the name of the client.",
            "2. On the right side, find 'Client secret'.",
            "3. Copy the string (it is hidden by default).",
        )),
        "validation_hint": "Usually a random string of characters.",
    },
}
# --- Helpers ---
class Colors:
    """ANSI escape sequences used to style terminal output."""

    # Attribute codes: reset everything / switch on bold.
    ENDC = "\033[0m"
    BOLD = "\033[1m"

    # Foreground colour codes, grouped by the role they play in this script.
    HEADER = "\033[95m"
    OKBLUE = "\033[94m"
    OKGREEN = "\033[92m"
    WARNING = "\033[93m"
    FAIL = "\033[91m"
def print_header(msg):
    """Print ``msg`` as a bold ``=== msg ===`` banner, preceded by a blank line."""
    banner = f"\n{Colors.HEADER}{Colors.BOLD}=== {msg} ==={Colors.ENDC}"
    print(banner)
def print_step(msg):
    """Print ``msg`` as a step line (pointing-hand prefix), preceded by a blank line."""
    line = f"\n{Colors.OKBLUE}👉 {msg}{Colors.ENDC}"
    print(line)
def print_success(msg):
    """Print ``msg`` wrapped in the ``OKGREEN`` colour code."""
    line = f"{Colors.OKGREEN}{msg}{Colors.ENDC}"
    print(line)
def print_error(msg):
    """Print ``msg`` wrapped in the ``FAIL`` colour code."""
    line = f"{Colors.FAIL}{msg}{Colors.ENDC}"
    print(line)
def run_command(command, check=True):
    """Run ``command`` through the shell and return its stripped stdout.

    Args:
        command: Shell command line to execute.
        check: When true, a non-zero exit status is fatal: the failing
            command and its stderr are printed and the process exits with
            status 1. When false, a failure is reported to the caller by
            returning ``None``.

    Returns:
        The command's stdout with surrounding whitespace removed, or
        ``None`` if the command exited non-zero and ``check`` is false.
    """
    # Always run with check=False and inspect the return code ourselves:
    # subprocess.run only raises CalledProcessError when check=True, so
    # relying on the exception would make failures invisible for
    # check=False callers.
    result = subprocess.run(
        command,
        shell=True,
        check=False,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )

    if result.returncode != 0:
        if check:
            print_error(f"Command failed: {command}")
            print(result.stderr)
            sys.exit(1)
        return None

    return result.stdout.strip()
def verify_gcloud_login():
    """Check the active gcloud account and project before doing any work.

    Exits with status 1 when gcloud reports no account. When the configured
    project differs from ``PROJECT_ID`` the user is asked to confirm, and any
    answer other than ``y`` (case-insensitive) also exits with status 1.
    """
    account = run_command("gcloud config get-value account", check=False)
    logged_out = account == "(unset)" or not account
    if logged_out:
        print_error("You are not logged in to gcloud.")
        print(" Run: gcloud auth login")
        sys.exit(1)

    project = run_command("gcloud config get-value project", check=False)
    if project == PROJECT_ID:
        # Already pointing at the expected project; nothing to confirm.
        return

    print(f"{Colors.WARNING}⚠️ Current gcloud project is '{project}', but this script targets '{PROJECT_ID}'.{Colors.ENDC}")
    answer = input(" Continue anyway? (y/n): ")
    if answer.lower() != 'y':
        sys.exit(1)
def enable_api():
    """Request that the Secret Manager API be enabled for ``PROJECT_ID``.

    The gcloud call is made with ``check=False``, so its outcome is not
    inspected here.
    """
    print("🔍 Checking API status...")
    enable_cmd = f"gcloud services enable secretmanager.googleapis.com --project={PROJECT_ID}"
    run_command(enable_cmd, check=False)
def get_secret_status(secret_id):
    """Classify a secret as ``'MISSING'``, ``'EMPTY'`` or ``'READY'``.

    * ``'MISSING'`` - ``gcloud secrets describe`` produced no output.
    * ``'EMPTY'``   - the secret exists but listing its ENABLED versions
      produced no output or an empty JSON list.
    * ``'READY'``   - at least one ENABLED version was listed.
    """
    description = run_command(
        f"gcloud secrets describe {secret_id} --project={PROJECT_ID} --format=json",
        check=False,
    )
    if not description:
        return "MISSING"

    # One enabled version is enough to call the secret usable.
    enabled_listing = run_command(
        f"gcloud secrets versions list {secret_id} --project={PROJECT_ID} --limit=1 --filter='state=ENABLED' --format=json",
        check=False,
    )
    has_enabled_version = bool(enabled_listing) and enabled_listing != "[]"
    return "READY" if has_enabled_version else "EMPTY"
def create_secret_resource(secret_id):
    """Create the (version-less) secret ``secret_id`` with automatic replication.

    Uses ``run_command`` with its default ``check=True``, so a gcloud failure
    terminates the script.
    """
    print(f" creating secret container '{secret_id}'...")
    create_cmd = f"gcloud secrets create {secret_id} --replication-policy=automatic --project={PROJECT_ID}"
    run_command(create_cmd)
def add_secret_version(secret_name, value):
    """Add ``value`` as a new version of ``secret_name``.

    The value is fed to gcloud on stdin (``--data-file=-``) rather than being
    placed on the command line. On a non-zero exit the captured stderr is
    printed and the script exits with status 1.
    """
    completed = subprocess.run(
        f"gcloud secrets versions add {secret_name} --data-file=- --project={PROJECT_ID}",
        shell=True,
        input=value,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    if completed.returncode == 0:
        return

    stderr = completed.stderr
    print_error(f"Failed to add version: {stderr}")
    sys.exit(1)
def cleanup_old_versions(secret_name):
    """Destroy all listed, non-destroyed versions of a secret except the newest.

    At most 10 non-destroyed versions are listed. The version to keep is
    chosen explicitly as the one with the highest numeric version ID instead
    of trusting the order in which gcloud happens to return the list, so the
    freshly added version is never the one that gets destroyed.

    Cleanup is best-effort: if the listing fails, is not valid JSON, or
    contains version IDs that cannot be ranked, nothing is destroyed.

    Args:
        secret_name: ID of the secret whose old versions should be removed.
    """
    cmd = f"gcloud secrets versions list {secret_name} --project={PROJECT_ID} --limit=10 --filter='state!=DESTROYED' --format=json"
    output = run_command(cmd, check=False)
    if not output:
        return

    try:
        versions = json.loads(output)
    except json.JSONDecodeError:
        # Unparseable listing: skip cleanup rather than abort the whole run.
        print(" ⚠️ Could not parse version list; skipping cleanup.")
        return

    # Resource names look like ".../versions/<id>"; keep only the trailing id.
    version_ids = [v['name'].split('/')[-1] for v in versions]
    if len(version_ids) <= 1:
        return

    # NOTE(review): assumes Secret Manager version IDs are increasing
    # integers, so the largest one is the most recently added version.
    if not all(version_id.isdigit() for version_id in version_ids):
        # Cannot tell which version is newest; destroying anything is unsafe.
        print(" ⚠️ Unexpected version IDs; skipping cleanup.")
        return
    newest_id = max(version_ids, key=int)

    print(" 🧹 Cleaning up old versions for clean state...")
    for version_id in version_ids:
        if version_id == newest_id:
            continue
        run_command(f"gcloud secrets versions destroy {version_id} --secret={secret_name} --project={PROJECT_ID} --quiet", check=False)
# --- Main Logic ---
def main():
    """Interactively bring every secret in ``SECRETS`` to the READY state.

    After verifying the gcloud login and enabling the API, the function
    loops: it audits the status of each configured secret, and for every
    secret that is not READY it creates the secret if missing, shows the
    operator where to find the value, reads it without echo, stores it as a
    new version and removes older versions. The loop repeats until an audit
    reports every secret READY, then the follow-up Terraform step is printed.
    """
    # Imported here because only this interactive entry point needs it.
    import getpass

    print_header("Haumdaucher Secret Manager Setup")
    verify_gcloud_login()
    enable_api()

    # Audit -> fix -> re-audit until everything is READY.
    while True:
        # 1. Audit state
        print_header("Current Status Audit")
        status_map = {}
        for secret_id in SECRETS:
            status = get_secret_status(secret_id)
            status_map[secret_id] = status
            msg = f"{secret_id}: {status}"
            if status == "READY":
                print(f"{Colors.OKGREEN}✅ {msg}{Colors.ENDC}")
            else:
                print(f"{Colors.FAIL}❌ {msg}{Colors.ENDC}")

        if all(status == "READY" for status in status_map.values()):
            print_success("All secrets are configured correctly!")
            break

        # 2. Fix missing items
        print_header("Action Required")
        for secret_id, config in SECRETS.items():
            status = status_map[secret_id]
            if status == "READY":
                continue

            print_step(f"Configuring {config['desc']} ({secret_id})")
            if status == "MISSING":
                create_secret_resource(secret_id)

            print(f"{Colors.WARNING}INSTRUCTIONS:{Colors.ENDC}")
            print(config['instructions'])
            print("")

            # Strip before storing: whitespace picked up while pasting would
            # otherwise become part of the stored secret value.
            value = getpass.getpass(
                prompt=f"{Colors.BOLD}Paste {config['desc']} here (hidden): {Colors.ENDC}"
            ).strip()
            if not value:
                # Leave this secret as-is; the next audit pass will ask again.
                print_error("Empty value provided. Retrying audit...")
                continue

            add_secret_version(secret_id, value)
            cleanup_old_versions(secret_id)
            print_success("Secret updated.")

        print("\n🔄 Re-verifying state...")
        # Loop continues to verify

    print_header("Next Steps")
    print("1. Infrastructure is ready.")
    print("2. Run Terraform to apply the changes:")
    print(f"\n {Colors.OKBLUE}cd terraform && terraform apply{Colors.ENDC}\n")
if __name__ == "__main__":
    # Script entry point. Ctrl-C is reported briefly and mapped to exit
    # status 130 instead of dumping a traceback.
    try:
        main()
    except KeyboardInterrupt:
        print("\n\n⚠️ Aborted by user.")
        raise SystemExit(130)