From 6a7134ff8989d101a9f7d37cdb543acd67d54a51 Mon Sep 17 00:00:00 2001
From: John Hoffman
Date: Tue, 26 Nov 2024 11:38:31 -0800
Subject: [PATCH 1/2] cherry-picks wasil's condor runner out of the old branch

Wasil's condor runner should do the trick, but needs updating for the 0.2.4
revision. Starting this MR to get this in asap.
---
Review notes (everything between the "---" line and the diff is discarded
by `git am`, so none of this reaches the commit or the generated file):

NOTE(review): NewCondorRunner builds a *CondorRunner only to assign
  l.RunAgent = l.RunCondorAgent on the *Runner returned by StartRunner;
  the wrapper itself is neither returned nor stored. Whether StartRunner
  can already dispatch an agent before RunAgent is assigned is not
  visible here -- confirm.

NOTE(review): RunCondorAgent
  - The error from r.LoadConfig(condor_yaml) is discarded.
  - If neither the condor_yaml file nor spec.Attributes supplies "host",
    bb_addr stays "" and the script line contains
    "--blackboard  --spec '...'"; the empty value disappears under bash
    word-splitting, leaving --blackboard directly followed by --spec.
  - The marshalled spec is wrapped as '%s' and written verbatim into a
    bash script, so any single quote inside it ends the quoted word early
    (breaks the script; a shell-injection vector if spec fields are not
    trusted). The Entrypoint tokens are joined unquoted as well.
  - cmd.Output() keeps only stdout and condor_submit's stderr is never
    logged; fmt.Println(data) prints the []byte as decimal numbers, not
    as text.
  - It returns nil even when condor_submit fails; the error is only
    logged.

NOTE(review): PrepCondorSubmit
  - "requirements" always starts as a single-machine pin (the hard-coded
    default, or the "machine" attribute), so the requirements == ""
    branch is unreachable and gpu_mem_requirement is always ANDed onto a
    machine pin, although the in-code NOTE says the two settings are
    meant to be exclusive.
  - condor_submit_file and the executable/output/error/log paths are only
    set when a "log_dir" attribute exists. Without it, CreateFile("") is
    called twice and "" is returned, so condor_submit gets an empty
    argument.
  - Both CreateFile errors are ignored; the os.Mkdir error is only
    printed (and Mkdir, unlike MkdirAll, fails if log_dir's parent is
    missing).
  - Entries are written in Go map iteration order, so the submit file's
    line order changes from run to run.
  - The .sub file also goes through CreateFile and is chmod'ed 0755.

NOTE(review): CreateFile returns `err` -- the nil result of os.Create --
  when WriteString or Chmod fails, instead of err2/err3, so write and
  chmod failures are reported as success. The deferred Close's error is
  dropped.

NOTE(review): LoadConfig
  - An os.ReadFile error is only printed; err is then overwritten by
    yaml.Unmarshal's result, so an unreadable file presumably comes back
    as an empty map with a nil error (yaml.v3 on empty input -- confirm).
  - log.Fatalf on malformed YAML exits the whole process.
  - Values are stringified with %v, so nested YAML mappings/sequences
    become Go fmt output (e.g. "[3 4]") rather than YAML.

 cmd/runners/condor_runner.go | 388 +++++++++++++++++++++++++++++++++++
 1 file changed, 388 insertions(+)
 create mode 100644 cmd/runners/condor_runner.go

diff --git a/cmd/runners/condor_runner.go b/cmd/runners/condor_runner.go
new file mode 100644
index 00000000..a581741d
--- /dev/null
+++ b/cmd/runners/condor_runner.go
@@ -0,0 +1,388 @@
+package agents
+
+import (
+	"fmt"
+	"log"
+	"os/exec"
+	"strings"
+
+	"bytes"
+	"os"
+	"path/filepath"
+	"strconv"
+
+	// log "github.com/sirupsen/logrus"
+	// "gitlab.com/hoffman-lab/core/pkg/config"
+	. "gitlab.com/hoffman-lab/core/pkg/core"
+	"google.golang.org/protobuf/encoding/protojson"
+	"gopkg.in/yaml.v3"
+)
+
+type CondorRunner struct {
+	*Runner
+}
+
+// Getting closer, but this still is more boilerplate than I would like
+func NewCondorRunner(bb Blackboard, spec *AgentSpec) error {
+	r, err := StartRunner(bb, spec)
+	if err != nil {
+		return err
+	}
+
+	l := &CondorRunner{
+		Runner: r,
+	}
+	l.RunAgent = l.RunCondorAgent
+
+	return nil
+}
+
+// func (r *CondorRunner) RunCondorAgent(spec *AgentSpec) error {
+
+// //data, err := json.Marshal(spec)
+
+// data, err := protojson.Marshal(spec)
+// if err != nil {
+// return err
+// }
+
+// bb_addr := config.Globals.BlackboardHTTPAddress
+// cmd_split := strings.Split(spec.Entrypoint, " ")
+
+// json_spec := string(data)
+
+// cmd_split = append(cmd_split, "--blackboard", bb_addr, "--spec", json_spec)
+
+// r.Log(Log_Info, "dispatching agent with command: "+strings.Join(cmd_split, " "))
+
+// cmd := exec.Command(cmd_split[0], cmd_split[1:]...)
+// data, err = cmd.CombinedOutput()
+// if err != nil {
+// r.Log(Log_Error, fmt.Sprintf("agent %s exited with error: %s", spec.Name, err))
+// log.Info(err, string(data))
+// }
+// return nil
+// }
+
+func (r *CondorRunner) RunCondorAgent(spec *AgentSpec) error {
+
+	data, err := protojson.Marshal(spec)
+	if err != nil {
+		return err
+	}
+
+	spec_attributes := spec.Attributes
+	var bb_addr string
+	readCondorMap := map[string]string{}
+	condor_yaml, attribute_exists := spec_attributes["condor_yaml"]
+	if attribute_exists {
+		fmt.Println("Reading: " + condor_yaml)
+
+		readCondorMap, _ = r.LoadConfig(condor_yaml)
+		host_addr, config_host_exists := readCondorMap["host"]
+		if config_host_exists {
+			bb_addr = host_addr
+		}
+
+	}
+	fmt.Println("::::READ CONDOR MAP : ")
+
+	for key, value := range readCondorMap {
+		fmt.Println(":::: KEY : " + key)
+		fmt.Println(":::: VALUE : " + value)
+	}
+
+	host_addr, spec_host_exists := spec_attributes["host"]
+	if spec_host_exists {
+		bb_addr = host_addr
+	}
+	// bb_addr := config.Globals.BlackboardHTTPAddress
+	cmd_split := strings.Split(spec.Entrypoint, " ")
+
+	// cmd_split = []string{"condor_submit", spec.Entrypoint}
+
+	// something about number of jobs
+
+	//json_spec := strings.Replace(string(data), " ", "", -1)
+	json_spec := string(data)
+
+	// sm_core_go_dependency := "#!/usr/bin/env bash\nsource /scratch/wasil/env/etc/profile.d/conda.sh\nconda activate smcore"
+	sm_core_go_dependency := "#!/usr/bin/env bash"
+	fmt.Println(sm_core_go_dependency)
+	cmd_split = append(cmd_split, []string{"--blackboard", bb_addr, "--spec", fmt.Sprintf("'%s'", json_spec)}...)
+
+	r.Log(Log_Info, "dispatching agent with command: "+strings.Join(cmd_split, " "))
+
+	condor_sh_file := r.PrepCondorSubmit(spec, readCondorMap, sm_core_go_dependency+"\n"+strings.Join(cmd_split, " "))
+
+	submit_cmd := []string{"condor_submit", condor_sh_file} // potentially add some other stuff like number of parallel jobs
+	// fmt.Println(submit_cmd)
+
+	cmd := exec.Command(submit_cmd[0], submit_cmd[1:]...)
+	data, err = cmd.Output()
+	fmt.Println(":::: COMMAND OUTPUT:")
+	fmt.Println(data)
+	if err != nil {
+		r.Log(Log_Error, fmt.Sprintf("agent %s exited with error: %s", spec.Name, err))
+	}
+	return nil
+}
+
+// executable : /radraid/apps/personal/gabriel/ga_test/ga_train/working_dir/watcher/01C5/kidneys_lesion_cnn_weight_12_014/0.sh
+// output : /radraid/apps/personal/gabriel/ga_test/log/20230518_135403/watcher/01C5_kidneys_lesion_cnn_weight_12_014/log/$(cluster).$(process).out
+// error : /radraid/apps/personal/gabriel/ga_test/log/20230518_135403/watcher/01C5_kidneys_lesion_cnn_weight_12_014/log/$(cluster).$(process).err
+// log : /radraid/apps/personal/gabriel/ga_test/log/20230518_135403/watcher/01C5_kidneys_lesion_cnn_weight_12_014/log/$(cluster).$(process).log
+
+func (r *CondorRunner) PrepCondorSubmit(spec *AgentSpec, readCondorMap map[string]string, cmd string) string {
+
+	// condor_attr := spec.Attributes["condor"]
+	name := spec.Name
+	spec_attributes := spec.Attributes
+	// condor_attr := spec.Condor
+
+	var defaultCondorMap = map[string]string{
+		"executable": "",
+		"output": "",
+		"error": "",
+		"log": "",
+		// "requirements": "((Machine == \"REDLRADADM14959.ad.medctr.ucla.edu\"))",
+		"requirements": "((Machine == \"REDLRADADM14959.ad.medctr.ucla.edu\"))",
+		// # rank : ((machine == "REDLRADADM23710.ad.medctr.ucla.edu")*5) + ((machine == "REDWRADMMC23199.ad.medctr.ucla.edu")*5) + ((machine == "REDLRADADM23589.ad.medctr.ucla.edu")*4) + ((machine == "REDLRADADM14958.ad.medctr.ucla.edu")*3) + ((machine == "REDLRADADM14959.ad.medctr.ucla.edu")*3) + ((machine == "REDLRADADM23620.ad.medctr.ucla.edu")*2) + ((machine == "redlradbei05920.ad.medctr.ucla.edu")*2) + ((machine == "REDLRADADM23621.ad.medctr.ucla.edu")*2)
+		// "universe" : "docker",
+		// "docker_image": "registry.cvib.ucla.edu/sm_develop:ga_gmc"
+		// "docker_pull_policy": "always"
+		"erase_output_and_error_on_restart": "False",
+		// "initialdir" : "/radraid/apps/personal/gabriel/ga_test/ga_train/working_dir/watcher/01C5/kidneys_lesion_cnn_weight_12_014"
+		"max_retries": "3",
+		"notification": "Error",
+		"on_exit_remove": "(ExitBySignal == False) && (ExitCode == 0 || ExitCode == 10)",
+		"priority": "20",
+		"request_cpus": "1",
+		"request_gpus": "0",
+		// requestmemory :
+		// "should_transfer_files": "YES",
+		"stream_error": "True",
+		"stream_output": "True",
+		"timeout": "None",
+		// "transfer_executable": "True",
+		"when_to_transfer_output": "ON_EXIT",
+		"docker_network_type": "host",
+	}
+
+	condorMap := defaultCondorMap
+
+	condor_attributes := map[string]string{}
+
+	// condor_yaml, attribute_exists := spec_attributes["condor_yaml"]
+	// if attribute_exists {
+	// fmt.Println("Reading: " + condor_yaml)
+
+	// readCondorMap, _ := r.LoadConfig(condor_yaml)
+
+	for k := range readCondorMap {
+		condor_attributes[k] = readCondorMap[k]
+	}
+
+	// // should not just replace
+	// // condor_attributes = readCondorMap
+	// }
+
+	for k := range spec_attributes {
+		condor_attributes[k] = spec_attributes[k]
+	}
+
+	machine, attribute_exists := condor_attributes["machine"]
+	if attribute_exists {
+		condorMap["requirements"] = strings.Join([]string{"(Machine == \"", machine, "\")"}, "")
+	}
+
+	gpu_mem_requirement, attribute_exists := condor_attributes["gpu_mem_requirement"]
+	if attribute_exists {
+		if gpumem, _ := strconv.Atoi(gpu_mem_requirement); gpumem > 0 {
+			fmt.Println("-" + condorMap["requirements"] + "-")
+			fmt.Println(condorMap["requirements"] == "")
+			if condorMap["requirements"] == "" {
+				condorMap["requirements"] = strings.Join([]string{"(GPUMEM >= ", gpu_mem_requirement, ")"}, "")
+			} else {
+				condorMap["requirements"] = strings.Join([]string{condorMap["requirements"], "&& (GPUMEM >= ", gpu_mem_requirement, ")"}, "")
+			}
+		}
+	}
+
+	var condor_submit_file string
+	for k := range condor_attributes {
+		fmt.Println(k)
+
+		// switch ; string(k) {
+		switch k {
+		case "log_dir":
+			if _, err := os.Stat(condor_attributes[k]); os.IsNotExist(err) {
+				err := os.Mkdir(condor_attributes[k], 0777)
+				// TODO: handle error
+
+				fmt.Println(err)
+
+			}
+			// define
+			// executable
+			// output
+			// error
+			// log
+			condor_submit_file = filepath.Join(condor_attributes[k], name+"_condor.sub")
+			condorMap["executable"] = filepath.Join(condor_attributes[k], name+"_run.sh")
+			condorMap["output"] = filepath.Join(condor_attributes[k], name+"_$(cluster).$(process).out")
+			condorMap["error"] = filepath.Join(condor_attributes[k], name+"_$(cluster).$(process).err")
+			condorMap["log"] = filepath.Join(condor_attributes[k], name+"_$(cluster).$(process).log")
+		case "timeout":
+			condorMap["timeout"] = condor_attributes[k]
+		case "requestmemory":
+			condorMap["requestmemory"] = condor_attributes[k]
+		case "request_cpus":
+			condorMap["request_cpus"] = condor_attributes[k]
+		case "request_gpus":
+			condorMap["request_gpus"] = condor_attributes[k]
+		case "docker_image":
+			condorMap["docker_image"] = condor_attributes[k]
+		case "condor_exec_dir":
+			condorMap["remote_initialdir"] = condor_attributes[k]
+
+			// NOTE: must be "machine" OR "gpu_mem_requirement" not both
+			// case "machine":
+			// condorMap["requirements"] = strings.Join([]string{"(Machine == \"", condor_attr[k], "\")"}, "")
+			// case "gpu_mem_requirement":
+			// if gpumem, _ := strconv.Atoi(condor_attr[k]); gpumem > 0 {
+			// fmt.Println("-" + condor_attr["requirements"] + "-")
+			// fmt.Println(condor_attr["requirements"] == "")
+			// if condor_attr["requirements"] == "" {
+			// condorMap["requirements"] = strings.Join([]string{"(GPUMEM >= ", condor_attr[k], ")"}, "")
+			// } else {
+			// condorMap["requirements"] = strings.Join([]string{condor_attr["requirements"], "&& (GPUMEM >= ", condor_attr[k], ")"}, "")
+			// }
+			// }
+			// condorMap["requirements"] = append(condor_attr["requirements"], " && (GPUMEM >= ", condor_attr[k], ")"...)
+		}
+
+	}
+
+	// python hello_agent.py --blackboard localhost:8080 --spec '{"name":"HelloAgent"}
+
+	// && (GPUMEM >= 46000)
+
+	// "queue"
+
+	fmt.Println("Making " + condor_submit_file)
+	b := new(bytes.Buffer)
+	for key, value := range condorMap {
+		fmt.Fprintf(b, "%s=%s\n", key, value)
+	}
+	fmt.Fprintf(b, "queue")
+	fmt.Println("Finished " + condor_submit_file)
+
+	r.Log(Log_Info, b.String())
+
+	r.CreateFile(condor_submit_file, b.String())
+	r.CreateFile(condorMap["executable"], cmd)
+	return condor_submit_file
+}
+
+func (r *CondorRunner) CreateFile(executable, cmd string) error {
+
+	fmt.Println(executable)
+	f, err := os.Create(executable)
+
+	if err != nil {
+		fmt.Println("error in executable creation")
+		fmt.Println(err)
+		return err
+		// r.Log.Fatal(err)
+	}
+
+	defer f.Close()
+
+	_, err2 := f.WriteString(cmd)
+
+	if err2 != nil {
+		return err
+		// r.Log.Fatal(err2)
+	}
+
+	err3 := f.Chmod(0755)
+
+	if err3 != nil {
+		return err
+		// r.Log.Fatal(err2)
+	}
+
+	fmt.Println("Created executable with command inside.")
+
+	return err
+}
+
+// var data = `
+// a: Easy!
+// b:
+// c: 2
+// d: [3, 4]
+// `
+
+// func (r *CondorRunner) LoadConfig(filepath string) (map[string]interface{}, error) {
+func (r *CondorRunner) LoadConfig(filepath string) (map[string]string, error) {
+	fmt.Println("looking to load condor config file: " + filepath)
+	// condorMap := map[string]string{}
+	// f, err := os.Open(filepath)
+	// if err != nil {
+	// return condorMap, err
+	// }
+
+	// err = yaml.NewDecoder(f).Decode(condorMap)
+	// if err != nil {
+	// fmt.Println("ERROR IN LOADING CONDOR CONFIG")
+	// fmt.Println(err)
+
+	// return condorMap, err
+	// }
+
+	data, err := os.ReadFile(filepath) // just pass the file name
+	if err != nil {
+		fmt.Print(err)
+	}
+
+	condorMap := map[string]string{}
+	m := make(map[interface{}]interface{})
+
+	err = yaml.Unmarshal([]byte(data), &m)
+	if err != nil {
+		log.Fatalf("error: %v", err)
+	}
+	fmt.Printf("--- m:\n%v\n\n", m)
+
+	// Make sure we're injecting the correct name into the agent (the
+	// heading that they're specified under in the yaml
+	//
+	// TODO: Use reflection here to add the ability for new "headings"
+	// to specify agents under in the future.
+
+	// for k := range a.Controllers {
+	// a.Controllers[k].Name = k
+	// }
+
+	// for k := range a.Agents {
+	// a.Agents[k].Name = k
+	// }
+
+	// for k := range a.Runners {
+	// a.Runners[k].Name = k
+	// }
+	// for key, value := range condorMap {
+	// fmt.Println(key + " : " + value)
+	// // fmt.Fprintf(b, "%s=%s\n", key, value)
+	// }
+	for key, value := range m {
+		strKey := fmt.Sprintf("%v", key)
+		strValue := fmt.Sprintf("%v", value)
+
+		condorMap[strKey] = strValue
+	}
+
+	return condorMap, err
+}
-- 
GitLab

From 359d964c43bcfaf05a7168b4ae5b7833c6f23955 Mon Sep 17 00:00:00 2001
From: John Hoffman
Date: Tue, 26 Nov 2024 20:13:26 -0800
Subject: [PATCH 2/2] starting update; still need to do some figuring about how to parse

---
Review notes (discarded by `git am`; not part of the commit):

NOTE(review): work in progress -- as posted, this revision does not
  compile:
  - RunCondorAgent still reads `data` (its protojson.Marshal line is now
    commented out, but json_spec := string(data) and
    data, err = cmd.Output() remain) and `spec_attributes` (its
    declaration is deleted, but spec_attributes["condor_yaml"] remains).
  - condor_config is declared and never used.
  - json.NewDecoder(bytes.NewReader(spec.Attributes)) needs a []byte,
    while PrepCondorSubmit still treats spec.Attributes as a string-keyed
    map; both uses cannot compile -- confirm the AgentSpec.Attributes type
    in core 0.2.4. Decode(data) also targets `data`, not condor_config.
  - CondorAttributes carries only a `yaml` tag, but the attributes are
    decoded with encoding/json, which ignores yaml tags.

NOTE(review): Setup's CreateAgent handler creates the execution directory
  and unpacks the context, then only logs "executing local run -> ...";
  nothing is submitted to condor, and RunCondorJob is a stub returning
  nil. "create agent!" is printed before the runner-name check, i.e. by
  every runner that sees the message. If create_agent.Spec can be nil,
  spec.Name panics -- confirm against the CreateAgent message definition.

NOTE(review): "gitlab.com/hoffman-lab/core/pkg/core" is now imported
  twice (as `core` and as a dot import); staticcheck reports this as
  ST1019. RandString and randomEmoji are not defined in this file --
  presumably elsewhere in package runners; verify.

NOTE(review): LoadConfig now resolves `log` to logrus; its log.Fatalf
  still exits the process on malformed YAML.

 cmd/runners/condor_runner.go | 149 +++++++++++++++++++++++------------
 1 file changed, 98 insertions(+), 51 deletions(-)

diff --git a/cmd/runners/condor_runner.go b/cmd/runners/condor_runner.go
index a581741d..3b5007c4 100644
--- a/cmd/runners/condor_runner.go
+++ b/cmd/runners/condor_runner.go
@@ -1,78 +1,128 @@
-package agents
+package runners
 
 import (
-	"fmt"
-	"log"
-	"os/exec"
-	"strings"
-
 	"bytes"
+	"encoding/json"
+	"fmt"
 	"os"
+	"os/exec"
 	"path/filepath"
 	"strconv"
+	"strings"
+	"time"
 
-	// log "github.com/sirupsen/logrus"
+	log "github.com/sirupsen/logrus"
 	// "gitlab.com/hoffman-lab/core/pkg/config"
+	client "gitlab.com/hoffman-lab/core/cmd/client-apis/go"
+	"gitlab.com/hoffman-lab/core/pkg/core"
 	. "gitlab.com/hoffman-lab/core/pkg/core"
-	"google.golang.org/protobuf/encoding/protojson"
 	"gopkg.in/yaml.v3"
 )
 
 type CondorRunner struct {
-	*Runner
+	*client.BasicAgent
+	DataRoot string
 }
 
-// Getting closer, but this still is more boilerplate than I would like
-func NewCondorRunner(bb Blackboard, spec *AgentSpec) error {
-	r, err := StartRunner(bb, spec)
-	if err != nil {
-		return err
-	}
+var _ client.ManagedAgent = &CondorRunner{}
 
-	l := &CondorRunner{
-		Runner: r,
-	}
-	l.RunAgent = l.RunCondorAgent
+func (r *CondorRunner) Setup() error {
+	// override the OnNewMessage method
+	// to handle CreateAgent messages
 
-	return nil
-}
+	r.OnNewMessage = func(msg *core.Message) {
+		switch msg.Contents.(type) {
+		case *core.Message_Halt:
+			r.Stop()
+		case *core.Message_HaltAndCatchFire:
+			r.Stop() // play nicely...?
+		case *core.Message_CreateAgent:
+			fmt.Println("create agent!")
 
-// func (r *CondorRunner) RunCondorAgent(spec *AgentSpec) error {
+			create_agent := msg.GetCreateAgent()
+			spec := create_agent.Spec
 
-// //data, err := json.Marshal(spec)
+			// Message is not a match for the current runner
+			if create_agent.Runner != r.Name {
+				return
+			}
 
-// data, err := protojson.Marshal(spec)
-// if err != nil {
-// return err
-// }
+			// Every run gets its own execution directory. This way, we avoid
+			// polluting the current working directory with files an agent might
+			// create. These could clobber results from other agents and create
+			// non-determinism.
+			//
+			// NOTE: This would be a good candidate for an internal identifier for
+			// resource management, monitoring, etc.
+			if spec.Name == "" {
+				spec.Name = "empty-name"
+			}
+			execution_dir := filepath.Join(r.DataRoot, spec.Name+"-"+RandString(8))
 
-// bb_addr := config.Globals.BlackboardHTTPAddress
-// cmd_split := strings.Split(spec.Entrypoint, " ")
+			err := os.MkdirAll(execution_dir, 0o775)
+			if err != nil {
+				log.Errorf("failed to create execution context for agent %s: %v", spec.Name, err)
+				return
+			}
 
-// json_spec := string(data)
+			// Unpack the context into a directory
+			if create_agent.Spec.Context != nil {
+				err := client.UnpackContext(execution_dir, create_agent.Spec.GetContext())
+				if err != nil {
+					log.Error("failed to create agent non-nil context: ", err)
+					r.Log(core.Log_Error, "failed to create agent non-nil context: "+err.Error())
+					return
+				}
+			}
+			log.Info("executing local run -> ", randomEmoji(), " -> ", spec.Entrypoint)
+		}
+	}
+	return nil
+}
 
-// cmd_split = append(cmd_split, "--blackboard", bb_addr, "--spec", json_spec)
+func (r *CondorRunner) Loop() (bool, error) {
+	// We may have a slight race condition where we stop our execution before the
+	// core agent goroutine in BasicAgent is fully done executing. I don't think this
+	// will impact users but could be something to clean up in the future.
+	select {
+	case <-time.After(5 * time.Second):
+		r.Log(core.Log_Info, "condor runner heartbeat")
+		return true, nil
+	case <-r.Done():
+		return false, nil
+	}
+}
+
+func NewCondorRunner(addr string) *CondorRunner {
+	return &CondorRunner{
+		BasicAgent: client.GetDefaultAgent(addr),
+	}
+}
 
-// r.Log(Log_Info, "dispatching agent with command: "+strings.Join(cmd_split, " "))
+func (r *CondorRunner) RunCondorJob(spec *AgentSpec) error {
+	return nil
+}
 
-// cmd := exec.Command(cmd_split[0], cmd_split[1:]...)
-// data, err = cmd.CombinedOutput()
-// if err != nil {
-// r.Log(Log_Error, fmt.Sprintf("agent %s exited with error: %s", spec.Name, err))
-// log.Info(err, string(data))
-// }
-// return nil
-// }
+// Would be great if zero value were useful
+type CondorConfig struct {
+	// whatever info condor needs
+	Host string
+}
+
+type CondorAttributes struct {
+	Config *CondorConfig `yaml:"condor-config"`
+}
 
 func (r *CondorRunner) RunCondorAgent(spec *AgentSpec) error {
+	// data, err := protojson.Marshal(spec)
+	// if err != nil {
+	// return err
+	// }
 
-	data, err := protojson.Marshal(spec)
-	if err != nil {
-		return err
-	}
+	condor_config := &CondorConfig{}
+
+	err := json.NewDecoder(bytes.NewReader(spec.Attributes)).Decode(data)
 
-	spec_attributes := spec.Attributes
-	var bb_addr string
 	readCondorMap := map[string]string{}
 	condor_yaml, attribute_exists := spec_attributes["condor_yaml"]
 	if attribute_exists {
@@ -103,7 +153,7 @@ func (r *CondorRunner) RunCondorAgent(spec *AgentSpec) error {
 
 	// something about number of jobs
 
-	//json_spec := strings.Replace(string(data), " ", "", -1)
+	// json_spec := strings.Replace(string(data), " ", "", -1)
 	json_spec := string(data)
 
 	// sm_core_go_dependency := "#!/usr/bin/env bash\nsource /scratch/wasil/env/etc/profile.d/conda.sh\nconda activate smcore"
@@ -134,13 +184,12 @@ func (r *CondorRunner) RunCondorAgent(spec *AgentSpec) error {
 // log : /radraid/apps/personal/gabriel/ga_test/log/20230518_135403/watcher/01C5_kidneys_lesion_cnn_weight_12_014/log/$(cluster).$(process).log
 
 func (r *CondorRunner) PrepCondorSubmit(spec *AgentSpec, readCondorMap map[string]string, cmd string) string {
-
 	// condor_attr := spec.Attributes["condor"]
 	name := spec.Name
 	spec_attributes := spec.Attributes
 	// condor_attr := spec.Condor
 
-	var defaultCondorMap = map[string]string{
+	defaultCondorMap := map[string]string{
 		"executable": "",
 		"output": "",
 		"error": "",
@@ -286,10 +335,8 @@ func (r *CondorRunner) PrepCondorSubmit(spec *AgentSpec, readCondorMap map[strin
 }
 
 func (r *CondorRunner) CreateFile(executable, cmd string) error {
-
 	fmt.Println(executable)
 	f, err := os.Create(executable)
-
 	if err != nil {
 		fmt.Println("error in executable creation")
 		fmt.Println(err)
-- 
GitLab