132 lines
4.3 KiB
Python
Executable File
132 lines
4.3 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import subprocess
|
|
import argparse
|
|
import sys
|
|
import json
|
|
|
|
# Configuration
|
|
PROJECT_ID = "haumdaucher" # Could fetch from gcloud config, but hardcoding for project context
|
|
SECRETS = {
|
|
"haumdaucher-oauth-client-id": "Google OAuth Client ID",
|
|
"haumdaucher-oauth-client-secret": "Google OAuth Client Secret"
|
|
}
|
|
|
|
def run_command(command, check=True):
|
|
"""Runs a shell command and returns the output."""
|
|
try:
|
|
result = subprocess.run(
|
|
command,
|
|
shell=True,
|
|
check=check,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
text=True
|
|
)
|
|
return result.stdout.strip()
|
|
except subprocess.CalledProcessError as e:
|
|
print(f"❌ Error running command: {command}")
|
|
print(f" Stderr: {e.stderr}")
|
|
if check:
|
|
sys.exit(1)
|
|
return None
|
|
|
|
def check_secret_exists(secret_name):
|
|
"""Checks if a secret exists."""
|
|
cmd = f"gcloud secrets describe {secret_name} --project={PROJECT_ID} --format=json"
|
|
result = run_command(cmd, check=False)
|
|
return result is not None
|
|
|
|
def create_secret(secret_name):
|
|
"""Creates a new secret."""
|
|
print(f" Creating secret '{secret_name}'...")
|
|
cmd = f"gcloud secrets create {secret_name} --replication-policy=automatic --project={PROJECT_ID}"
|
|
run_command(cmd)
|
|
|
|
def add_secret_version(secret_name, payload):
|
|
"""Adds a new version to the secret."""
|
|
print(f" Adding new version to '{secret_name}'...")
|
|
# Passing payload via stdin to avoid shell history logging
|
|
process = subprocess.Popen(
|
|
f"gcloud secrets versions add {secret_name} --data-file=- --project={PROJECT_ID}",
|
|
shell=True,
|
|
stdin=subprocess.PIPE,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
text=True
|
|
)
|
|
stdout, stderr = process.communicate(input=payload)
|
|
if process.returncode != 0:
|
|
print(f"❌ Failed to add version: {stderr}")
|
|
sys.exit(1)
|
|
print(" ✅ Version added.")
|
|
|
|
def cleanup_old_versions(secret_name):
|
|
"""Destroys old versions, keeping only the latest enabled one."""
|
|
print(f" Cleaning up old versions for '{secret_name}'...")
|
|
cmd = f"gcloud secrets versions list {secret_name} --project={PROJECT_ID} --limit=10 --format=json"
|
|
output = run_command(cmd)
|
|
if not output:
|
|
return
|
|
|
|
versions = json.loads(output)
|
|
# Sort by createTime desc (latest first)
|
|
# Filter for ENABLED versions usually, but we want to destroy everything except latest.
|
|
|
|
# We keep the one with state ENABLED that is most recent?
|
|
# Usually the just-added one is ENABLED.
|
|
|
|
if len(versions) <= 1:
|
|
print(" No old versions to cleanup.")
|
|
return
|
|
|
|
# Keep the latest one (index 0)
|
|
latest = versions[0]
|
|
to_destroy = versions[1:]
|
|
|
|
for v in to_destroy:
|
|
state = v.get('state')
|
|
if state == 'DESTROYED':
|
|
continue
|
|
|
|
version_id = v['name'].split('/')[-1]
|
|
print(f" destroying old version {version_id} ({state})...")
|
|
destroy_cmd = f"gcloud secrets versions destroy {version_id} --secret={secret_name} --project={PROJECT_ID} --quiet"
|
|
run_command(destroy_cmd)
|
|
|
|
def main():
|
|
print(f"🔐 Haumdaucher Secret Manager Tool")
|
|
print(f" Project: {PROJECT_ID}")
|
|
|
|
# Ensure API enabled
|
|
print("🔍 Checking Secret Manager API...")
|
|
run_command(f"gcloud services enable secretmanager.googleapis.com --project={PROJECT_ID}", check=False)
|
|
|
|
for secret_id, description in SECRETS.items():
|
|
print(f"\n👉 Secret: {secret_id} ({description})")
|
|
|
|
# Check existence
|
|
if not check_secret_exists(secret_id):
|
|
create_secret(secret_id)
|
|
|
|
# Prompt for value
|
|
print(f" Enter value for {description} (Input hidden): ")
|
|
# Use getpass logic via manual read to support all shells or just input() but masked?
|
|
# getpass in python is better.
|
|
import getpass
|
|
value = getpass.getpass(prompt=" > ")
|
|
|
|
if not value:
|
|
print(" ⚠️ Skipping update (empty input).")
|
|
continue
|
|
|
|
# Add version
|
|
add_secret_version(secret_id, value)
|
|
|
|
# Lifecycle: Destroy old
|
|
cleanup_old_versions(secret_id)
|
|
|
|
print("\n✅ All secrets updated successfully.")
|
|
|
|
if __name__ == "__main__":
|
|
main()
|