Skip to content

Commit

Permalink
plex waits for bacalhau jobs to complete for up to 60 min (#633)
Browse files Browse the repository at this point in the history
Co-authored-by: Niklas Rindtorff <NiklasTR@users.noreply.github.com>
  • Loading branch information
acashmoney and NiklasTR committed Sep 6, 2023
1 parent ba2bcfa commit e6cffcd
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 20 deletions.
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -322,3 +322,8 @@ gateway/gorm.db
*~
*.swp
*.swo

# Ignore tool specific files
*zinc*
*AF-*
tools/tmp*
2 changes: 1 addition & 1 deletion cmd/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ var initCmd = &cobra.Command{
}

if autoRun {
_, _, err := PlexRun(cid, outputDir, verbose, showAnimation, concurrency, *annotationsForAutoRun)
_, _, err := PlexRun(cid, outputDir, verbose, showAnimation, maxTime, concurrency, *annotationsForAutoRun)
if err != nil {
fmt.Println("Error:", err)
os.Exit(1)
Expand Down
6 changes: 3 additions & 3 deletions cmd/resume.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,18 @@ var resumeCmd = &cobra.Command{
dry := true
upgradePlexVersion(dry)

_, err := Resume(ioJsonPath, outputDir, verbose, showAnimation, retry, concurrency, *annotationsForResume)
_, err := Resume(ioJsonPath, outputDir, verbose, showAnimation, retry, maxTime, concurrency, *annotationsForResume)
if err != nil {
fmt.Println("Error:", err)
}
},
}

func Resume(ioJsonFilePath, outputDir string, verbose, showAnimation, retry bool, concurrency int, annotations []string) (completedIoJsonCid string, err error) {
func Resume(ioJsonFilePath, outputDir string, verbose, showAnimation, retry bool, maxTime, concurrency int, annotations []string) (completedIoJsonCid string, err error) {
fmt.Println("Continuing to process IO JSON file at: ", ioJsonPath)
fmt.Println("Processing IO Entries")
workDirPath := filepath.Dir(ioJsonFilePath)
ipwl.ProcessIOList(workDirPath, ioJsonPath, retry, verbose, showAnimation, concurrency, annotations)
ipwl.ProcessIOList(workDirPath, ioJsonPath, retry, verbose, showAnimation, maxTime, concurrency, annotations)
fmt.Printf("Finished processing, results written to %s\n", ioJsonPath)
completedIoJsonCid, err = ipfs.PinFile(ioJsonPath)
if err != nil {
Expand Down
13 changes: 10 additions & 3 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ var (
outputDir string
verbose bool
showAnimation bool
maxTime int
concurrency int
annotations *[]string
)
Expand All @@ -29,15 +30,15 @@ var runCmd = &cobra.Command{
dry := true
upgradePlexVersion(dry)

_, _, err := PlexRun(ioJsonCid, outputDir, verbose, showAnimation, concurrency, *annotations)
_, _, err := PlexRun(ioJsonCid, outputDir, verbose, showAnimation, maxTime, concurrency, *annotations)
if err != nil {
fmt.Println("Error:", err)
os.Exit(1)
}
},
}

func PlexRun(ioJsonCid, outputDir string, verbose, showAnimation bool, concurrency int, annotations []string) (completedIoJsonCid, ioJsonPath string, err error) {
func PlexRun(ioJsonCid, outputDir string, verbose, showAnimation bool, maxTime, concurrency int, annotations []string) (completedIoJsonCid, ioJsonPath string, err error) {
// Create plex working directory
id := uuid.New()
var cwd string
Expand Down Expand Up @@ -77,9 +78,14 @@ func PlexRun(ioJsonCid, outputDir string, verbose, showAnimation bool, concurren
annotations = append(annotations, fmt.Sprintf("userId=%s", userID))
}

if maxTime > 60 {
fmt.Println("Error: maxTime cannot exceed 60 minutes")
os.Exit(1)
}

retry := false
fmt.Println("Processing IO Entries")
ipwl.ProcessIOList(workDirPath, ioJsonPath, retry, verbose, showAnimation, concurrency, annotations)
ipwl.ProcessIOList(workDirPath, ioJsonPath, retry, verbose, showAnimation, maxTime, concurrency, annotations)
fmt.Printf("Finished processing, results written to %s\n", ioJsonPath)
completedIoJsonCid, err = ipfs.PinFile(ioJsonPath)
if err != nil {
Expand All @@ -96,6 +102,7 @@ func init() {
runCmd.Flags().StringVarP(&outputDir, "outputDir", "o", "", "Output directory")
runCmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "Enable verbose output")
runCmd.Flags().BoolVarP(&showAnimation, "showAnimation", "", true, "Show job processing animation")
runCmd.Flags().IntVarP(&maxTime, "maxTime", "m", 60, "Maximum time (min) to run a job")
runCmd.Flags().IntVarP(&concurrency, "concurrency", "c", 1, "Number of concurrent operations")
annotations = runCmd.Flags().StringArrayP("annotations", "a", []string{}, "Annotations to add to Bacalhau job")

Expand Down
2 changes: 1 addition & 1 deletion cmd/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var upgradeCmd = &cobra.Command{
}

// Version string and GitHub API endpoints for the labdao/plex repository.
// NOTE(review): presumably read by the upgrade command to compare this build
// against the latest published release — confirm against upgradePlexVersion.
const (
// CurrentPlexVersion is the plex version string.
CurrentPlexVersion = "v0.10.1"
CurrentPlexVersion = "v0.10.2"
// ReleaseURL is the GitHub API URL for the latest labdao/plex release.
ReleaseURL = "https://api.github.com/repos/labdao/plex/releases/latest"
// ToolsURL is the GitHub API URL for the contents of tools/ on the main ref.
ToolsURL = "https://api.github.com/repos/labdao/plex/contents/tools?ref=main"
)
Expand Down
12 changes: 9 additions & 3 deletions internal/bacalhau/bacalhau.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,12 @@ func SubmitBacalhauJob(job *model.Job) (submittedJob *model.Job, err error) {
return submittedJob, err
}

func GetBacalhauJobResults(submittedJob *model.Job, showAnimation bool) (results []model.PublishedResult, err error) {
func GetBacalhauJobResults(submittedJob *model.Job, showAnimation bool, maxTime int) (results []model.PublishedResult, err error) {
client := CreateBacalhauClient()
maxTrys := 360 // 30 minutes divided by 5 seconds is 360 iterations

sleepConstant := 2
maxTrys := maxTime * 60 / sleepConstant

animation := []string{"\U0001F331", "_", "_", "_", "_"}
fmt.Println("Job running...")

Expand All @@ -90,6 +93,9 @@ func GetBacalhauJobResults(submittedJob *model.Job, showAnimation bool) (results
if err != nil {
return results, err
}
if i == maxTrys-1 {
return results, fmt.Errorf("bacalhau job did not finish within the expected time (~%d min); please check the job status manually with `bacalhau describe %s`", maxTime, submittedJob.Metadata.ID)
}
if updatedJob.State.State == model.JobStateCancelled {
return results, fmt.Errorf("bacalhau cancelled job; please run `bacalhau describe %s` for more details", submittedJob.Metadata.ID)
} else if updatedJob.State.State == model.JobStateError {
Expand All @@ -111,7 +117,7 @@ func GetBacalhauJobResults(submittedJob *model.Job, showAnimation bool) (results
fmt.Printf("////%s////\r", strings.Join(animation, ""))
animation[saplingIndex] = "_"
}
time.Sleep(2 * time.Second)
time.Sleep(time.Duration(sleepConstant) * time.Second)
}
return results, err
}
Expand Down
21 changes: 13 additions & 8 deletions internal/ipwl/ipwl.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

var errOutputPathEmpty = errors.New("output file path is empty, still waiting")

func ProcessIOList(jobDir, ioJsonPath string, retry, verbose, showAnimation bool, maxConcurrency int, annotations []string) {
func ProcessIOList(jobDir, ioJsonPath string, retry, verbose, showAnimation bool, maxTime, maxConcurrency int, annotations []string) {
// Use a buffered channel as a semaphore to limit the number of concurrent tasks
semaphore := make(chan struct{}, maxConcurrency)

Expand Down Expand Up @@ -58,7 +58,7 @@ func ProcessIOList(jobDir, ioJsonPath string, retry, verbose, showAnimation bool
fmt.Printf("Starting to process IO entry %d \n", index)

// add retry and resume check
err := processIOTask(entry, index, jobDir, ioJsonPath, retry, verbose, showAnimation, annotations, &fileMutex)
err := processIOTask(entry, index, maxTime, jobDir, ioJsonPath, retry, verbose, showAnimation, annotations, &fileMutex)
if errors.Is(err, errOutputPathEmpty) {
fmt.Printf("Waiting to process IO entry %d \n", index)
} else if err != nil {
Expand All @@ -81,7 +81,7 @@ func ProcessIOList(jobDir, ioJsonPath string, retry, verbose, showAnimation bool
}
}

func processIOTask(ioEntry IO, index int, jobDir, ioJsonPath string, retry, verbose, showAnimation bool, annotations []string, fileMutex *sync.Mutex) error {
func processIOTask(ioEntry IO, index, maxTime int, jobDir, ioJsonPath string, retry, verbose, showAnimation bool, annotations []string, fileMutex *sync.Mutex) error {
fileMutex.Lock()
ioGraph, err := ReadIOList(ioJsonPath)
fileMutex.Unlock()
Expand Down Expand Up @@ -181,7 +181,7 @@ func processIOTask(ioEntry IO, index int, jobDir, ioJsonPath string, retry, verb
if verbose {
fmt.Println("Getting Bacalhau job")
}
results, err := bacalhau.GetBacalhauJobResults(submittedJob, showAnimation)
results, err := bacalhau.GetBacalhauJobResults(submittedJob, showAnimation, maxTime)
if err != nil {
updateIOWithError(ioJsonPath, index, err, fileMutex)
return fmt.Errorf("error getting Bacalhau job results: %w", err)
Expand All @@ -201,6 +201,7 @@ func processIOTask(ioEntry IO, index int, jobDir, ioJsonPath string, retry, verb
fmt.Println("Cleaning Bacalhau job")
}
err = cleanBacalhauOutputDir(outputsDirPath, verbose)

if err != nil {
updateIOWithError(ioJsonPath, index, err, fileMutex)
return fmt.Errorf("error cleaning Bacalhau output directory: %w", err)
Expand Down Expand Up @@ -261,26 +262,30 @@ func cleanBacalhauOutputDir(outputsDirPath string, verbose bool) error {
// Move files from /outputs to outputsDirPath
files, err := ioutil.ReadDir(bacalOutputsDirPath)
if err != nil {
return err
return fmt.Errorf("error reading Bacalhau output directory: %w", err)
}

for _, file := range files {
src := filepath.Join(bacalOutputsDirPath, file.Name())
dst := filepath.Join(outputsDirPath, file.Name())

if verbose {
fmt.Printf("Moving %s to %s", src, dst)
fmt.Printf("Moving %s to %s\n", src, dst)
}
// fmt.Printf("Starting to move file from %s to %s\n", src, dst)
if err := os.Rename(src, dst); err != nil {
return err
return fmt.Errorf("error moving file from %s to %s: %w", src, dst, err)
}
// fmt.Printf("Finished moving file from %s to %s\n", src, dst)
}

// remove empty outputs folder now that files have been moved
// fmt.Printf("Starting to remove directory %s\n", bacalOutputsDirPath)
err = os.Remove(bacalOutputsDirPath)
if err != nil {
fmt.Println(err)
return fmt.Errorf("error removing Bacalhau output directory: %w", err)
}
// fmt.Printf("Finished removing directory %s\n", bacalOutputsDirPath)

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion python/src/plex/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class CoreTools(Enum):
BATCH_DLKCAT = "QmQTjvP2utNb1JTtUHeQ8mQPvNkCTg5VRc4LVdptWkUcJ7"
OPENBABEL_PDB_TO_SDF = "QmbbDSDZJp8G7EFaNKsT7Qe7S9iaaemZmyvS6XgZpdR5e3"
OPENBABEL_RMSD = "QmUxrKgAs5r42xVki4vtMskJa1Z7WA64wURkwywPMch7dA"

COLABDESIGN = "QmR25eVUknf9TpkPKwzDJuEW6jjh1Q6rv7dgKACmR4PxLy"

class PlexError(Exception):
def __init__(self, message):
Expand Down

0 comments on commit e6cffcd

Please sign in to comment.