138 lines
4.7 KiB
Python
Executable File
138 lines
4.7 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
import subprocess
|
||
import argparse
|
||
import sys
|
||
import json
|
||
|
||
# Configuration
# GCP project that owns the secrets. Hardcoded for this project's context;
# it could instead be read from the active gcloud config.
PROJECT_ID = "haumdaucher"

# Maps each Secret Manager secret ID to the human-readable description that
# is shown to the operator when prompting for its value.
SECRETS = {
    "haumdaucher-oauth-client-id": "Google OAuth Client ID",
    "haumdaucher-oauth-client-secret": "Google OAuth Client Secret",
}
|
||
|
||
def run_command(command, check=True):
    """Run a shell command and return its stripped stdout.

    Args:
        command: Command line as a single string. It is executed through the
            shell (``shell=True``), so only trusted, program-built strings
            must be passed.
        check: When True (default), a non-zero exit status prints the
            command and its stderr and terminates the process with exit
            code 1. When False, a non-zero exit status quietly returns None.

    Returns:
        The command's stdout with surrounding whitespace removed, or None if
        the command failed and ``check`` is False.
    """
    # The exit status is inspected by hand instead of passing check= through
    # to subprocess.run: with check=False, subprocess.run never raises
    # CalledProcessError, so an except-based failure path would be
    # unreachable and failed commands would look like successes.
    result = subprocess.run(
        command,
        shell=True,
        check=False,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    if result.returncode == 0:
        return result.stdout.strip()

    if check:
        print(f"❌ Error running command: {command}")
        print(f" Stderr: {result.stderr}")
        sys.exit(1)
    return None
|
||
|
||
def check_secret_exists(secret_name):
    """Return True if ``gcloud secrets describe`` succeeds for the secret.

    Args:
        secret_name: Secret Manager secret ID to look up in PROJECT_ID.

    Returns:
        True when the describe call produced output, False otherwise. Any
        describe failure is reported as "does not exist", whatever its cause.
    """
    cmd = f"gcloud secrets describe {secret_name} --project={PROJECT_ID} --format=json"
    result = run_command(cmd, check=False)
    # Test truthiness rather than `is not None`: a failed describe can come
    # back as an empty string (nothing on stdout) as well as None, and both
    # must count as "missing". A successful describe with --format=json
    # is expected to print a non-empty JSON document.
    return bool(result)
|
||
|
||
def create_secret(secret_name):
    """Create a new, empty secret in PROJECT_ID with automatic replication.

    Args:
        secret_name: Secret Manager secret ID to create.

    The command runs through run_command with its default ``check=True``,
    so a gcloud failure terminates the script.
    """
    print(f" Creating secret '{secret_name}'...")
    gcloud_parts = (
        "gcloud secrets create",
        secret_name,
        "--replication-policy=automatic",
        f"--project={PROJECT_ID}",
    )
    run_command(" ".join(gcloud_parts))
|
||
|
||
def add_secret_version(secret_name, payload):
    """Upload ``payload`` as a new version of the given secret.

    Args:
        secret_name: Secret Manager secret ID that receives the version.
        payload: Secret value as text; written to gcloud's stdin.

    Exits the process with status 1 (after printing gcloud's stderr) if the
    gcloud call returns a non-zero exit code.
    """
    print(f" Adding new version to '{secret_name}'...")
    # The value is fed through stdin (--data-file=-) instead of being placed
    # on the command line, to avoid shell history logging.
    completed = subprocess.run(
        f"gcloud secrets versions add {secret_name} --data-file=- --project={PROJECT_ID}",
        shell=True,
        input=payload,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    if completed.returncode != 0:
        print(f"❌ Failed to add version: {completed.stderr}")
        sys.exit(1)
    print(" ✅ Version added.")
|
||
|
||
def cleanup_old_versions(secret_name):
    """Destroy all listed versions of a secret except the newest one.

    Lists up to 10 versions via ``gcloud secrets versions list``, keeps the
    newest, and destroys every other version that is not already in state
    DESTROYED (so both ENABLED and DISABLED old versions are destroyed).

    Args:
        secret_name: Secret Manager secret ID whose old versions are removed.

    Listing and destroy commands run through run_command with its default
    ``check=True``, so a gcloud failure terminates the script.
    """
    print(f" Cleaning up old versions for '{secret_name}'...")
    cmd = f"gcloud secrets versions list {secret_name} --project={PROJECT_ID} --limit=10 --format=json"
    output = run_command(cmd)
    if not output:
        return

    versions = json.loads(output)
    if len(versions) <= 1:
        print(" No old versions to cleanup.")
        return

    def version_id_of(version):
        # Resource names end in ".../versions/<id>"; take the trailing id.
        return version['name'].split('/')[-1]

    # Order newest-first explicitly instead of trusting the order gcloud
    # printed, so that index 0 really is the version to keep.
    # NOTE(review): assumes version ids are increasing integers — confirm
    # against the Secret Manager docs. If an id is not numeric, fall back to
    # the order gcloud returned.
    try:
        versions = sorted(versions, key=lambda v: int(version_id_of(v)), reverse=True)
    except ValueError:
        pass

    # Everything after the newest entry is a candidate for destruction.
    for stale in versions[1:]:
        state = stale.get('state')
        if state == 'DESTROYED':
            continue

        version_id = version_id_of(stale)
        print(f" destroying old version {version_id} ({state})...")
        destroy_cmd = f"gcloud secrets versions destroy {version_id} --secret={secret_name} --project={PROJECT_ID} --quiet"
        run_command(destroy_cmd)
|
||
|
||
def main():
    """Interactively rotate every secret listed in SECRETS.

    For each secret: create it if check_secret_exists reports it missing,
    print a hint on where to find the value, read the value without echo,
    upload it as a new version, and destroy older versions. An empty input
    skips that secret.
    """
    # getpass reads the value without echoing it to the terminal.
    import getpass

    print("🔐 Haumdaucher Secret Manager Tool")
    print(f" Project: {PROJECT_ID}")

    # Ensure the API is enabled. check=False makes this best-effort: a
    # failure here does not abort the run.
    print("🔍 Checking Secret Manager API...")
    run_command(f"gcloud services enable secretmanager.googleapis.com --project={PROJECT_ID}", check=False)

    # Per-secret guidance lines printed before the prompt.
    hints = {
        "haumdaucher-oauth-client-id": (
            " ℹ️ Go to GCP Console > Credentials > OAuth 2.0 Client IDs",
            " Copy the 'Client ID' (ends in .apps.googleusercontent.com)",
        ),
        "haumdaucher-oauth-client-secret": (
            " ℹ️ Copy the 'Client Secret' (shorter string, hidden in console)",
        ),
    }

    for secret_id, description in SECRETS.items():
        print(f"\n👉 Secret: {secret_id} ({description})")

        if not check_secret_exists(secret_id):
            create_secret(secret_id)

        for hint_line in hints.get(secret_id, ()):
            print(hint_line)

        print(f" Enter value for {description} (Input hidden): ")
        value = getpass.getpass(prompt=" > ")

        if value:
            add_secret_version(secret_id, value)
            # Lifecycle: once the new version is stored, destroy older ones.
            cleanup_old_versions(secret_id)
        else:
            print(" ⚠️ Skipping update (empty input).")

    print("\n✅ All secrets updated successfully.")
|
||
|
||
# Run the interactive tool only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|