Skip to content

Commit

Permalink
Merge branch 'refactor_pipeline-build-ing' into add-cron-jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
6543 committed Jun 15, 2022
2 parents 3b54d6e + cb46f82 commit 8f49c95
Show file tree
Hide file tree
Showing 14 changed files with 180 additions and 80 deletions.
16 changes: 12 additions & 4 deletions agent/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,20 @@ func (r *Runner) Run(ctx context.Context) error {
timeout = time.Duration(minutes) * time.Minute
}

repoName := extractRepositoryName(work.Config) // hack
buildNumber := extractBuildNumber(work.Config) // hack

r.counter.Add(
work.ID,
timeout,
extractRepositoryName(work.Config), // hack
extractBuildNumber(work.Config), // hack
repoName,
buildNumber,
)
defer r.counter.Done(work.ID)

logger := log.With().
Str("repo", extractRepositoryName(work.Config)). // hack
Str("build", extractBuildNumber(work.Config)). // hack
Str("repo", repoName).
Str("build", buildNumber).
Str("id", work.ID).
Logger()

Expand Down Expand Up @@ -308,6 +311,11 @@ func (r *Runner) Run(ctx context.Context) error {
pipeline.WithLogger(defaultLogger),
pipeline.WithTracer(defaultTracer),
pipeline.WithEngine(*r.engine),
pipeline.WithDescription(map[string]string{
"ID": work.ID,
"Repo": repoName,
"Build": buildNumber,
}),
).Run()

state.Finished = time.Now().Unix()
Expand Down
3 changes: 3 additions & 0 deletions cli/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,9 @@ func execWithAxis(c *cli.Context, file, repoPath string, axis matrix.Axis) error
pipeline.WithTracer(pipeline.DefaultTracer),
pipeline.WithLogger(defaultLogger),
pipeline.WithEngine(engine),
pipeline.WithDescription(map[string]string{
"CLI": "exec",
}),
).Run()
}

Expand Down
6 changes: 6 additions & 0 deletions pipeline/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,9 @@ func WithContext(ctx context.Context) Option {
r.ctx = ctx
}
}

func WithDescription(desc map[string]string) Option {
return func(r *Runtime) {
r.Description = desc
}
}
70 changes: 62 additions & 8 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"strings"
"time"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"golang.org/x/sync/errgroup"

Expand Down Expand Up @@ -40,12 +41,15 @@ type Runtime struct {
ctx context.Context
tracer Tracer
logger Logger

Description map[string]string // The runtime descriptors.
}

// New returns a new runtime using the specified runtime
// configuration and runtime engine.
func New(spec *backend.Config, opts ...Option) *Runtime {
r := new(Runtime)
r.Description = map[string]string{}
r.spec = spec
r.ctx = context.Background()
for _, opts := range opts {
Expand All @@ -54,11 +58,33 @@ func New(spec *backend.Config, opts ...Option) *Runtime {
return r
}

func (r *Runtime) MakeLogger() zerolog.Logger {
logCtx := log.With()
for key, val := range r.Description {
logCtx = logCtx.Str(key, val)
}
return logCtx.Logger()
}

// Starts the execution of the pipeline and waits for it to complete
func (r *Runtime) Run() error {
logger := r.MakeLogger()
logger.Debug().Msgf("Executing %d stages, in order of:", len(r.spec.Stages))
for _, stage := range r.spec.Stages {
steps := []string{}
for _, step := range stage.Steps {
steps = append(steps, step.Name)
}

logger.Debug().
Str("Stage", stage.Name).
Str("Steps", strings.Join(steps, ",")).
Msg("stage")
}

defer func() {
if err := r.engine.Destroy(r.ctx, r.spec); err != nil {
log.Error().Err(err).Msg("could not destroy pipeline")
logger.Error().Err(err).Msg("could not destroy engine")
}
}()

Expand Down Expand Up @@ -104,13 +130,17 @@ func (r *Runtime) traceStep(processState *backend.State, err error, step *backen
state.Process = processState // empty
state.Pipeline.Error = r.err

return r.tracer.Trace(state)
if traceErr := r.tracer.Trace(state); traceErr != nil {
return traceErr
}
return err
}

// Executes a set of parallel steps
func (r *Runtime) execAll(steps []*backend.Step) <-chan error {
var g errgroup.Group
done := make(chan error)
logger := r.MakeLogger()

for _, step := range steps {
// required since otherwise the loop variable
Expand All @@ -119,10 +149,21 @@ func (r *Runtime) execAll(steps []*backend.Step) <-chan error {
step := step
g.Go(func() error {
// Case the pipeline was already complete.
logger.Debug().
Str("Step", step.Name).
Msg("Prepare")

switch {
case r.err != nil && !step.OnFailure:
logger.Debug().
Str("Step", step.Name).
Err(r.err).
Msgf("Skipped due to OnFailure=%t", step.OnFailure)
return nil
case r.err == nil && !step.OnSuccess:
logger.Debug().
Str("Step", step.Name).
Msgf("Skipped due to OnSuccess=%t", step.OnSuccess)
return nil
}

Expand All @@ -132,15 +173,26 @@ func (r *Runtime) execAll(steps []*backend.Step) <-chan error {
return err
}

logger.Debug().
Str("Step", step.Name).
Msg("Executing")

processState, err := r.exec(step)

// Return the error after tracing it.
traceErr := r.traceStep(processState, err, step)
if traceErr != nil {
return traceErr
logger.Debug().
Str("Step", step.Name).
Msg("Complete")

// if we got a nil process but an error state
// then we need to log the internal error to the step.
if r.logger != nil && err != nil && processState == nil {
_ = r.logger.Log(step, multipart.New(strings.NewReader(
"Backend engine error while running step: "+err.Error(),
)))
}

return err
// Return the error after tracing it.
return r.traceStep(processState, err, step)
})
}

Expand Down Expand Up @@ -171,8 +223,10 @@ func (r *Runtime) exec(step *backend.Step) (*backend.State, error) {
}

go func() {
logger := r.MakeLogger()

if err := r.logger.Log(step, multipart.New(rc)); err != nil {
log.Error().Err(err).Msg("process logging failed")
logger.Error().Err(err).Msg("process logging failed")
}
_ = rc.Close()
}()
Expand Down
10 changes: 4 additions & 6 deletions server/api/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ func DeleteBuild(c *gin.Context) {
}
}

// PostApproval start pipelines in gated repos
func PostApproval(c *gin.Context) {
var (
_store = store.FromContext(c)
Expand All @@ -230,10 +231,6 @@ func PostApproval(c *gin.Context) {
_ = c.AbortWithError(404, err)
return
}
if build.Status != model.StatusBlocked {
c.String(http.StatusBadRequest, "cannot decline a build with status %s", build.Status)
return
}

newBuild, err := pipeline.Approve(c, _store, build, user, repo)
if err != nil {
Expand All @@ -243,6 +240,7 @@ func PostApproval(c *gin.Context) {
}
}

// PostDecline decline pipelines in gated repos
func PostDecline(c *gin.Context) {
var (
_store = store.FromContext(c)
Expand Down Expand Up @@ -274,7 +272,7 @@ func GetBuildQueue(c *gin.Context) {
c.JSON(200, out)
}

// PostBuild restarts a build
// PostBuild restarts a build optional with altered event, deploy or environment
func PostBuild(c *gin.Context) {
_store := store.FromContext(c)
repo := session.Repo(c)
Expand Down Expand Up @@ -331,7 +329,7 @@ func PostBuild(c *gin.Context) {
}
}

newBuild, err := pipeline.ReStart(c, _store, build, user, repo, envs)
newBuild, err := pipeline.Restart(c, _store, build, user, repo, envs)
if err != nil {
handlePipelineErr(c, err)
} else {
Expand Down
2 changes: 1 addition & 1 deletion server/api/hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func BlockTilQueueHasRunningItem(c *gin.Context) {
c.Status(http.StatusOK)
}

// Start a pipeline triggered by a forges post webhook
// PostHook start a pipeline triggered by a forges post webhook
func PostHook(c *gin.Context) {
_store := store.FromContext(c)

Expand Down
13 changes: 9 additions & 4 deletions server/pipeline/approve.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,20 @@ import (
"fmt"

"github.com/rs/zerolog/log"

"github.com/woodpecker-ci/woodpecker/server/model"
"github.com/woodpecker-ci/woodpecker/server/remote"
"github.com/woodpecker-ci/woodpecker/server/shared"
"github.com/woodpecker-ci/woodpecker/server/store"
)

func Approve(ctx context.Context, store store.Store,
build *model.Build, user *model.User, repo *model.Repo,
) (*model.Build, error) {
// Approve update the status to pending for blocked build because of a gated repo
// and start them afterwards
func Approve(ctx context.Context, store store.Store, build *model.Build, user *model.User, repo *model.Repo) (*model.Build, error) {
if build.Status != model.StatusBlocked {
return nil, ErrBadRequest{Msg: fmt.Sprintf("cannot decline a build with status %s", build.Status)}
}

// fetch the build file from the database
configs, err := store.ConfigsForBuild(build.ID)
if err != nil {
Expand All @@ -52,7 +57,7 @@ func Approve(ctx context.Context, store store.Store,
return nil, err
}

build, err = Start(ctx, store, build, user, repo, buildItems)
build, err = start(ctx, store, build, user, repo, buildItems)
if err != nil {
msg := fmt.Sprintf("failure to start build for %s: %v", repo.FullName, err)
log.Error().Err(err).Msg(msg)
Expand Down
56 changes: 56 additions & 0 deletions server/pipeline/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2022 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pipeline

import (
"crypto/sha256"
"fmt"

"github.com/woodpecker-ci/woodpecker/server/model"
"github.com/woodpecker-ci/woodpecker/server/remote"
"github.com/woodpecker-ci/woodpecker/server/shared"
"github.com/woodpecker-ci/woodpecker/server/store"
)

func findOrPersistPipelineConfig(store store.Store, build *model.Build, remoteYamlConfig *remote.FileMeta) (*model.Config, error) {
sha := fmt.Sprintf("%x", sha256.Sum256(remoteYamlConfig.Data))
conf, err := store.ConfigFindIdentical(build.RepoID, sha)
if err != nil {
conf = &model.Config{
RepoID: build.RepoID,
Data: remoteYamlConfig.Data,
Hash: sha,
Name: shared.SanitizePath(remoteYamlConfig.Name),
}
err = store.ConfigCreate(conf)
if err != nil {
// retry in case we receive two hooks at the same time
conf, err = store.ConfigFindIdentical(build.RepoID, sha)
if err != nil {
return nil, err
}
}
}

buildConfig := &model.BuildConfig{
ConfigID: conf.ID,
BuildID: build.ID,
}
if err := store.BuildConfigCreate(buildConfig); err != nil {
return nil, err
}

return conf, nil
}
3 changes: 2 additions & 1 deletion server/pipeline/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/woodpecker-ci/woodpecker/server/store"
)

// Create a new build and start it
func Create(ctx context.Context, _store store.Store, repo *model.Repo, build *model.Build) (*model.Build, error) {
repoUser, err := _store.GetUser(repo.UserID)
if err != nil {
Expand Down Expand Up @@ -123,7 +124,7 @@ func Create(ctx context.Context, _store store.Store, repo *model.Repo, build *mo
return build, nil
}

build, err = Start(ctx, _store, build, repoUser, repo, buildItems)
build, err = start(ctx, _store, build, repoUser, repo, buildItems)
if err != nil {
msg := fmt.Sprintf("failure to start build for %s", repo.FullName)
log.Error().Err(err).Msg(msg)
Expand Down
1 change: 1 addition & 0 deletions server/pipeline/decline.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/woodpecker-ci/woodpecker/server/store"
)

// Decline update the status to declined for blocked build because of a gated repo
func Decline(ctx context.Context, store store.Store, build *model.Build, user *model.User, repo *model.Repo) (*model.Build, error) {
if build.Status != model.StatusBlocked {
return nil, fmt.Errorf("cannot decline a build with status %s", build.Status)
Expand Down
Loading

0 comments on commit 8f49c95

Please sign in to comment.