package deployer import ( "context" "fmt" "strconv" "sync" "sync/atomic" networkingv1 "k8s.io/api/networking/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime/schema" ) // mockIngressClient implements IngressClient for testing. type mockIngressClient struct { mu sync.RWMutex ingresses map[string]*networkingv1.Ingress // key: "namespace/name" // For testing error scenarios getErr error createErr error updateErr error deleteErr error // Counters for verifying retry behavior getCalls atomic.Int32 createCalls atomic.Int32 updateCalls atomic.Int32 deleteCalls atomic.Int32 } func newMockIngressClient() *mockIngressClient { return &mockIngressClient{ ingresses: make(map[string]*networkingv1.Ingress), } } func (m *mockIngressClient) key(namespace, name string) string { return namespace + "/" + name } func (m *mockIngressClient) GetIngress(ctx context.Context, namespace, name string) (*networkingv1.Ingress, error) { m.getCalls.Add(1) m.mu.RLock() defer m.mu.RUnlock() if m.getErr != nil { return nil, m.getErr } ing, ok := m.ingresses[m.key(namespace, name)] if !ok { return nil, errors.NewNotFound(schema.GroupResource{Group: "networking.k8s.io", Resource: "ingresses"}, name) } return ing.DeepCopy(), nil } func (m *mockIngressClient) CreateIngress(ctx context.Context, namespace string, ingress *networkingv1.Ingress) (*networkingv1.Ingress, error) { m.createCalls.Add(1) m.mu.Lock() defer m.mu.Unlock() if m.createErr != nil { return nil, m.createErr } key := m.key(namespace, ingress.Name) if _, exists := m.ingresses[key]; exists { return nil, errors.NewAlreadyExists(schema.GroupResource{Group: "networking.k8s.io", Resource: "ingresses"}, ingress.Name) } // Set resource version for conflict detection ingress.ResourceVersion = "1" m.ingresses[key] = ingress.DeepCopy() return ingress.DeepCopy(), nil } func (m *mockIngressClient) UpdateIngress(ctx context.Context, namespace string, ingress *networkingv1.Ingress) (*networkingv1.Ingress, error) { m.updateCalls.Add(1) m.mu.Lock() defer m.mu.Unlock() if m.updateErr != nil { return nil, m.updateErr } key := m.key(namespace, ingress.Name) existing, ok := m.ingresses[key] if !ok { return nil, errors.NewNotFound(schema.GroupResource{Group: "networking.k8s.io", Resource: "ingresses"}, ingress.Name) } // Check resource version for optimistic locking if ingress.ResourceVersion != existing.ResourceVersion { return nil, errors.NewConflict(schema.GroupResource{Group: "networking.k8s.io", Resource: "ingresses"}, ingress.Name, fmt.Errorf("resource version mismatch")) } // Increment resource version rv, _ := strconv.Atoi(existing.ResourceVersion) ingress.ResourceVersion = strconv.Itoa(rv + 1) m.ingresses[key] = ingress.DeepCopy() return ingress.DeepCopy(), nil } func (m *mockIngressClient) DeleteIngress(ctx context.Context, namespace, name string) error { m.deleteCalls.Add(1) m.mu.Lock() defer m.mu.Unlock() if m.deleteErr != nil { return m.deleteErr } delete(m.ingresses, m.key(namespace, name)) return nil } // conflictOnceError returns a conflict error only on the first call, then succeeds. type conflictOnceError struct { mu sync.Mutex called bool callCount int } func (e *conflictOnceError) Error() error { e.mu.Lock() defer e.mu.Unlock() e.callCount++ if !e.called { e.called = true return errors.NewConflict(schema.GroupResource{Group: "networking.k8s.io", Resource: "ingresses"}, "test", fmt.Errorf("simulated conflict")) } return nil } func (e *conflictOnceError) CallCount() int { e.mu.Lock() defer e.mu.Unlock() return e.callCount } // conflictNTimesClient wraps mockIngressClient to return conflict errors N times on update. type conflictNTimesClient struct { *mockIngressClient conflictCount int mu sync.Mutex updateCalls int } func newConflictNTimesClient(n int) *conflictNTimesClient { return &conflictNTimesClient{ mockIngressClient: newMockIngressClient(), conflictCount: n, } } func (c *conflictNTimesClient) UpdateIngress(ctx context.Context, namespace string, ingress *networkingv1.Ingress) (*networkingv1.Ingress, error) { c.mu.Lock() c.updateCalls++ callNum := c.updateCalls c.mu.Unlock() if callNum <= c.conflictCount { return nil, errors.NewConflict(schema.GroupResource{Group: "networking.k8s.io", Resource: "ingresses"}, ingress.Name, fmt.Errorf("simulated conflict %d", callNum)) } return c.mockIngressClient.UpdateIngress(ctx, namespace, ingress) } func (c *conflictNTimesClient) UpdateCallCount() int { c.mu.Lock() defer c.mu.Unlock() return c.updateCalls }