Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove kamon tracing #1662

Merged
merged 1 commit into from
Jan 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 0 additions & 13 deletions eclair-core/src/main/scala/fr/acinq/eclair/PimpKamon.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@

package fr.acinq.eclair

import fr.acinq.eclair.payment.{LocalFailure, PaymentFailure, RemoteFailure, UnreadableRemoteFailure}
import kamon.metric.Timer
import kamon.trace.Span

import scala.concurrent.{ExecutionContext, Future}

Expand All @@ -40,15 +38,4 @@ object KamonExt {
res
}

/**
* A helper function that fails a span with proper messages when dealing with payments
*/
def failSpan(span: Span, failure: PaymentFailure) = {
failure match {
case LocalFailure(_, t) => span.fail("local failure", t)
case RemoteFailure(_, e) => span.fail(s"remote failure: origin=${e.originNode} error=${e.failureMessage}")
case UnreadableRemoteFailure(_) => span.fail("unreadable remote failure")
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import fr.acinq.eclair.ShortChannelId.coordinates
import fr.acinq.eclair.TxCoordinates
import fr.acinq.eclair.blockchain.{GetTxWithMetaResponse, UtxoStatus, ValidateResult}
import fr.acinq.eclair.wire.ChannelAnnouncement
import kamon.Kamon
import org.json4s.Formats
import org.json4s.JsonAST._

Expand Down Expand Up @@ -170,31 +169,20 @@ class ExtendedBitcoinClient(val rpcClient: BitcoinJsonRPCClient) {

def validate(c: ChannelAnnouncement)(implicit ec: ExecutionContext): Future[ValidateResult] = {
val TxCoordinates(blockHeight, txIndex, outputIndex) = coordinates(c.shortChannelId)
val span = Kamon.spanBuilder("validate-bitcoin-client").start()
for {
_ <- Future.successful(0)
span0 = Kamon.spanBuilder("getblockhash").start()
blockHash <- rpcClient.invoke("getblockhash", blockHeight).map(_.extractOpt[String].map(ByteVector32.fromValidHex).getOrElse(ByteVector32.Zeroes))
_ = span0.finish()
span1 = Kamon.spanBuilder("getblock").start()
txid: ByteVector32 <- rpcClient.invoke("getblock", blockHash).map(json => Try {
val JArray(txs) = json \ "tx"
ByteVector32.fromValidHex(txs(txIndex).extract[String])
}.getOrElse(ByteVector32.Zeroes))
_ = span1.finish()
span2 = Kamon.spanBuilder("getrawtx").start()
tx <- getRawTransaction(txid)
_ = span2.finish()
span3 = Kamon.spanBuilder("utxospendable-mempool").start()
unspent <- isTransactionOutputSpendable(txid, outputIndex, includeMempool = true)
_ = span3.finish()
fundingTxStatus <- if (unspent) {
Future.successful(UtxoStatus.Unspent)
} else {
// if this returns true, it means that the spending tx is *not* in the blockchain
isTransactionOutputSpendable(txid, outputIndex, includeMempool = false).map(res => UtxoStatus.Spent(spendingTxConfirmed = !res))
}
_ = span.finish()
} yield ValidateResult(c, Right((Transaction.read(tx), fundingTxStatus)))
} recover {
case t: Throwable => ValidateResult(c, Left(t))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ import fr.acinq.eclair.router.RouteCalculation
import fr.acinq.eclair.router.Router._
import fr.acinq.eclair.wire._
import fr.acinq.eclair.{CltvExpiry, FSMDiagnosticActorLogging, Logs, MilliSatoshi, MilliSatoshiLong, NodeParams}
import kamon.Kamon
import kamon.context.Context

import java.util.UUID
import java.util.concurrent.TimeUnit
Expand All @@ -57,13 +55,6 @@ class MultiPartPaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig,
val start = System.currentTimeMillis
private var retriedFailedChannels = false

private val span = Kamon.spanBuilder("multi-part-payment")
.tag(Tags.ParentId, cfg.parentId.toString)
.tag(Tags.PaymentHash, paymentHash.toHex)
.tag(Tags.RecipientNodeId, cfg.recipientNodeId.toString())
.tag(Tags.RecipientAmount, cfg.recipientAmount.toLong)
.start()

startWith(WAIT_FOR_PAYMENT_REQUEST, WaitingForRequest)

when(WAIT_FOR_PAYMENT_REQUEST) {
Expand All @@ -83,11 +74,7 @@ class MultiPartPaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig,
val (toSend, maxFee) = remainingToSend(nodeParams, d.request, d.pending.values)
if (routes.map(_.amount).sum == toSend) {
val childPayments = routes.map(route => (UUID.randomUUID(), route)).toMap
Kamon.runWithContextEntry(parentPaymentIdKey, cfg.parentId) {
Kamon.runWithSpan(span, finishSpan = true) {
childPayments.foreach { case (childId, route) => spawnChildPaymentFsm(childId) ! createChildPayment(self, route, d.request) }
}
}
childPayments.foreach { case (childId, route) => spawnChildPaymentFsm(childId) ! createChildPayment(self, route, d.request) }
goto(PAYMENT_IN_PROGRESS) using d.copy(remainingAttempts = (d.remainingAttempts - 1).max(0), pending = d.pending ++ childPayments)
} else {
// If a child payment failed while we were waiting for routes, the routes we received don't cover the whole
Expand Down Expand Up @@ -242,7 +229,6 @@ class MultiPartPaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig,
case Left(paymentFailed) =>
log.warning("multi-part payment failed")
reply(origin, paymentFailed)
span.fail("payment failed")
case Right(paymentSent) =>
log.info("multi-part payment succeeded")
reply(origin, paymentSent)
Expand All @@ -254,7 +240,6 @@ class MultiPartPaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig,
if (retriedFailedChannels) {
Metrics.RetryFailedChannelsResult.withTag(Tags.Success, event.isRight).increment()
}
span.finish()
stop(FSM.Normal)
}

Expand All @@ -278,8 +263,6 @@ class MultiPartPaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig,

object MultiPartPaymentLifecycle {

val parentPaymentIdKey = Context.key[UUID]("parentPaymentId", UUID.fromString("00000000-0000-0000-0000-000000000000"))

def props(nodeParams: NodeParams, cfg: SendPaymentConfig, router: ActorRef, register: ActorRef) = Props(new MultiPartPaymentLifecycle(nodeParams, cfg, router, register))

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package fr.acinq.eclair.payment.send

import java.util.concurrent.TimeUnit

import akka.actor.{ActorRef, FSM, Props, Status}
import akka.event.Logging.MDC
import fr.acinq.bitcoin.ByteVector32
Expand All @@ -36,9 +34,8 @@ import fr.acinq.eclair.router.Router._
import fr.acinq.eclair.router._
import fr.acinq.eclair.wire.Onion._
import fr.acinq.eclair.wire._
import kamon.Kamon
import kamon.trace.Span

import java.util.concurrent.TimeUnit
import scala.util.{Failure, Success}

/**
Expand All @@ -52,28 +49,10 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
private val paymentsDb = nodeParams.db.payments
private val start = System.currentTimeMillis

private val span = Kamon.runWithContextEntry(MultiPartPaymentLifecycle.parentPaymentIdKey, cfg.parentId) {
val spanBuilder = if (Kamon.currentSpan().isEmpty) {
Kamon.spanBuilder("single-payment")
} else {
Kamon.spanBuilder("payment-part").asChildOf(Kamon.currentSpan())
}
spanBuilder
.tag(Tags.PaymentId, cfg.id.toString)
.tag(Tags.PaymentHash, paymentHash.toHex)
.tag(Tags.RecipientNodeId, cfg.recipientNodeId.toString())
.tag(Tags.RecipientAmount, cfg.recipientAmount.toLong)
.start()
}

startWith(WAITING_FOR_REQUEST, WaitingForRequest)

when(WAITING_FOR_REQUEST) {
case Event(c: SendPaymentToRoute, WaitingForRequest) =>
span.tag(Tags.TargetNodeId, c.targetNodeId.toString())
span.tag(Tags.Amount, c.finalPayload.amount.toLong)
span.tag(Tags.TotalAmount, c.finalPayload.totalAmount.toLong)
span.tag(Tags.Expiry, c.finalPayload.expiry.toLong)
log.debug("sending {} to route {}", c.finalPayload.amount, c.printRoute())
val send = SendPayment(c.replyTo, c.targetNodeId, c.finalPayload, maxAttempts = 1, assistedRoutes = c.assistedRoutes)
c.route.fold(
Expand All @@ -86,10 +65,6 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
goto(WAITING_FOR_ROUTE) using WaitingForRoute(send, Nil, Ignore.empty)

case Event(c: SendPayment, WaitingForRequest) =>
span.tag(Tags.TargetNodeId, c.targetNodeId.toString())
span.tag(Tags.Amount, c.finalPayload.amount.toLong)
span.tag(Tags.TotalAmount, c.finalPayload.totalAmount.toLong)
span.tag(Tags.Expiry, c.finalPayload.expiry.toLong)
log.debug("sending {} to {}", c.finalPayload.amount, c.targetNodeId)
router ! RouteRequest(nodeParams.nodeId, c.targetNodeId, c.finalPayload.amount, c.getMaxFee(nodeParams), c.assistedRoutes, routeParams = c.routeParams, paymentContext = Some(cfg.paymentContext))
if (cfg.storeInDb) {
Expand All @@ -109,7 +84,7 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
log.warning("router error: {}", t.getMessage)
Metrics.PaymentError.withTag(Tags.Failure, Tags.FailureType(LocalFailure(Nil, t))).increment()
onFailure(c.replyTo, PaymentFailed(id, paymentHash, failures :+ LocalFailure(Nil, t)))
myStop()
stop(FSM.Normal)
}

when(WAITING_FOR_PAYMENT_COMPLETE) {
Expand All @@ -122,7 +97,7 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
Metrics.PaymentAttempt.withTag(Tags.MultiPart, value = false).record(d.failures.size + 1)
val p = PartialPayment(id, d.c.finalPayload.amount, d.cmd.amount - d.c.finalPayload.amount, htlc.channelId, Some(cfg.fullRoute(d.route)))
onSuccess(d.c.replyTo, cfg.createPaymentSent(fulfill.paymentPreimage, p :: Nil))
myStop()
stop(FSM.Normal)

case Event(RES_ADD_SETTLED(_, _, fail: HtlcResult.Fail), d: WaitingForComplete) =>
fail match {
Expand All @@ -143,24 +118,6 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
}
}

private var stateSpan: Option[Span] = None

onTransition {
case _ -> state2 =>
// whenever there is a transition we stop the current span and start a new one, this way we can track each state
val stateSpanBuilder = Kamon.spanBuilder(state2.toString).asChildOf(span)
nextStateData match {
case d: WaitingForRoute =>
// this means that previous state was WAITING_FOR_COMPLETE
d.failures.lastOption.foreach(failure => stateSpan.foreach(span => KamonExt.failSpan(span, failure)))
case d: WaitingForComplete =>
stateSpanBuilder.tag("route", s"${cfg.fullRoute(d.route).map(_.nextNodeId).mkString("->")}")
case _ => ()
}
stateSpan.foreach(_.finish())
stateSpan = Some(stateSpanBuilder.start())
}

whenUnhandled {
case Event(_: TransportHandler.ReadAck, _) => stay // ignored, router replies with this when we forward a channel_update
}
Expand All @@ -187,7 +144,7 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
retry(failure, d)
} else {
onFailure(d.c.replyTo, PaymentFailed(id, paymentHash, d.failures :+ LocalFailure(cfg.fullRoute(d.route), t)))
myStop()
stop(FSM.Normal)
}
}

Expand All @@ -205,7 +162,7 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
// if destination node returns an error, we fail the payment immediately
log.warning(s"received an error message from target nodeId=$nodeId, failing the payment (failure=$failureMessage)")
onFailure(c.replyTo, PaymentFailed(id, paymentHash, failures :+ RemoteFailure(cfg.fullRoute(route), e)))
myStop()
stop(FSM.Normal)
case res if failures.size + 1 >= c.maxAttempts =>
// otherwise we never try more than maxAttempts, no matter the kind of error returned
val failure = res match {
Expand All @@ -222,7 +179,7 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
}
log.warning(s"too many failed attempts, failing the payment")
onFailure(c.replyTo, PaymentFailed(id, paymentHash, failures :+ failure))
myStop()
stop(FSM.Normal)
case Failure(t) =>
log.warning(s"cannot parse returned error: ${t.getMessage}, route=${route.printNodes()}")
val failure = UnreadableRemoteFailure(cfg.fullRoute(route))
Expand Down Expand Up @@ -300,12 +257,6 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
}
}

private def myStop(): State = {
stateSpan.foreach(_.finish())
span.finish()
stop(FSM.Normal)
}

private def onSuccess(replyTo: ActorRef, result: PaymentSent): Unit = {
if (cfg.storeInDb) paymentsDb.updateOutgoingPayment(result)
replyTo ! result
Expand All @@ -317,7 +268,6 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
}

private def onFailure(replyTo: ActorRef, result: PaymentFailed): Unit = {
span.fail("payment failed")
if (cfg.storeInDb) paymentsDb.updateOutgoingPayment(result)
replyTo ! result
if (cfg.publishEvent) context.system.eventStream.publish(result)
Expand Down
Loading