import datetime
from pathlib import Path
from typing import List, Dict, Optional, Set
from pyappconf import BaseConfig, AppConfig, ConfigFormats
from pydantic import BaseModel, Field
from github_secrets.exc import (
RepositoryNotInSecretsException,
RepositorySecretDoesNotExistException,
GlobalSecretDoesNotExistException,
SecretHasNotBeenSyncedException,
ProfileDoesNotExistException,
)
from github_secrets import exc
APP_NAME = "GithubSecrets"
[docs]class Secret(BaseModel):
name: str
value: str
created: datetime.datetime = Field(default_factory=lambda: datetime.datetime.now())
updated: datetime.datetime = Field(default_factory=lambda: datetime.datetime.now())
[docs] def update(self, value: str):
self.value = value
self.updated = datetime.datetime.now()
[docs]class SyncConfig(BaseModel):
secret_name: str
repository: Optional[str] = None
def __hash__(self):
return hash((self.secret_name, self.repository))
@property
def global_(self) -> bool:
return self.repository is None
[docs]class RepositorySecrets(BaseModel):
secrets: Dict[str, List[Secret]] = Field(default_factory=lambda: {})
@property
def sync_configs(self) -> Set[SyncConfig]:
all_configs: Set[SyncConfig] = set()
for repo, secrets in self.secrets.items():
all_configs.update(
[SyncConfig(secret_name=sec.name, repository=repo) for sec in secrets]
)
return all_configs
[docs] def add_secret(self, secret: Secret, repository: str) -> bool:
if repository not in self.secrets:
self.secrets[repository] = []
if self.repository_has_secret(secret.name, repository):
self.update_secret(secret, repository)
return False
else:
self.secrets[repository].append(secret)
return True
[docs] def repository_has_secret(self, name: str, repository: str):
if repository not in self.secrets:
raise RepositoryNotInSecretsException(
f"repository {repository} does not exist"
)
for secret in self.secrets[repository]:
if secret.name == name:
return True
return False
[docs] def get_secret(self, name: str, repository: str):
if repository not in self.secrets:
raise RepositoryNotInSecretsException(
f"repository {repository} does not exist"
)
for secret in self.secrets[repository]:
if secret.name == name:
return secret
raise RepositorySecretDoesNotExistException(
f"repository {repository} does not have secret with name {name}"
)
[docs] def remove_secret(self, name: str, repository: str):
if repository not in self.secrets:
raise RepositoryNotInSecretsException(
f"repository {repository} does not exist"
)
new_secrets: List[Secret] = []
for secret in self.secrets[repository]:
if secret.name != name:
new_secrets.append(secret)
self.secrets[repository] = new_secrets
[docs] def update_secret(self, secret: Secret, repository: str):
if repository not in self.secrets:
raise RepositoryNotInSecretsException(
f"repository {repository} does not exist"
)
updated = False
for existing_secret in self.secrets[repository]:
if existing_secret.name == secret.name:
existing_secret.update(secret.value)
updated = True
break
if not updated:
raise RepositorySecretDoesNotExistException(
f"no existing secret for {repository} with name {secret.name}"
)
[docs]class GlobalSecrets(BaseModel):
secrets: List[Secret] = Field(default_factory=lambda: [])
@property
def sync_configs(self) -> Set[SyncConfig]:
return {SyncConfig(secret_name=sec.name) for sec in self.secrets}
[docs] def add_secret(self, secret: Secret):
if self.has_secret(secret.name):
self.update_secret(secret)
return False
else:
self.secrets.append(secret)
return True
[docs] def has_secret(self, name: str):
for secret in self.secrets:
if secret.name == name:
return True
return False
[docs] def get_secret(self, name: str) -> Secret:
for secret in self.secrets:
if secret.name == name:
return secret
raise GlobalSecretDoesNotExistException(
f"secret with name {name} does not exist in global secrets"
)
[docs] def remove_secret(self, name: str):
new_secrets: List[Secret] = []
for secret in self.secrets:
if secret.name != name:
new_secrets.append(secret)
self.secrets = new_secrets
[docs] def update_secret(self, secret: Secret):
updated = False
for existing_secret in self.secrets:
if existing_secret.name == secret.name:
existing_secret.update(secret.value)
updated = True
break
if not updated:
raise GlobalSecretDoesNotExistException(
f"no existing global secret with name {secret.name}"
)
[docs]class SyncRecord(BaseModel):
secret_name: str
last_updated: datetime.datetime = Field(
default_factory=lambda: datetime.datetime.now()
)
[docs]class SecretsConfig(BaseConfig):
github_token: str = ""
include_repositories: Optional[List[str]] = None
exclude_repositories: Optional[List[str]] = None
global_secrets: GlobalSecrets = Field(default_factory=lambda: GlobalSecrets())
repository_secrets: RepositorySecrets = Field(
default_factory=lambda: RepositorySecrets()
)
repository_secrets_last_synced: Dict[str, List[SyncRecord]] = Field(
default_factory=lambda: {}
)
_settings = AppConfig(
app_name=APP_NAME, default_format=ConfigFormats.YAML, config_name="default"
)
@property
def sync_configs(self) -> Set[SyncConfig]:
return self.global_secrets.sync_configs.union(
self.repository_secrets.sync_configs
)
@property
def repositories(self) -> List[str]:
from github_secrets.git import get_repository_names
if self.include_repositories is not None:
return self.include_repositories
if not self.github_token:
raise ValueError("need to set github token")
repositories = get_repository_names(self.github_token)
if self.exclude_repositories is not None:
repositories = [
repo for repo in repositories if repo not in self.exclude_repositories
]
return repositories
@property
def new_repositories(self) -> List[str]:
from github_secrets.git import get_repository_names
if not self.github_token:
raise ValueError("need to set github token")
repositories = get_repository_names(self.github_token)
if self.exclude_repositories:
repositories = [
repo for repo in repositories if repo not in self.exclude_repositories
]
if not self.include_repositories:
new_repositories = repositories
else:
new_repositories = [
repo for repo in repositories if repo not in self.include_repositories
]
return new_repositories
@property
def unsynced_secrets(self) -> List[SyncConfig]:
global_secrets = self.global_secrets.secrets
all_sync_configs: List[SyncConfig] = []
for repo in self.repositories:
if repo in self.repository_secrets.secrets:
repo_secrets = self.repository_secrets.secrets[repo]
else:
repo_secrets = []
all_secrets = global_secrets + repo_secrets
repo_unsync_secrets: List[Secret]
if repo not in self.repository_secrets_last_synced:
repo_unsync_secrets = all_secrets
else:
repo_unsync_secrets = []
existing_secrets = [
sec.secret_name for sec in self.repository_secrets_last_synced[repo]
]
for secret in all_secrets:
if secret.name not in existing_secrets:
repo_unsync_secrets.append(secret)
else:
if secret.updated > self.secret_last_synced(secret.name, repo):
repo_unsync_secrets.append(secret)
sync_configs = [
SyncConfig(secret_name=sec.name, repository=repo)
for sec in repo_unsync_secrets
]
all_sync_configs.extend(sync_configs)
return all_sync_configs
[docs] def bootstrap_repositories(self) -> Set[str]:
from github_secrets.git import get_repository_names
if not self.github_token:
raise ValueError("need to set github token")
repositories = get_repository_names(self.github_token)
if self.exclude_repositories:
repositories = [
repo for repo in repositories if repo not in self.exclude_repositories
]
if self.include_repositories:
include_repos = self.include_repositories
else:
include_repos = []
new_repositories = set(repositories).difference(include_repos)
self.include_repositories = repositories
return new_repositories
[docs] def secret_last_synced(self, name: str, repository: str) -> datetime.datetime:
if repository not in self.repository_secrets_last_synced:
raise SecretHasNotBeenSyncedException(
f"have not previously synced to repository {repository}"
)
for record in self.repository_secrets_last_synced[repository]:
if record.secret_name == name:
return record.last_updated
raise SecretHasNotBeenSyncedException(
f"secret {name} has not been previously synced to repository {repository}"
)
[docs] def record_sync_for_repo(self, secret: Secret, repository: str) -> bool:
sync_record = SyncRecord(secret_name=secret.name)
if repository not in self.repository_secrets_last_synced:
self.repository_secrets_last_synced[repository] = []
updated = False
# Try update
for record in self.repository_secrets_last_synced[repository]:
if record.secret_name == sync_record.secret_name:
record.last_updated = sync_record.last_updated
updated = True
if not updated:
# Create case
self.repository_secrets_last_synced[repository].append(sync_record)
return not updated
[docs] def record_sync_for_all_repos_and_secrets(self):
repos = self.repositories
for sync_config in self.sync_configs:
secret = Secret(name=sync_config.secret_name, value="does not matter")
if sync_config.global_:
for repo in repos:
self.record_sync_for_repo(secret, repo)
else:
self.record_sync_for_repo(secret, sync_config.repository)
[docs] def add_repository(self, name: str):
if self.include_repositories and name in self.include_repositories:
raise exc.RepositoryAlreadyExistsException(name)
if self.exclude_repositories and name in self.exclude_repositories:
raise exc.RepositoryIsExcludedException(name)
if self.include_repositories is None:
self.include_repositories = []
self.include_repositories.append(name)
[docs] def remove_repository(self, name: str):
if not self.include_repositories or name not in self.include_repositories:
raise exc.RepositoryDoesNotExistException(name)
self.include_repositories.remove(name)
[docs] def add_exclude_repository(self, name: str):
if self.include_repositories and name in self.include_repositories:
raise exc.RepositoryIsIncludedException(name)
if self.exclude_repositories and name in self.exclude_repositories:
raise exc.RepositoryIsExcludedException(name)
if self.exclude_repositories is None:
self.exclude_repositories = []
self.exclude_repositories.append(name)
[docs] def remove_exclude_repository(self, name: str):
if not self.exclude_repositories or name not in self.exclude_repositories:
raise exc.RepositoryDoesNotExistException(name)
self.exclude_repositories.remove(name)
[docs] class Config:
env_prefix = "GH_SECRETS_"
[docs]class Profile(BaseModel):
name: str
config_path: Path
DEFAULT_SECRETS_CONFIG_PATH = SecretsConfig._settings_with_overrides(
config_name="default"
).config_location
DEFAULT_PROFILE = Profile(name="default", config_path=DEFAULT_SECRETS_CONFIG_PATH)
[docs]class SecretsAppConfig(BaseConfig):
current_profile: Profile = DEFAULT_PROFILE
profiles: List[Profile] = Field(default_factory=lambda: [DEFAULT_PROFILE])
_settings = AppConfig(
app_name=APP_NAME, default_format=ConfigFormats.YAML, config_name="app"
)
[docs] def profile_exists(self, name: str):
for profile in self.profiles:
if profile.name == name:
return True
return False
[docs] def get_profile(self, name: str) -> Profile:
for profile in self.profiles:
if profile.name == name:
return profile
raise ProfileDoesNotExistException(f"no profile with name {name}")
[docs] def add_profile(self, name: str, path: Optional[Path] = None):
if path is None:
path = SecretsConfig._settings_with_overrides(
config_name=name
).config_location
profile = Profile(name=name, config_path=path)
self.profiles.append(profile)
[docs] def set_profile(self, name: str):
profile = self.get_profile(name)
self.current_profile = profile
[docs] def delete_profile(self, name: str):
new_profiles: List[Profile] = []
for profile in self.profiles:
if profile.name != name:
new_profiles.append(profile)
self.profiles = new_profiles