Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
kupferk committed Jan 28, 2022
2 parents 3033035 + 17aa0c4 commit 20eba69
Show file tree
Hide file tree
Showing 90 changed files with 462 additions and 306 deletions.
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
# Version 0.21.0 - 2033-01-26
# Version 0.21.1 - 2022-01-28

* flowexec now returns different exit codes depending on the processing result


# Version 0.21.0 - 2022-01-26

* Fix wrong dependencies in Swagger plugin
* Implement basic schema inference for local CSV files
Expand Down
2 changes: 1 addition & 1 deletion docker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<parent>
<groupId>com.dimajix.flowman</groupId>
<artifactId>flowman-root</artifactId>
<version>0.21.0</version>
<version>0.21.1</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
13 changes: 13 additions & 0 deletions docs/cli/flowexec.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,19 @@ or for inspecting individual entities.
* `--spark-name <application_name>` Sets the Spark application name


## Exit Codes

`flowexec` provides different exit codes depending on the result of the execution

| exit code | description |
|-----------|--------------------------------------------------------------------------------|
| 0 | Everything worked out nicely, no error. This includes skipped |
| 2 | There were individual errors, but the run was successful (Success with Errors) |
| 3 | There were execution errors |
| 4 | The command line was not correct |
| 5 | An uncaught exception occurred |


## Project Commands
The most important command group is for executing a specific lifecycle or a individual phase for the whole project.
```shell script
Expand Down
2 changes: 1 addition & 1 deletion flowman-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<parent>
<groupId>com.dimajix.flowman</groupId>
<artifactId>flowman-root</artifactId>
<version>0.21.0</version>
<version>0.21.1</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion flowman-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<parent>
<groupId>com.dimajix.flowman</groupId>
<artifactId>flowman-root</artifactId>
<version>0.21.0</version>
<version>0.21.1</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion flowman-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<parent>
<groupId>com.dimajix.flowman</groupId>
<artifactId>flowman-root</artifactId>
<version>0.21.0</version>
<version>0.21.1</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,23 +53,9 @@ class AssertionRunner(
execution.monitorAssertion(assertion) { execution =>
if (!error || keepGoing) {
val result = executeAssertion(execution, assertion, dryRun)
val success = result.success
error |= !success
error |= !result.success

val description = result.description.getOrElse(result.name)
if (result.exception.nonEmpty) {
val ex = result.exception.get
logger.error(s" ✘ exception: $description: ${ex.getMessage}")
}
else if (!success) {
logger.error(red(s" ✘ failed: $description"))
result.children.filter(_.failure).foreach { result =>
logger.error(red(s" ✘ failed ${result.name}"))
}
}
else {
logger.info(green(s" ✓ passed: $description"))
}
logResult(result)

result
}
Expand All @@ -83,6 +69,27 @@ class AssertionRunner(
}
}

private def logResult(result:AssertionResult) : Unit = {
val description = result.description.getOrElse(result.name)
result.exception match {
case Some(ex) =>
logger.error(s" ✘ exception $description: ${ex.getMessage}")
case None if (!result.success) =>
logger.error(red(s" ✘ failed: $description"))
// If an error occured, walk through the children to find a possible exception or failure to display
result.children.filter(_.failure).foreach { result =>
result.exception match {
case Some(ex) =>
logger.error(red(s" ✘ exception ${result.name}: ${ex.getMessage}"))
case None =>
logger.error(red(s" ✘ failed ${result.name}"))
}
}
case None =>
logger.info(green(s" ✓ passed: $description"))
}
}

private def executeAssertion(execution:Execution, assertion: Assertion, dryRun:Boolean) : AssertionResult = {
val startTime = Instant.now()
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ private[execution] sealed class RunnerImpl {
case Status.SUCCESS =>
logger.info(green(s"Successfully finished phase '$phase' for target '${target.identifier}' in ${fmt(duration)}"))
case Status.SUCCESS_WITH_ERRORS =>
logger.info(yellow(s"Successfully finished phase '$phase' for target '${target.identifier}' with errors in ${fmt(duration)}"))
logger.warn(yellow(s"Successfully finished phase '$phase' for target '${target.identifier}' with errors in ${fmt(duration)}"))
case Status.SKIPPED =>
logger.info(green(s"Skipped phase '$phase' for target '${target.identifier}'"))
case Status.FAILED if result.exception.nonEmpty =>
Expand Down Expand Up @@ -116,8 +116,9 @@ private[execution] sealed class RunnerImpl {
result
}

private val lineSize = 109
private val separator = boldWhite(StringUtils.repeat('-', lineSize))
protected val lineSize = 109
protected val separator = boldWhite(StringUtils.repeat('-', lineSize))
protected val doubleSeparator = boldWhite(StringUtils.repeat('=', lineSize))
def logSubtitle(s:String) : Unit = {
val l = (lineSize - 2 - s.length) / 2
val t = if (l > 3) {
Expand Down Expand Up @@ -149,7 +150,7 @@ private[execution] sealed class RunnerImpl {
logger.info("")
}

def logStatus(title:String, status:Status, duration: Duration, endTime:Instant) : Unit = {
def logStatus(title:String, status:Status, duration: Duration, endTime:Instant, double:Boolean=false) : Unit = {
val msg = status match {
case Status.SUCCESS|Status.SKIPPED =>
boldGreen(s"${status.upper} $title")
Expand All @@ -163,27 +164,46 @@ private[execution] sealed class RunnerImpl {
boldRed(s"UNKNOWN STATE '$status' in $title. Assuming failure")
}

logger.info(separator)
val sep = if (double) doubleSeparator else separator
logger.info(sep)
logger.info(msg)
logger.info(separator)
logger.info(sep)
logger.info(s"Total time: ${fmt(duration)}")
logger.info(s"Finished at: ${endTime.atZone(ZoneId.systemDefault())}")
logger.info(separator)
logger.info(sep)
}

def logResult(title:String, result:Result[_]) : Unit = {
logger.info(separator)
logger.info(boldWhite(s"Execution summary for ${result.category.lower} '${result.identifier}'"))
logger.info("")
for (child <- result.children) {
val name = child.identifier.toString
val status = s"${this.status(child.status)} [${StringUtils.leftPad(fmt(child.duration), 10)}]"
val dots = StringUtils.repeat('.', lineSize - child.status.upper.length - name.length - 15)
logger.info(s"$name $dots $status")
def logJobResult(title:String, result:JobResult) : Unit = {
if (result.children.length > 1) {
logger.info(separator)
logger.info(boldWhite(s"Execution summary for ${result.category.lower} '${result.identifier}'"))
logger.info("")
for (child <- result.children) {
val name = child.identifier.toString
val status = s"${this.status(child.status)} [${StringUtils.leftPad(fmt(child.duration), 10)}]"
val dots = StringUtils.repeat('.', lineSize - child.status.upper.length - name.length - 15)
logger.info(s"$name $dots $status")
}
}
logStatus(title, result.status, result.duration, result.endTime)
}

def logLifecycleResult(title:String, result:LifecycleResult) : Unit = {
logger.info("")
if (result.children.length > 1) {
logger.info(doubleSeparator)
logger.info(boldWhite(s"Overall lifecycle summary for ${result.category.lower} '${result.identifier}'"))
logger.info("")
for (child <- result.children) {
val name = s"Phase ${child.phase.upper}"
val status = s"${this.status(child.status)} [${StringUtils.leftPad(fmt(child.duration), 10)}]"
val dots = StringUtils.repeat('.', lineSize - child.status.upper.length - name.length - 15)
logger.info(s"$name $dots $status")
}
}
logStatus(title, result.status, result.duration, result.endTime, double=true)
}

private def status(status:Status) : String = {
status match {
case Status.SUCCESS|Status.SKIPPED => boldGreen(status.upper)
Expand Down Expand Up @@ -231,12 +251,17 @@ private[execution] final class JobRunnerImpl(runner:Runner) extends RunnerImpl {
require(phases != null)
require(args != null)

logger.info("")
logger.info(separator)
logger.info(s"Executing phases ${phases.map(p => "'" + p + "'").mkString(",")} for job '${job.identifier}'")

val startTime = Instant.now()
val isolated2 = isolated || job.parameters.nonEmpty || job.environment.nonEmpty
withExecution(parentExecution, isolated2) { execution =>
runner.withJobContext(job, args, Some(execution), force, dryRun, isolated2) { (context, arguments) =>
val title = s"lifecycle for job '${job.identifier}' ${arguments.map(kv => kv._1 + "=" + kv._2).mkString(", ")}"
val listeners = if (!dryRun) stateStoreListener +: (runner.hooks ++ job.hooks).map(_.instantiate(context)) else Seq()
execution.withListeners(listeners) { execution =>
val result = execution.withListeners(listeners) { execution =>
execution.monitorLifecycle(job, arguments, phases) { execution =>
val results = Result.flatMap(phases, keepGoing) { phase =>
// Check if build phase really contains any active target. Otherwise we skip this phase and mark it
Expand All @@ -263,6 +288,9 @@ private[execution] final class JobRunnerImpl(runner:Runner) extends RunnerImpl {
LifecycleResult(job, instance, results, startTime)
}
}

logLifecycleResult(title, result)
result
}
}
}
Expand Down Expand Up @@ -302,7 +330,7 @@ private[execution] final class JobRunnerImpl(runner:Runner) extends RunnerImpl {
}
}

logResult(title, result)
logJobResult(title, result)
result
}
}
Expand Down Expand Up @@ -582,7 +610,6 @@ final class Runner(
require(phases != null)
require(args != null)

logger.info(s"Executing phases ${phases.map(p => "'" + p + "'").mkString(",")} for job '${job.identifier}'")
val runner = new JobRunnerImpl(this)
val result = runner.executeJob(job, phases, args, targets, dirtyTargets=dirtyTargets, force=force, keepGoing=keepGoing, dryRun=dryRun, isolated=isolated)
result.status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ object LifecycleResult {
startTime=startTime,
endTime=Instant.now()
)
def apply(job:Job, lifecycle: JobLifecycle, children : Seq[Result[_]], startTime:Instant) : LifecycleResult =
def apply(job:Job, lifecycle: JobLifecycle, children : Seq[JobResult], startTime:Instant) : LifecycleResult =
LifecycleResult(
job,
lifecycle,
Expand All @@ -147,7 +147,7 @@ object LifecycleResult {
final case class LifecycleResult(
job: Job,
lifecycle: JobLifecycle,
override val children : Seq[Result[_]],
override val children : Seq[JobResult],
override val status: Status,
override val exception: Option[Throwable] = None,
override val startTime : Instant,
Expand Down
2 changes: 1 addition & 1 deletion flowman-dist/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<parent>
<groupId>com.dimajix.flowman</groupId>
<artifactId>flowman-root</artifactId>
<version>0.21.0</version>
<version>0.21.1</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion flowman-dsl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<parent>
<artifactId>flowman-root</artifactId>
<groupId>com.dimajix.flowman</groupId>
<version>0.21.0</version>
<version>0.21.1</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion flowman-hub/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<parent>
<artifactId>flowman-root</artifactId>
<groupId>com.dimajix.flowman</groupId>
<version>0.21.0</version>
<version>0.21.1</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion flowman-parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<parent>
<groupId>com.dimajix.flowman</groupId>
<artifactId>flowman-root</artifactId>
<version>0.21.0</version>
<version>0.21.1</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion flowman-plugins/aws/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<parent>
<groupId>com.dimajix.flowman</groupId>
<artifactId>flowman-root</artifactId>
<version>0.21.0</version>
<version>0.21.1</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion flowman-plugins/azure/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<parent>
<groupId>com.dimajix.flowman</groupId>
<artifactId>flowman-root</artifactId>
<version>0.21.0</version>
<version>0.21.1</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion flowman-plugins/delta/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<parent>
<groupId>com.dimajix.flowman</groupId>
<artifactId>flowman-root</artifactId>
<version>0.21.0</version>
<version>0.21.1</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion flowman-plugins/impala/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<parent>
<groupId>com.dimajix.flowman</groupId>
<artifactId>flowman-root</artifactId>
<version>0.21.0</version>
<version>0.21.1</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion flowman-plugins/json/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<parent>
<groupId>com.dimajix.flowman</groupId>
<artifactId>flowman-root</artifactId>
<version>0.21.0</version>
<version>0.21.1</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion flowman-plugins/kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<parent>
<groupId>com.dimajix.flowman</groupId>
<artifactId>flowman-root</artifactId>
<version>0.21.0</version>
<version>0.21.1</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion flowman-plugins/mariadb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<parent>
<groupId>com.dimajix.flowman</groupId>
<artifactId>flowman-root</artifactId>
<version>0.21.0</version>
<version>0.21.1</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion flowman-plugins/mssqlserver/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<parent>
<groupId>com.dimajix.flowman</groupId>
<artifactId>flowman-root</artifactId>
<version>0.21.0</version>
<version>0.21.1</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion flowman-plugins/mysql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<parent>
<groupId>com.dimajix.flowman</groupId>
<artifactId>flowman-root</artifactId>
<version>0.21.0</version>
<version>0.21.1</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion flowman-plugins/openapi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<parent>
<groupId>com.dimajix.flowman</groupId>
<artifactId>flowman-root</artifactId>
<version>0.21.0</version>
<version>0.21.1</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
Loading

0 comments on commit 20eba69

Please sign in to comment.