From 8955b8d8a7ae3110acd82f072e9d739be7f73ac1 Mon Sep 17 00:00:00 2001 From: Peter Rabbitson Date: Sat, 26 Sep 2020 21:16:28 +0200 Subject: [PATCH 1/6] Centralize filtering, output wallet addresses --- cmd/lotus-shed/dealtracker.go | 186 ++++++++++++++++++---------------- 1 file changed, 97 insertions(+), 89 deletions(-) diff --git a/cmd/lotus-shed/dealtracker.go b/cmd/lotus-shed/dealtracker.go index d39f51bd167..a219230090a 100644 --- a/cmd/lotus-shed/dealtracker.go +++ b/cmd/lotus-shed/dealtracker.go @@ -5,8 +5,7 @@ import ( "encoding/json" "net" "net/http" - "os" - "strings" + "sync" "github.com/filecoin-project/go-address" "github.com/filecoin-project/lotus/api" @@ -19,23 +18,27 @@ type dealStatsServer struct { api api.FullNode } -var filteredClients map[address.Address]bool +// these lists grow continuously with the network +// TODO: need to switch this to an LRU of sorts, to ensure refreshes +var knownFiltered = new(sync.Map) +var resolvedWallets = new(sync.Map) func init() { - fc := []string{"t0112", "t0113", "t0114", "t010089"} - - filtered, set := os.LookupEnv("FILTERED_CLIENTS") - if set { - fc = strings.Split(filtered, ":") - } - - filteredClients = make(map[address.Address]bool) - for _, a := range fc { - addr, err := address.NewFromString(a) + for _, a := range []string{ + "t0100", // client for genesis miner + "t0112", // client for genesis miner + "t0113", // client for genesis miner + "t0114", // client for genesis miner + "t1nslxql4pck5pq7hddlzym3orxlx35wkepzjkm3i", // SR1 dealbot wallet + "t1stghxhdp2w53dym2nz2jtbpk6ccd4l2lxgmezlq", // SR1 dealbot wallet + "t1mcr5xkgv4jdl3rnz77outn6xbmygb55vdejgbfi", // SR1 dealbot wallet + "t1qiqdbbmrdalbntnuapriirduvxu5ltsc5mhy7si", // SR1 dealbot wallet + } { + a, err := address.NewFromString(a) if err != nil { panic(err) } - filteredClients[addr] = true + knownFiltered.Store(a, true) } } @@ -45,32 +48,16 @@ type dealCountResp struct { } func (dss *dealStatsServer) handleStorageDealCount(w http.ResponseWriter, r *http.Request) { - ctx := context.Background() - - head, err := dss.api.ChainHead(ctx) - if err != nil { - log.Warnf("failed to get chain head: %s", err) - w.WriteHeader(500) - return - } - deals, err := dss.api.StateMarketDeals(ctx, head.Key()) - if err != nil { - log.Warnf("failed to get market deals: %s", err) + epoch, deals := dss.filteredDealList() + if epoch == 0 { w.WriteHeader(500) return } - var count int64 - for _, d := range deals { - if !filteredClients[d.Proposal.Client] { - count++ - } - } - if err := json.NewEncoder(w).Encode(&dealCountResp{ - Total: count, - Epoch: int64(head.Height()), + Total: int64(len(deals)), + Epoch: epoch, }); err != nil { log.Warnf("failed to write back deal count response: %s", err) return @@ -83,34 +70,21 @@ type dealAverageResp struct { } func (dss *dealStatsServer) handleStorageDealAverageSize(w http.ResponseWriter, r *http.Request) { - ctx := context.Background() - - head, err := dss.api.ChainHead(ctx) - if err != nil { - log.Warnf("failed to get chain head: %s", err) - w.WriteHeader(500) - return - } - deals, err := dss.api.StateMarketDeals(ctx, head.Key()) - if err != nil { - log.Warnf("failed to get market deals: %s", err) + epoch, deals := dss.filteredDealList() + if epoch == 0 { w.WriteHeader(500) return } - var count int64 var totalBytes int64 for _, d := range deals { - if !filteredClients[d.Proposal.Client] { - count++ - totalBytes += int64(d.Proposal.PieceSize.Unpadded()) - } + totalBytes += int64(d.deal.Proposal.PieceSize.Unpadded()) } if err := json.NewEncoder(w).Encode(&dealAverageResp{ - AverageSize: totalBytes / count, - Epoch: int64(head.Height()), + AverageSize: totalBytes / int64(len(deals)), + Epoch: epoch, }); err != nil { log.Warnf("failed to write back deal average response: %s", err) return @@ -123,32 +97,20 @@ type dealTotalResp struct { } func (dss *dealStatsServer) handleStorageDealTotalReal(w http.ResponseWriter, r *http.Request) { - ctx := context.Background() - - head, err := dss.api.ChainHead(ctx) - if err != nil { - log.Warnf("failed to get chain head: %s", err) - w.WriteHeader(500) - return - } - - deals, err := dss.api.StateMarketDeals(ctx, head.Key()) - if err != nil { - log.Warnf("failed to get market deals: %s", err) + epoch, deals := dss.filteredDealList() + if epoch == 0 { w.WriteHeader(500) return } var totalBytes int64 for _, d := range deals { - if !filteredClients[d.Proposal.Client] { - totalBytes += int64(d.Proposal.PieceSize.Unpadded()) - } + totalBytes += int64(d.deal.Proposal.PieceSize.Unpadded()) } if err := json.NewEncoder(w).Encode(&dealTotalResp{ TotalBytes: totalBytes, - Epoch: int64(head.Height()), + Epoch: epoch, }); err != nil { log.Warnf("failed to write back deal average response: %s", err) return @@ -168,18 +130,8 @@ type clientStatsOutput struct { } func (dss *dealStatsServer) handleStorageClientStats(w http.ResponseWriter, r *http.Request) { - ctx := context.Background() - - head, err := dss.api.ChainHead(ctx) - if err != nil { - log.Warnf("failed to get chain head: %s", err) - w.WriteHeader(500) - return - } - - deals, err := dss.api.StateMarketDeals(ctx, head.Key()) - if err != nil { - log.Warnf("failed to get market deals: %s", err) + epoch, deals := dss.filteredDealList() + if epoch == 0 { w.WriteHeader(500) return } @@ -187,23 +139,20 @@ func (dss *dealStatsServer) handleStorageClientStats(w http.ResponseWriter, r *h stats := make(map[address.Address]*clientStatsOutput) for _, d := range deals { - if filteredClients[d.Proposal.Client] { - continue - } - st, ok := stats[d.Proposal.Client] + st, ok := stats[d.deal.Proposal.Client] if !ok { st = &clientStatsOutput{ - Client: d.Proposal.Client, + Client: d.resolvedWallet, cids: make(map[cid.Cid]bool), providers: make(map[address.Address]bool), } - stats[d.Proposal.Client] = st + stats[d.deal.Proposal.Client] = st } - st.DataSize += int64(d.Proposal.PieceSize.Unpadded()) - st.cids[d.Proposal.PieceCID] = true - st.providers[d.Proposal.Provider] = true + st.DataSize += int64(d.deal.Proposal.PieceSize.Unpadded()) + st.cids[d.deal.Proposal.PieceCID] = true + st.providers[d.deal.Proposal.Provider] = true st.NumDeals++ } @@ -221,6 +170,65 @@ func (dss *dealStatsServer) handleStorageClientStats(w http.ResponseWriter, r *h } } +type dealInfo struct { + deal api.MarketDeal + resolvedWallet address.Address +} + +// filteredDealList returns the current epoch and a list of filtered deals +// on error returns an epoch of 0 +func (dss *dealStatsServer) filteredDealList() (int64, map[string]dealInfo) { + ctx := context.Background() + + head, err := dss.api.ChainHead(ctx) + if err != nil { + log.Warnf("failed to get chain head: %s", err) + return 0, nil + } + + deals, err := dss.api.StateMarketDeals(ctx, head.Key()) + if err != nil { + log.Warnf("failed to get market deals: %s", err) + return 0, nil + } + + ret := make(map[string]dealInfo, len(deals)) + for dealKey, d := range deals { + + // Counting no-longer-active deals as per Pooja's request + // // /~https://github.com/filecoin-project/specs-actors/blob/v0.9.9/actors/builtin/market/deal.go#L81-L85 + // if d.State.SectorStartEpoch < 0 { + // continue + // } + + if _, isFiltered := knownFiltered.Load(d.Proposal.Client); isFiltered { + continue + } + + if _, wasSeen := resolvedWallets.Load(d.Proposal.Client); !wasSeen { + w, err := dss.api.StateAccountKey(ctx, d.Proposal.Client, head.Key()) + if err != nil { + log.Warnf("failed to resolve id '%s' to wallet address: %s", d.Proposal.Client, err) + continue + } else { + resolvedWallets.Store(d.Proposal.Client, w) + } + } + + w, _ := resolvedWallets.Load(d.Proposal.Client) + if _, isFiltered := knownFiltered.Load(w); isFiltered { + continue + } + + ret[dealKey] = dealInfo{ + deal: d, + resolvedWallet: w.(address.Address), + } + } + + return int64(head.Height()), ret +} + var serveDealStatsCmd = &cli.Command{ Name: "serve-deal-stats", Flags: []cli.Flag{}, From 10cdbadd82158cc36959877733ad008188b6aa14 Mon Sep 17 00:00:00 2001 From: Peter Rabbitson Date: Sat, 26 Sep 2020 21:29:11 +0200 Subject: [PATCH 2/6] Arrange json as the frontend expects it --- cmd/lotus-shed/dealtracker.go | 55 ++++++++++++++++++++++------------- 1 file changed, 35 insertions(+), 20 deletions(-) diff --git a/cmd/lotus-shed/dealtracker.go b/cmd/lotus-shed/dealtracker.go index a219230090a..1340dc9c6fa 100644 --- a/cmd/lotus-shed/dealtracker.go +++ b/cmd/lotus-shed/dealtracker.go @@ -43,8 +43,9 @@ func init() { } type dealCountResp struct { - Total int64 `json:"total"` - Epoch int64 `json:"epoch"` + Epoch int64 `json:"epoch"` + Endpoint string `json:"endpoint"` + Payload int64 `json:"payload"` } func (dss *dealStatsServer) handleStorageDealCount(w http.ResponseWriter, r *http.Request) { @@ -56,8 +57,9 @@ func (dss *dealStatsServer) handleStorageDealCount(w http.ResponseWriter, r *htt } if err := json.NewEncoder(w).Encode(&dealCountResp{ - Total: int64(len(deals)), - Epoch: epoch, + Endpoint: "COUNT_DEALS", + Payload: int64(len(deals)), + Epoch: epoch, }); err != nil { log.Warnf("failed to write back deal count response: %s", err) return @@ -65,8 +67,9 @@ func (dss *dealStatsServer) handleStorageDealCount(w http.ResponseWriter, r *htt } type dealAverageResp struct { - AverageSize int64 `json:"average_size"` - Epoch int64 `json:"epoch"` + Epoch int64 `json:"epoch"` + Endpoint string `json:"endpoint"` + Payload int64 `json:"payload"` } func (dss *dealStatsServer) handleStorageDealAverageSize(w http.ResponseWriter, r *http.Request) { @@ -83,8 +86,9 @@ func (dss *dealStatsServer) handleStorageDealAverageSize(w http.ResponseWriter, } if err := json.NewEncoder(w).Encode(&dealAverageResp{ - AverageSize: totalBytes / int64(len(deals)), - Epoch: epoch, + Endpoint: "AVERAGE_DEAL_SIZE", + Payload: totalBytes / int64(len(deals)), + Epoch: epoch, }); err != nil { log.Warnf("failed to write back deal average response: %s", err) return @@ -92,8 +96,9 @@ func (dss *dealStatsServer) handleStorageDealAverageSize(w http.ResponseWriter, } type dealTotalResp struct { - TotalBytes int64 `json:"total_size"` - Epoch int64 `json:"epoch"` + Epoch int64 `json:"epoch"` + Endpoint string `json:"endpoint"` + Payload int64 `json:"payload"` } func (dss *dealStatsServer) handleStorageDealTotalReal(w http.ResponseWriter, r *http.Request) { @@ -109,8 +114,9 @@ func (dss *dealStatsServer) handleStorageDealTotalReal(w http.ResponseWriter, r } if err := json.NewEncoder(w).Encode(&dealTotalResp{ - TotalBytes: totalBytes, - Epoch: epoch, + Endpoint: "DEAL_BYTES", + Payload: totalBytes, + Epoch: epoch, }); err != nil { log.Warnf("failed to write back deal average response: %s", err) return @@ -119,6 +125,12 @@ func (dss *dealStatsServer) handleStorageDealTotalReal(w http.ResponseWriter, r } type clientStatsOutput struct { + Epoch int64 `json:"epoch"` + Endpoint string `json:"endpoint"` + Payload []*clientStats `json:"payload"` +} + +type clientStats struct { Client address.Address `json:"client"` DataSize int64 `json:"data_size"` NumCids int `json:"num_cids"` @@ -136,13 +148,13 @@ func (dss *dealStatsServer) handleStorageClientStats(w http.ResponseWriter, r *h return } - stats := make(map[address.Address]*clientStatsOutput) + stats := make(map[address.Address]*clientStats) for _, d := range deals { st, ok := stats[d.deal.Proposal.Client] if !ok { - st = &clientStatsOutput{ + st = &clientStats{ Client: d.resolvedWallet, cids: make(map[cid.Cid]bool), providers: make(map[address.Address]bool), @@ -156,12 +168,15 @@ func (dss *dealStatsServer) handleStorageClientStats(w http.ResponseWriter, r *h st.NumDeals++ } - out := make([]*clientStatsOutput, 0, len(stats)) - for _, cso := range stats { - cso.NumCids = len(cso.cids) - cso.NumMiners = len(cso.providers) - - out = append(out, cso) + out := clientStatsOutput{ + Epoch: epoch, + Endpoint: "CLIENT_DEAL_STATS", + Payload: make([]*clientStats, 0, len(stats)), + } + for _, cs := range stats { + cs.NumCids = len(cs.cids) + cs.NumMiners = len(cs.providers) + out.Payload = append(out.Payload, cs) } if err := json.NewEncoder(w).Encode(out); err != nil { From 1483f1e59adcbd91ab90ca01a3b8591e8f37c1fc Mon Sep 17 00:00:00 2001 From: Peter Rabbitson Date: Sat, 26 Sep 2020 21:43:49 +0200 Subject: [PATCH 3/6] Add filtering of addresses associated with miners --- cmd/lotus-shed/dealtracker.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/cmd/lotus-shed/dealtracker.go b/cmd/lotus-shed/dealtracker.go index 1340dc9c6fa..719e821a503 100644 --- a/cmd/lotus-shed/dealtracker.go +++ b/cmd/lotus-shed/dealtracker.go @@ -201,6 +201,26 @@ func (dss *dealStatsServer) filteredDealList() (int64, map[string]dealInfo) { return 0, nil } + // Exclude any address associated with a miner + miners, err := dss.api.StateListMiners(ctx, head.Key()) + if err != nil { + log.Warnf("failed to get miner list: %s", err) + return 0, nil + } + for _, m := range miners { + info, err := dss.api.StateMinerInfo(ctx, m, head.Key()) + if err != nil { + log.Warnf("failed to get info for known miner '%s': %s", m, err) + continue + } + + knownFiltered.Store(info.Owner, true) + knownFiltered.Store(info.Worker, true) + for _, a := range info.ControlAddresses { + knownFiltered.Store(a, true) + } + } + deals, err := dss.api.StateMarketDeals(ctx, head.Key()) if err != nil { log.Warnf("failed to get market deals: %s", err) From fb3bcc4ce527961fa172898a180d1fbb865fc133 Mon Sep 17 00:00:00 2001 From: Peter Rabbitson Date: Sat, 26 Sep 2020 21:49:05 +0200 Subject: [PATCH 4/6] Add startup warning --- cmd/lotus-shed/dealtracker.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cmd/lotus-shed/dealtracker.go b/cmd/lotus-shed/dealtracker.go index 719e821a503..8a9d0d6f3fb 100644 --- a/cmd/lotus-shed/dealtracker.go +++ b/cmd/lotus-shed/dealtracker.go @@ -303,6 +303,8 @@ var serveDealStatsCmd = &cli.Command{ panic(err) } + log.Warnf("deal-stat server listening on %s\n== NOTE: QUERIES ARE EXPENSIVE - YOU MUST FRONT-CACHE THIS SERVICE\n", list.Addr().String()) + return s.Serve(list) }, } From e4c1f090af73cc0e9f997b3de5e9a9443b493ad5 Mon Sep 17 00:00:00 2001 From: Peter Rabbitson Date: Sun, 27 Sep 2020 20:44:50 +0200 Subject: [PATCH 5/6] Disable exclusion of miner-associated addresses --- cmd/lotus-shed/dealtracker.go | 42 +++++++++++++++++++---------------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/cmd/lotus-shed/dealtracker.go b/cmd/lotus-shed/dealtracker.go index 8a9d0d6f3fb..083db8ecbaa 100644 --- a/cmd/lotus-shed/dealtracker.go +++ b/cmd/lotus-shed/dealtracker.go @@ -26,6 +26,8 @@ var resolvedWallets = new(sync.Map) func init() { for _, a := range []string{ "t0100", // client for genesis miner + "t0101", // client for genesis miner + "t0102", // client for genesis miner "t0112", // client for genesis miner "t0113", // client for genesis miner "t0114", // client for genesis miner @@ -201,25 +203,27 @@ func (dss *dealStatsServer) filteredDealList() (int64, map[string]dealInfo) { return 0, nil } - // Exclude any address associated with a miner - miners, err := dss.api.StateListMiners(ctx, head.Key()) - if err != nil { - log.Warnf("failed to get miner list: %s", err) - return 0, nil - } - for _, m := range miners { - info, err := dss.api.StateMinerInfo(ctx, m, head.Key()) - if err != nil { - log.Warnf("failed to get info for known miner '%s': %s", m, err) - continue - } - - knownFiltered.Store(info.Owner, true) - knownFiltered.Store(info.Worker, true) - for _, a := range info.ControlAddresses { - knownFiltered.Store(a, true) - } - } + // Disabled as per @pooja's request + // + // // Exclude any address associated with a miner + // miners, err := dss.api.StateListMiners(ctx, head.Key()) + // if err != nil { + // log.Warnf("failed to get miner list: %s", err) + // return 0, nil + // } + // for _, m := range miners { + // info, err := dss.api.StateMinerInfo(ctx, m, head.Key()) + // if err != nil { + // log.Warnf("failed to get info for known miner '%s': %s", m, err) + // continue + // } + + // knownFiltered.Store(info.Owner, true) + // knownFiltered.Store(info.Worker, true) + // for _, a := range info.ControlAddresses { + // knownFiltered.Store(a, true) + // } + // } deals, err := dss.api.StateMarketDeals(ctx, head.Key()) if err != nil { From be5dc2c57fb8dfde934ddaa63b22d25dd5ca0356 Mon Sep 17 00:00:00 2001 From: Peter Rabbitson Date: Sun, 27 Sep 2020 20:45:45 +0200 Subject: [PATCH 6/6] Walk back 10 epochs for stat generation --- cmd/lotus-shed/dealtracker.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/cmd/lotus-shed/dealtracker.go b/cmd/lotus-shed/dealtracker.go index 083db8ecbaa..8ded6bf4acd 100644 --- a/cmd/lotus-shed/dealtracker.go +++ b/cmd/lotus-shed/dealtracker.go @@ -8,6 +8,7 @@ import ( "sync" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/api" lcli "github.com/filecoin-project/lotus/cli" "github.com/ipfs/go-cid" @@ -18,6 +19,10 @@ type dealStatsServer struct { api api.FullNode } +// Requested by @jbenet +// How many epochs back to look at for dealstats +var epochLookback = abi.ChainEpoch(10) + // these lists grow continuously with the network // TODO: need to switch this to an LRU of sorts, to ensure refreshes var knownFiltered = new(sync.Map) @@ -203,6 +208,12 @@ func (dss *dealStatsServer) filteredDealList() (int64, map[string]dealInfo) { return 0, nil } + head, err = dss.api.ChainGetTipSetByHeight(ctx, head.Height()-epochLookback, head.Key()) + if err != nil { + log.Warnf("failed to walk back %s epochs: %s", epochLookback, err) + return 0, nil + } + // Disabled as per @pooja's request // // // Exclude any address associated with a miner