fix(registry): delete container images on project teardown
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
Root cause of DIGEST_INVALID errors was registry disk exhaustion. Project teardown wasn't cleaning up container images, causing the registry PVC to fill up over time.

Changes:
- Add RegistryProvider port interface for registry operations
- Extend zot.Client with DeleteProjectRepositories method
- Wire registry provider into ProjectInfraService
- Delete images during DeleteProject cleanup (step 4)

The zot client uses the OCI distribution API:
- Lists all repos, filters by project prefix
- Gets manifest digests via HEAD request
- Deletes manifests by digest to trigger GC

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
f20fc6c51c
commit
4486042155
@ -201,6 +201,18 @@ func main() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Initialize registry client (for monitoring and image cleanup on project teardown)
|
||||||
|
var registryClient *zot.Client
|
||||||
|
if infraCfg.RegistryURL != "" {
|
||||||
|
registryURL := infraCfg.RegistryURL
|
||||||
|
// Ensure URL has protocol
|
||||||
|
if !strings.HasPrefix(registryURL, "http") {
|
||||||
|
registryURL = "https://" + registryURL
|
||||||
|
}
|
||||||
|
registryClient = zot.NewClient(registryURL).WithLogger(logger)
|
||||||
|
logger.Info("registry client initialized", "url", registryURL)
|
||||||
|
}
|
||||||
|
|
||||||
// Initialize CodeAgent registry (multi-provider support)
|
// Initialize CodeAgent registry (multi-provider support)
|
||||||
agentRegistry := codeagent.NewRegistry()
|
agentRegistry := codeagent.NewRegistry()
|
||||||
|
|
||||||
@ -348,13 +360,16 @@ func main() {
|
|||||||
ClusterIP: infraCfg.ClusterIP,
|
ClusterIP: infraCfg.ClusterIP,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
// Wire optional database and cache provisioners
|
// Wire optional database, cache, and registry provisioners
|
||||||
if dbProvisioner != nil {
|
if dbProvisioner != nil {
|
||||||
projectInfraService = projectInfraService.WithDatabaseProvisioner(dbProvisioner)
|
projectInfraService = projectInfraService.WithDatabaseProvisioner(dbProvisioner)
|
||||||
}
|
}
|
||||||
if cacheProvisioner != nil {
|
if cacheProvisioner != nil {
|
||||||
projectInfraService = projectInfraService.WithCacheProvisioner(cacheProvisioner)
|
projectInfraService = projectInfraService.WithCacheProvisioner(cacheProvisioner)
|
||||||
}
|
}
|
||||||
|
if registryClient != nil {
|
||||||
|
projectInfraService = projectInfraService.WithRegistryProvider(registryClient)
|
||||||
|
}
|
||||||
|
|
||||||
// Create domain service adapter for infrastructure handler
|
// Create domain service adapter for infrastructure handler
|
||||||
domainServiceAdapter := handlers.NewDomainServiceAdapter(projectInfraService)
|
domainServiceAdapter := handlers.NewDomainServiceAdapter(projectInfraService)
|
||||||
@ -457,22 +472,10 @@ func main() {
|
|||||||
// Initialize operations handler (for debugging project failures)
|
// Initialize operations handler (for debugging project failures)
|
||||||
operationsHandler := handlers.NewOperationsHandler(operationRepo)
|
operationsHandler := handlers.NewOperationsHandler(operationRepo)
|
||||||
|
|
||||||
// Initialize registry health checker (for monitoring)
|
|
||||||
var registryChecker *zot.Client
|
|
||||||
if infraCfg.RegistryURL != "" {
|
|
||||||
registryURL := infraCfg.RegistryURL
|
|
||||||
// Ensure URL has protocol
|
|
||||||
if !strings.HasPrefix(registryURL, "http") {
|
|
||||||
registryURL = "https://" + registryURL
|
|
||||||
}
|
|
||||||
registryChecker = zot.NewClient(registryURL)
|
|
||||||
logger.Info("registry health checker initialized", "url", registryURL)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Initialize diagnostics service (aggregates health data for debugging)
|
// Initialize diagnostics service (aggregates health data for debugging)
|
||||||
diagnosticsService := service.NewDiagnosticsService(
|
diagnosticsService := service.NewDiagnosticsService(
|
||||||
operationRepo,
|
operationRepo,
|
||||||
registryChecker,
|
registryClient,
|
||||||
woodpeckerClient,
|
woodpeckerClient,
|
||||||
service.DiagnosticsServiceConfig{
|
service.DiagnosticsServiceConfig{
|
||||||
DefaultGitOwner: infraCfg.GiteaDefaultOrg,
|
DefaultGitOwner: infraCfg.GiteaDefaultOrg,
|
||||||
@ -482,9 +485,9 @@ func main() {
|
|||||||
|
|
||||||
// Initialize external health checker (background monitoring of registry, CI, git)
|
// Initialize external health checker (background monitoring of registry, CI, git)
|
||||||
var externalHealthChecker *worker.ExternalHealthChecker
|
var externalHealthChecker *worker.ExternalHealthChecker
|
||||||
if registryChecker != nil || woodpeckerClient != nil || giteaClient != nil {
|
if registryClient != nil || woodpeckerClient != nil || giteaClient != nil {
|
||||||
externalHealthChecker = worker.NewExternalHealthChecker(
|
externalHealthChecker = worker.NewExternalHealthChecker(
|
||||||
registryChecker,
|
registryClient,
|
||||||
woodpeckerClient,
|
woodpeckerClient,
|
||||||
giteaClient,
|
giteaClient,
|
||||||
worker.ExternalHealthConfig{
|
worker.ExternalHealthConfig{
|
||||||
@ -498,8 +501,8 @@ func main() {
|
|||||||
// Override default health/ready endpoints with full dependency checks
|
// Override default health/ready endpoints with full dependency checks
|
||||||
healthHandler := handlers.NewHealthHandler("rdev-api", database.DB, nil).
|
healthHandler := handlers.NewHealthHandler("rdev-api", database.DB, nil).
|
||||||
WithAgentRegistry(agentRegistry)
|
WithAgentRegistry(agentRegistry)
|
||||||
if registryChecker != nil {
|
if registryClient != nil {
|
||||||
healthHandler = healthHandler.WithRegistryChecker(registryChecker)
|
healthHandler = healthHandler.WithRegistryChecker(registryClient)
|
||||||
}
|
}
|
||||||
if externalHealthChecker != nil {
|
if externalHealthChecker != nil {
|
||||||
healthHandler = healthHandler.WithExternalHealthChecker(externalHealthChecker)
|
healthHandler = healthHandler.WithExternalHealthChecker(externalHealthChecker)
|
||||||
|
|||||||
@ -1,32 +1,48 @@
|
|||||||
// Package zot provides a client for checking zot container registry health.
|
// Package zot provides a client for interacting with the zot container registry.
|
||||||
package zot
|
package zot
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"log/slog"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/orchard9/rdev/internal/domain"
|
"github.com/orchard9/rdev/internal/domain"
|
||||||
|
"github.com/orchard9/rdev/internal/port"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Client checks zot registry health via the OCI /v2/ endpoint.
|
// Client interacts with the zot container registry.
|
||||||
type Client struct {
|
type Client struct {
|
||||||
url string
|
url string
|
||||||
httpClient *http.Client
|
httpClient *http.Client
|
||||||
|
logger *slog.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewClient creates a new zot health checker.
|
// Ensure Client implements port.RegistryProvider at compile time.
|
||||||
|
var _ port.RegistryProvider = (*Client)(nil)
|
||||||
|
|
||||||
|
// NewClient creates a new zot client.
|
||||||
// The URL should be the registry base URL (e.g., "https://registry.threesix.ai").
|
// The URL should be the registry base URL (e.g., "https://registry.threesix.ai").
|
||||||
func NewClient(url string) *Client {
|
func NewClient(url string) *Client {
|
||||||
return &Client{
|
return &Client{
|
||||||
url: url,
|
url: url,
|
||||||
httpClient: &http.Client{
|
httpClient: &http.Client{
|
||||||
Timeout: 5 * time.Second,
|
Timeout: 30 * time.Second,
|
||||||
},
|
},
|
||||||
|
logger: slog.Default(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithLogger sets the logger for the client.
|
||||||
|
func (c *Client) WithLogger(logger *slog.Logger) *Client {
|
||||||
|
c.logger = logger
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
// Check returns the health status of the registry.
|
// Check returns the health status of the registry.
|
||||||
// A 200 or 401 response indicates the registry is healthy (401 means auth required but registry is up).
|
// A 200 or 401 response indicates the registry is healthy (401 means auth required but registry is up).
|
||||||
func (c *Client) Check(ctx context.Context) domain.RegistryStatus {
|
func (c *Client) Check(ctx context.Context) domain.RegistryStatus {
|
||||||
@ -72,3 +88,180 @@ func (c *Client) Check(ctx context.Context) domain.RegistryStatus {
|
|||||||
|
|
||||||
return status
|
return status
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// catalogResponse is the JSON body decoded from the registry's catalog
// endpoint; only the list of repository names is read from it.
|
||||||
|
type catalogResponse struct {
|
||||||
|
Repositories []string `json:"repositories"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// tagsResponse is the JSON body decoded from a repository's tags-list
// endpoint: the repository name and the tags it currently has.
|
||||||
|
type tagsResponse struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Tags []string `json:"tags"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListRepositories returns all repositories in the registry.
|
||||||
|
func (c *Client) ListRepositories(ctx context.Context) ([]string, error) {
|
||||||
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.url+"/v2/_catalog", nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("create request: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := c.httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("request failed: %w", err)
|
||||||
|
}
|
||||||
|
defer func() { _ = resp.Body.Close() }()
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
return nil, fmt.Errorf("unexpected status: %d", resp.StatusCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
body, err := io.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("read body: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var catalog catalogResponse
|
||||||
|
if err := json.Unmarshal(body, &catalog); err != nil {
|
||||||
|
return nil, fmt.Errorf("parse catalog: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return catalog.Repositories, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListProjectRepositories returns all repositories for a specific project.
|
||||||
|
// This includes both the main repo and sub-repos like cache.
|
||||||
|
func (c *Client) ListProjectRepositories(ctx context.Context, projectID string) ([]string, error) {
|
||||||
|
allRepos, err := c.ListRepositories(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var projectRepos []string
|
||||||
|
prefix := projectID + "/"
|
||||||
|
for _, repo := range allRepos {
|
||||||
|
if repo == projectID || strings.HasPrefix(repo, prefix) {
|
||||||
|
projectRepos = append(projectRepos, repo)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return projectRepos, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteRepository deletes all tags and manifests for a repository.
|
||||||
|
// This triggers Zot's garbage collection to reclaim storage.
|
||||||
|
func (c *Client) DeleteRepository(ctx context.Context, repo string) error {
|
||||||
|
// List all tags for the repository
|
||||||
|
tags, err := c.listTags(ctx, repo)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("list tags for %s: %w", repo, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete each tag's manifest
|
||||||
|
for _, tag := range tags {
|
||||||
|
if err := c.deleteManifest(ctx, repo, tag); err != nil {
|
||||||
|
c.logger.Warn("failed to delete manifest", "repo", repo, "tag", tag, "error", err)
|
||||||
|
// Continue with other tags
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
c.logger.Info("deleted repository", "repo", repo, "tags_deleted", len(tags))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteProjectRepositories deletes all repositories for a project.
|
||||||
|
func (c *Client) DeleteProjectRepositories(ctx context.Context, projectID string) error {
|
||||||
|
repos, err := c.ListProjectRepositories(ctx, projectID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("list project repos: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, repo := range repos {
|
||||||
|
if err := c.DeleteRepository(ctx, repo); err != nil {
|
||||||
|
c.logger.Warn("failed to delete repo", "repo", repo, "error", err)
|
||||||
|
// Continue with other repos
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
c.logger.Info("deleted project repositories", "project", projectID, "repos_deleted", len(repos))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// listTags returns all tags for a repository.
|
||||||
|
func (c *Client) listTags(ctx context.Context, repo string) ([]string, error) {
|
||||||
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.url+"/v2/"+repo+"/tags/list", nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("create request: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := c.httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("request failed: %w", err)
|
||||||
|
}
|
||||||
|
defer func() { _ = resp.Body.Close() }()
|
||||||
|
|
||||||
|
if resp.StatusCode == http.StatusNotFound {
|
||||||
|
return nil, nil // Repo doesn't exist, nothing to delete
|
||||||
|
}
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
return nil, fmt.Errorf("unexpected status: %d", resp.StatusCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
body, err := io.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("read body: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var tags tagsResponse
|
||||||
|
if err := json.Unmarshal(body, &tags); err != nil {
|
||||||
|
return nil, fmt.Errorf("parse tags: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return tags.Tags, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// deleteManifest deletes a specific manifest by tag.
|
||||||
|
func (c *Client) deleteManifest(ctx context.Context, repo, tag string) error {
|
||||||
|
// First, get the manifest digest
|
||||||
|
headReq, err := http.NewRequestWithContext(ctx, http.MethodHead, c.url+"/v2/"+repo+"/manifests/"+tag, nil)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("create head request: %w", err)
|
||||||
|
}
|
||||||
|
headReq.Header.Set("Accept", "application/vnd.oci.image.manifest.v1+json, application/vnd.docker.distribution.manifest.v2+json")
|
||||||
|
|
||||||
|
headResp, err := c.httpClient.Do(headReq)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("head request failed: %w", err)
|
||||||
|
}
|
||||||
|
defer func() { _ = headResp.Body.Close() }()
|
||||||
|
|
||||||
|
if headResp.StatusCode == http.StatusNotFound {
|
||||||
|
return nil // Already deleted
|
||||||
|
}
|
||||||
|
if headResp.StatusCode != http.StatusOK {
|
||||||
|
return fmt.Errorf("head unexpected status: %d", headResp.StatusCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
digest := headResp.Header.Get("Docker-Content-Digest")
|
||||||
|
if digest == "" {
|
||||||
|
return fmt.Errorf("no digest in response")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete by digest
|
||||||
|
delReq, err := http.NewRequestWithContext(ctx, http.MethodDelete, c.url+"/v2/"+repo+"/manifests/"+digest, nil)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("create delete request: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
delResp, err := c.httpClient.Do(delReq)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("delete request failed: %w", err)
|
||||||
|
}
|
||||||
|
defer func() { _ = delResp.Body.Close() }()
|
||||||
|
|
||||||
|
if delResp.StatusCode != http.StatusAccepted && delResp.StatusCode != http.StatusOK && delResp.StatusCode != http.StatusNotFound {
|
||||||
|
return fmt.Errorf("delete unexpected status: %d", delResp.StatusCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
28
internal/port/registry_provider.go
Normal file
28
internal/port/registry_provider.go
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
// Package port defines interfaces (ports) for external dependencies.
|
||||||
|
package port
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/orchard9/rdev/internal/domain"
|
||||||
|
)
|
||||||
|
|
||||||
|
// RegistryProvider is the port for container registry operations: a health
// check plus listing and deleting repositories, per repository or per project.
|
||||||
|
type RegistryProvider interface {
|
||||||
|
// Check returns the health status of the registry.
|
||||||
|
Check(ctx context.Context) domain.RegistryStatus
|
||||||
|
|
||||||
|
// ListRepositories returns the names of all repositories in the registry.
|
||||||
|
ListRepositories(ctx context.Context) ([]string, error)
|
||||||
|
|
||||||
|
// ListProjectRepositories returns all repositories for a specific project.
|
||||||
|
// This includes both the main repo and sub-repos like cache.
|
||||||
|
ListProjectRepositories(ctx context.Context, projectID string) ([]string, error)
|
||||||
|
|
||||||
|
// DeleteRepository deletes all tags and manifests for the repository named repo.
|
||||||
|
DeleteRepository(ctx context.Context, repo string) error
|
||||||
|
|
||||||
|
// DeleteProjectRepositories deletes all repositories for a project.
|
||||||
|
// This should be called during project teardown to reclaim registry storage.
|
||||||
|
DeleteProjectRepositories(ctx context.Context, projectID string) error
|
||||||
|
}
|
||||||
@ -34,6 +34,7 @@ type ProjectInfraService struct {
|
|||||||
credentialStore port.CredentialStore
|
credentialStore port.CredentialStore
|
||||||
dbProvisioner port.DatabaseProvisioner
|
dbProvisioner port.DatabaseProvisioner
|
||||||
cacheProvisioner port.CacheProvisioner
|
cacheProvisioner port.CacheProvisioner
|
||||||
|
registryProvider port.RegistryProvider
|
||||||
|
|
||||||
// Config
|
// Config
|
||||||
defaultGitOwner string
|
defaultGitOwner string
|
||||||
@ -100,6 +101,12 @@ func (s *ProjectInfraService) WithCacheProvisioner(cp port.CacheProvisioner) *Pr
|
|||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithRegistryProvider sets the container registry provider used for image
// cleanup and returns the same service so calls can be chained.
// NOTE(review): callers should pass a nil interface rather than a typed nil
// pointer, otherwise a later `!= nil` check on the field would still pass.
|
||||||
|
func (s *ProjectInfraService) WithRegistryProvider(rp port.RegistryProvider) *ProjectInfraService {
|
||||||
|
s.registryProvider = rp
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
// CreateProjectRequest contains parameters for creating a new project.
|
// CreateProjectRequest contains parameters for creating a new project.
|
||||||
type CreateProjectRequest struct {
|
type CreateProjectRequest struct {
|
||||||
Name string
|
Name string
|
||||||
|
|||||||
@ -712,20 +712,27 @@ func (s *ProjectInfraService) DeleteProject(ctx context.Context, projectID strin
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 4. Delete all DNS records for project domains
|
// 4. Delete container images from registry
|
||||||
|
if s.registryProvider != nil {
|
||||||
|
if err := s.registryProvider.DeleteProjectRepositories(ctx, projectID); err != nil {
|
||||||
|
log.Warn("failed to delete project registry images", logging.FieldError, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 5. Delete all DNS records for project domains
|
||||||
s.deleteDNSRecords(ctx, status)
|
s.deleteDNSRecords(ctx, status)
|
||||||
|
|
||||||
// 5. Delete all project_domains entries (CASCADE should handle this, but be explicit)
|
// 6. Delete all project_domains entries (CASCADE should handle this, but be explicit)
|
||||||
if s.domainRepo != nil {
|
if s.domainRepo != nil {
|
||||||
if err := s.domainRepo.DeleteByProject(ctx, projectID); err != nil {
|
if err := s.domainRepo.DeleteByProject(ctx, projectID); err != nil {
|
||||||
log.Warn("failed to delete project domains", logging.FieldError, err)
|
log.Warn("failed to delete project domains", logging.FieldError, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 6. Delete git repo (optional - might want to keep it)
|
// 7. Delete git repo (optional - might want to keep it)
|
||||||
// Skipping git repo deletion for safety
|
// Skipping git repo deletion for safety
|
||||||
|
|
||||||
// 7. Delete from database
|
// 8. Delete from database
|
||||||
_, err = s.db.ExecContext(ctx, `DELETE FROM projects WHERE id = $1`, projectID)
|
_, err = s.db.ExecContext(ctx, `DELETE FROM projects WHERE id = $1`, projectID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to delete project from database: %w", err)
|
return fmt.Errorf("failed to delete project from database: %w", err)
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user