From d12e21064ba6f5844915c46309b7478479a73600 Mon Sep 17 00:00:00 2001
From: Vasilii Iakliushin
Date: Tue, 22 Jul 2025 11:55:38 +0200
Subject: [PATCH] Workhorse: add context cancellation and goroutine cleanup

**Problem**

Goroutines in Execute() method were not respecting context
cancellation, leading to potential resource leaks and hanging
operations.

**Solution**

* Use cancellable context to signal goroutine termination
* Check context.Done() in goroutine loops to prevent blocking
---
 .../internal/ai_assist/duoworkflow/runner.go  | 40 +++++++++++++------
 .../ai_assist/duoworkflow/runner_test.go      |  4 ++
 2 files changed, 31 insertions(+), 13 deletions(-)

diff --git a/workhorse/internal/ai_assist/duoworkflow/runner.go b/workhorse/internal/ai_assist/duoworkflow/runner.go
index 3657eddfe74c9c..3cb17e86abc575 100644
--- a/workhorse/internal/ai_assist/duoworkflow/runner.go
+++ b/workhorse/internal/ai_assist/duoworkflow/runner.go
@@ -32,6 +32,7 @@ type websocketConn interface {
 type workflowStream interface {
 	Send(*pb.ClientEvent) error
 	Recv() (*pb.Action, error)
+	CloseSend() error
 }
 
 type runner struct {
@@ -44,32 +45,45 @@ type runner struct {
 }
 
 func (r *runner) Execute(ctx context.Context) error {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
 	errCh := make(chan error, 2)
 
 	go func() {
 		for {
-			if err := r.handleWebSocketMessage(); err != nil {
-				errCh <- err
+			select {
+			case <-ctx.Done():
 				return
+			default:
+				if err := r.handleWebSocketMessage(); err != nil {
+					errCh <- err
+					return
+				}
 			}
 		}
 	}()
 
 	go func() {
 		for {
-			action, err := r.wf.Recv()
-			if err != nil {
-				if err == io.EOF {
-					errCh <- nil // Expected error when a workflow ends
-				} else {
-					errCh <- fmt.Errorf("duoworkflow: failed to read a gRPC message: %v", err)
-				}
+			select {
+			case <-ctx.Done():
 				return
-			}
+			default:
+				action, err := r.wf.Recv()
+				if err != nil {
+					if err == io.EOF {
+						errCh <- nil // Expected error when a workflow ends
+					} else {
+						errCh <- fmt.Errorf("duoworkflow: failed to read a gRPC message: %v", err)
+					}
+					return
+				}
 
-			if err := r.handleAgentAction(ctx, action); err != nil {
-				errCh <- err
-				return
+				if err := r.handleAgentAction(ctx, action); err != nil {
+					errCh <- err
+					return
+				}
 			}
 		}
 	}()
diff --git a/workhorse/internal/ai_assist/duoworkflow/runner_test.go b/workhorse/internal/ai_assist/duoworkflow/runner_test.go
index 0cc56df006f40c..4b11db4ec3fbec 100644
--- a/workhorse/internal/ai_assist/duoworkflow/runner_test.go
+++ b/workhorse/internal/ai_assist/duoworkflow/runner_test.go
@@ -84,6 +84,10 @@ func (m *mockWorkflowStream) Recv() (*pb.Action, error) {
 	return action, nil
 }
 
+func (m *mockWorkflowStream) CloseSend() error {
+	return nil
+}
+
 func TestRunner_Execute(t *testing.T) {
 	tests := []struct {
 		name string
-- 
GitLab