diff --git a/cmd/lotus-shed/dealtracker.go b/cmd/lotus-shed/dealtracker.go index d39f51bd167..8ded6bf4acd 100644 --- a/cmd/lotus-shed/dealtracker.go +++ b/cmd/lotus-shed/dealtracker.go @@ -5,10 +5,10 @@ import ( "encoding/json" "net" "net/http" - "os" - "strings" + "sync" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/api" lcli "github.com/filecoin-project/lotus/cli" "github.com/ipfs/go-cid" @@ -19,58 +19,54 @@ type dealStatsServer struct { api api.FullNode } -var filteredClients map[address.Address]bool +// Requested by @jbenet +// How many epochs back to look at for dealstats +var epochLookback = abi.ChainEpoch(10) -func init() { - fc := []string{"t0112", "t0113", "t0114", "t010089"} - - filtered, set := os.LookupEnv("FILTERED_CLIENTS") - if set { - fc = strings.Split(filtered, ":") - } +// these lists grow continuously with the network +// TODO: need to switch this to an LRU of sorts, to ensure refreshes +var knownFiltered = new(sync.Map) +var resolvedWallets = new(sync.Map) - filteredClients = make(map[address.Address]bool) - for _, a := range fc { - addr, err := address.NewFromString(a) +func init() { + for _, a := range []string{ + "t0100", // client for genesis miner + "t0101", // client for genesis miner + "t0102", // client for genesis miner + "t0112", // client for genesis miner + "t0113", // client for genesis miner + "t0114", // client for genesis miner + "t1nslxql4pck5pq7hddlzym3orxlx35wkepzjkm3i", // SR1 dealbot wallet + "t1stghxhdp2w53dym2nz2jtbpk6ccd4l2lxgmezlq", // SR1 dealbot wallet + "t1mcr5xkgv4jdl3rnz77outn6xbmygb55vdejgbfi", // SR1 dealbot wallet + "t1qiqdbbmrdalbntnuapriirduvxu5ltsc5mhy7si", // SR1 dealbot wallet + } { + a, err := address.NewFromString(a) if err != nil { panic(err) } - filteredClients[addr] = true + knownFiltered.Store(a, true) } } type dealCountResp struct { - Total int64 `json:"total"` - Epoch int64 `json:"epoch"` + Epoch int64 `json:"epoch"` + Endpoint string `json:"endpoint"` + Payload int64 `json:"payload"` } func (dss *dealStatsServer) handleStorageDealCount(w http.ResponseWriter, r *http.Request) { - ctx := context.Background() - head, err := dss.api.ChainHead(ctx) - if err != nil { - log.Warnf("failed to get chain head: %s", err) + epoch, deals := dss.filteredDealList() + if epoch == 0 { w.WriteHeader(500) return } - deals, err := dss.api.StateMarketDeals(ctx, head.Key()) - if err != nil { - log.Warnf("failed to get market deals: %s", err) - w.WriteHeader(500) - return - } - - var count int64 - for _, d := range deals { - if !filteredClients[d.Proposal.Client] { - count++ - } - } - if err := json.NewEncoder(w).Encode(&dealCountResp{ - Total: count, - Epoch: int64(head.Height()), + Endpoint: "COUNT_DEALS", + Payload: int64(len(deals)), + Epoch: epoch, }); err != nil { log.Warnf("failed to write back deal count response: %s", err) return @@ -78,39 +74,28 @@ func (dss *dealStatsServer) handleStorageDealCount(w http.ResponseWriter, r *htt } type dealAverageResp struct { - AverageSize int64 `json:"average_size"` - Epoch int64 `json:"epoch"` + Epoch int64 `json:"epoch"` + Endpoint string `json:"endpoint"` + Payload int64 `json:"payload"` } func (dss *dealStatsServer) handleStorageDealAverageSize(w http.ResponseWriter, r *http.Request) { - ctx := context.Background() - head, err := dss.api.ChainHead(ctx) - if err != nil { - log.Warnf("failed to get chain head: %s", err) + epoch, deals := dss.filteredDealList() + if epoch == 0 { w.WriteHeader(500) return } - deals, err := dss.api.StateMarketDeals(ctx, head.Key()) - if err != nil { - log.Warnf("failed to get market deals: %s", err) - w.WriteHeader(500) - return - } - - var count int64 var totalBytes int64 for _, d := range deals { - if !filteredClients[d.Proposal.Client] { - count++ - totalBytes += int64(d.Proposal.PieceSize.Unpadded()) - } + totalBytes += int64(d.deal.Proposal.PieceSize.Unpadded()) } if err := json.NewEncoder(w).Encode(&dealAverageResp{ - AverageSize: totalBytes / count, - Epoch: int64(head.Height()), + Endpoint: "AVERAGE_DEAL_SIZE", + Payload: totalBytes / int64(len(deals)), + Epoch: epoch, }); err != nil { log.Warnf("failed to write back deal average response: %s", err) return @@ -118,37 +103,27 @@ func (dss *dealStatsServer) handleStorageDealAverageSize(w http.ResponseWriter, } type dealTotalResp struct { - TotalBytes int64 `json:"total_size"` - Epoch int64 `json:"epoch"` + Epoch int64 `json:"epoch"` + Endpoint string `json:"endpoint"` + Payload int64 `json:"payload"` } func (dss *dealStatsServer) handleStorageDealTotalReal(w http.ResponseWriter, r *http.Request) { - ctx := context.Background() - - head, err := dss.api.ChainHead(ctx) - if err != nil { - log.Warnf("failed to get chain head: %s", err) - w.WriteHeader(500) - return - } - - deals, err := dss.api.StateMarketDeals(ctx, head.Key()) - if err != nil { - log.Warnf("failed to get market deals: %s", err) + epoch, deals := dss.filteredDealList() + if epoch == 0 { w.WriteHeader(500) return } var totalBytes int64 for _, d := range deals { - if !filteredClients[d.Proposal.Client] { - totalBytes += int64(d.Proposal.PieceSize.Unpadded()) - } + totalBytes += int64(d.deal.Proposal.PieceSize.Unpadded()) } if err := json.NewEncoder(w).Encode(&dealTotalResp{ - TotalBytes: totalBytes, - Epoch: int64(head.Height()), + Endpoint: "DEAL_BYTES", + Payload: totalBytes, + Epoch: epoch, }); err != nil { log.Warnf("failed to write back deal average response: %s", err) return @@ -157,6 +132,12 @@ func (dss *dealStatsServer) handleStorageDealTotalReal(w http.ResponseWriter, r } type clientStatsOutput struct { + Epoch int64 `json:"epoch"` + Endpoint string `json:"endpoint"` + Payload []*clientStats `json:"payload"` +} + +type clientStats struct { Client address.Address `json:"client"` DataSize int64 `json:"data_size"` NumCids int `json:"num_cids"` @@ -168,51 +149,41 @@ type clientStatsOutput struct { } func (dss *dealStatsServer) handleStorageClientStats(w http.ResponseWriter, r *http.Request) { - ctx := context.Background() - - head, err := dss.api.ChainHead(ctx) - if err != nil { - log.Warnf("failed to get chain head: %s", err) - w.WriteHeader(500) - return - } - - deals, err := dss.api.StateMarketDeals(ctx, head.Key()) - if err != nil { - log.Warnf("failed to get market deals: %s", err) + epoch, deals := dss.filteredDealList() + if epoch == 0 { w.WriteHeader(500) return } - stats := make(map[address.Address]*clientStatsOutput) + stats := make(map[address.Address]*clientStats) for _, d := range deals { - if filteredClients[d.Proposal.Client] { - continue - } - st, ok := stats[d.Proposal.Client] + st, ok := stats[d.deal.Proposal.Client] if !ok { - st = &clientStatsOutput{ - Client: d.Proposal.Client, + st = &clientStats{ + Client: d.resolvedWallet, cids: make(map[cid.Cid]bool), providers: make(map[address.Address]bool), } - stats[d.Proposal.Client] = st + stats[d.deal.Proposal.Client] = st } - st.DataSize += int64(d.Proposal.PieceSize.Unpadded()) - st.cids[d.Proposal.PieceCID] = true - st.providers[d.Proposal.Provider] = true + st.DataSize += int64(d.deal.Proposal.PieceSize.Unpadded()) + st.cids[d.deal.Proposal.PieceCID] = true + st.providers[d.deal.Proposal.Provider] = true st.NumDeals++ } - out := make([]*clientStatsOutput, 0, len(stats)) - for _, cso := range stats { - cso.NumCids = len(cso.cids) - cso.NumMiners = len(cso.providers) - - out = append(out, cso) + out := clientStatsOutput{ + Epoch: epoch, + Endpoint: "CLIENT_DEAL_STATS", + Payload: make([]*clientStats, 0, len(stats)), + } + for _, cs := range stats { + cs.NumCids = len(cs.cids) + cs.NumMiners = len(cs.providers) + out.Payload = append(out.Payload, cs) } if err := json.NewEncoder(w).Encode(out); err != nil { @@ -221,6 +192,93 @@ func (dss *dealStatsServer) handleStorageClientStats(w http.ResponseWriter, r *h } } +type dealInfo struct { + deal api.MarketDeal + resolvedWallet address.Address +} + +// filteredDealList returns the current epoch and a list of filtered deals +// on error returns an epoch of 0 +func (dss *dealStatsServer) filteredDealList() (int64, map[string]dealInfo) { + ctx := context.Background() + + head, err := dss.api.ChainHead(ctx) + if err != nil { + log.Warnf("failed to get chain head: %s", err) + return 0, nil + } + + head, err = dss.api.ChainGetTipSetByHeight(ctx, head.Height()-epochLookback, head.Key()) + if err != nil { + log.Warnf("failed to walk back %s epochs: %s", epochLookback, err) + return 0, nil + } + + // Disabled as per @pooja's request + // + // // Exclude any address associated with a miner + // miners, err := dss.api.StateListMiners(ctx, head.Key()) + // if err != nil { + // log.Warnf("failed to get miner list: %s", err) + // return 0, nil + // } + // for _, m := range miners { + // info, err := dss.api.StateMinerInfo(ctx, m, head.Key()) + // if err != nil { + // log.Warnf("failed to get info for known miner '%s': %s", m, err) + // continue + // } + + // knownFiltered.Store(info.Owner, true) + // knownFiltered.Store(info.Worker, true) + // for _, a := range info.ControlAddresses { + // knownFiltered.Store(a, true) + // } + // } + + deals, err := dss.api.StateMarketDeals(ctx, head.Key()) + if err != nil { + log.Warnf("failed to get market deals: %s", err) + return 0, nil + } + + ret := make(map[string]dealInfo, len(deals)) + for dealKey, d := range deals { + + // Counting no-longer-active deals as per Pooja's request + // // /~https://github.com/filecoin-project/specs-actors/blob/v0.9.9/actors/builtin/market/deal.go#L81-L85 + // if d.State.SectorStartEpoch < 0 { + // continue + // } + + if _, isFiltered := knownFiltered.Load(d.Proposal.Client); isFiltered { + continue + } + + if _, wasSeen := resolvedWallets.Load(d.Proposal.Client); !wasSeen { + w, err := dss.api.StateAccountKey(ctx, d.Proposal.Client, head.Key()) + if err != nil { + log.Warnf("failed to resolve id '%s' to wallet address: %s", d.Proposal.Client, err) + continue + } else { + resolvedWallets.Store(d.Proposal.Client, w) + } + } + + w, _ := resolvedWallets.Load(d.Proposal.Client) + if _, isFiltered := knownFiltered.Load(w); isFiltered { + continue + } + + ret[dealKey] = dealInfo{ + deal: d, + resolvedWallet: w.(address.Address), + } + } + + return int64(head.Height()), ret +} + var serveDealStatsCmd = &cli.Command{ Name: "serve-deal-stats", Flags: []cli.Flag{}, @@ -260,6 +318,8 @@ var serveDealStatsCmd = &cli.Command{ panic(err) } + log.Warnf("deal-stat server listening on %s\n== NOTE: QUERIES ARE EXPENSIVE - YOU MUST FRONT-CACHE THIS SERVICE\n", list.Addr().String()) + return s.Serve(list) }, }