Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Martin <chris@cmartinit.co.uk>
  • Loading branch information
d80tb7 committed Feb 5, 2024
1 parent fdff87d commit 6c42fb9
Showing 1 changed file with 7 additions and 15 deletions.
22 changes: 7 additions & 15 deletions internal/armada/server/submit_to_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/apache/pulsar-client-go/pulsar"
"github.com/gogo/protobuf/types"
"github.com/gogo/status"
"github.com/google/uuid"
pool "github.com/jolestar/go-commons-pool"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -115,6 +114,13 @@ func (srv *PulsarSubmitServer) SubmitJobs(grpcCtx context.Context, req *api.JobS
log.WithError(err).Warn("Error fetching original job ids, deduplication will not occur.")
}

// Check if all jobs can be scheduled.
// This check uses the NodeDb of the new scheduler and
// can check if all jobs in a gang can go onto the same cluster.
if canSchedule, reason := srv.SubmitChecker.CheckApiJobs(apiJobs); !canSchedule {
return nil, status.Errorf(codes.InvalidArgument, "at least one job or gang is unschedulable:\n%s", reason)
}

pulsarJobDetails := make([]*schedulerobjects.PulsarSchedulerJobDetails, 0)

for i, apiJob := range apiJobs {
Expand Down Expand Up @@ -829,20 +835,6 @@ func (srv *PulsarSubmitServer) storeOriginalJobIds(ctx *armadacontext.Context, a
return srv.KVStore.Store(ctx, kvs)
}

// groupJobsByGangId partitions the provided jobs by gang id.
// Jobs with no gang id are treated as gangs of cardinality 1.
func (srv *PulsarSubmitServer) groupJobsByGangId(jobs []*api.Job) map[string][]*api.Job {
jobsByGangId := make(map[string][]*api.Job)
for _, job := range jobs {
gangId, ok := job.Annotations[srv.GangIdAnnotation]
if !ok {
gangId = uuid.NewString()
}
jobsByGangId[gangId] = append(jobsByGangId[gangId], job)
}
return jobsByGangId
}

// resolveQueueAndJobsetForJob returns the queue and jobset for a job.
// If no job can be retrieved then an error is returned.
func (srv *PulsarSubmitServer) resolveQueueAndJobsetForJob(jobId string) (string, string, error) {
Expand Down

0 comments on commit 6c42fb9

Please sign in to comment.