Compaction group concurrency #3807
Conversation
Thanks for this!
This looks quite good, but I wonder if the grouper is the right approach. It looks like we are taking the planner as a black box and trying to mitigate things, whereas the planner is actually the thing that should do most of the work (e.g. consider the bigger group first, not just the next one).
Also, I don't think this fixes #3806 (comment); that is orthogonal, right?
@@ -206,6 +208,11 @@ type Grouper interface {
	// Groups returns the compaction groups for all blocks currently known to the syncer.
	// It creates all groups from the scratch on every call.
	Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Group, err error)
	// Starting a group lets the grouper know that the blocks in the groups are currently
	// being compacted and to ignore them in future plans
	StartGroup(group *Group)
Why not have a Group.Lock method? 🤗
Will update. Thanks for taking a look at these changes. I agree most of the work should be done by the planner; the reason I didn't go that path is that the number of changes required would be substantial. Happy to go down that path if it's preferred.
After a second look, Group.Lock won't work because it's the grouper that needs to know about what's currently running.
Well, the grouper can still maintain links to those groups, since those are pointers.
It really depends on what is easier to use and understand for consumers of this interface. I feel like getting the groups and locking individual ones (literally using a Lock method) might be cleaner. We also get a simpler, easier interface, and we naturally allow the grouper to do what's needed.
StartGroup(group *Group) might be a bit confusing. It's easy to think that this method is starting compaction or something - but actually it's only locking.
WDYT?
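For illustration only, here is a rough sketch of the two API shapes being discussed. None of this is the PR's actual code; the field and method names are assumptions made for the sketch:

```go
package compact

import "sync"

// Option A (suggested above): the lock state lives on the Group itself.
// Sketch only; the real Group type has many more fields.
type Group struct {
	mtx        sync.Mutex
	compacting bool
}

// Lock marks the group's blocks as currently being compacted.
func (g *Group) Lock() {
	g.mtx.Lock()
	defer g.mtx.Unlock()
	g.compacting = true
}

// Unlock marks the group as available again once compaction finishes.
func (g *Group) Unlock() {
	g.mtx.Lock()
	defer g.mtx.Unlock()
	g.compacting = false
}

// Option B (this PR): the grouper tracks which groups are in flight.
type Grouper interface {
	// StartGroup tells the grouper that the group's blocks are being
	// compacted, so Groups() can skip them in future plans.
	StartGroup(group *Group)
	// CompleteGroup tells the grouper the group has finished compacting.
	CompleteGroup(group *Group)
}
```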
pkg/compact/compact.go (outdated diff)
}
for i, mm := range group.metasByMinTime {
	if mm.ULID == m.ULID {
		group.metasByMinTime = append(group.metasByMinTime[:i], group.metasByMinTime[i+1:]...)
I wonder if this works well.
- First of all, it does not detect places where the output block has to be used for the next compaction, which results in an unnecessary compaction without that output block, causing vertical compaction later on.
- This still does not detect the case mentioned in the issue, which is overlapping blocks not being grouped into a single compaction, no?
- Again, this feature wasn't meant to deduce what can be used for the next compactions. If that's desired, I'll have to put some thought into it.
- I'm not sure what you mean by that. This line of code removes metas from the parent group that have already been planned into a new group. The case mentioned in the issue is that you are behind and have multiple windows of overlapping blocks that can be compacted concurrently.
I will create a separate issue to handle predicting what blocks will be needed next and build on this work.
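As an illustration of the "remove already-planned metas from the parent group" step discussed above, here is a minimal helper sketch. The function name removePlanned is hypothetical; metadata.Meta and ulid.ULID are the types Thanos already uses:

```go
package compact

import (
	"github.com/oklog/ulid"
	"github.com/thanos-io/thanos/pkg/block/metadata"
)

// removePlanned returns the metas that were NOT already planned into a
// sub-group, so the parent group only keeps blocks that are still available
// for the next planning pass. Illustrative helper, not the exact PR code.
func removePlanned(metasByMinTime, planned []*metadata.Meta) []*metadata.Meta {
	plannedIDs := make(map[ulid.ULID]struct{}, len(planned))
	for _, m := range planned {
		plannedIDs[m.ULID] = struct{}{}
	}
	remaining := make([]*metadata.Meta, 0, len(metasByMinTime))
	for _, m := range metasByMinTime {
		if _, ok := plannedIDs[m.ULID]; !ok {
			remaining = append(remaining, m)
		}
	}
	return remaining
}
```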
@@ -904,7 +1004,9 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) {
	go func() {
		defer wg.Done()
		for g := range groupChan {
			c.grouper.StartGroup(g)
Why do we need that, BTW? Do we ever plan/group and compact at the same time? 🤔
The reason for this is that the compacting methods return shouldRerunGroup as true, and when I first tested this it seemed that after one group completed, the planner would see blocks that are currently being compacted and add groups for them. So StartGroup keeps track of what's currently being compacted, and those are skipped here: /~https://github.com/thanos-io/thanos/pull/3807/files#diff-be21b0381ac14ee139355d057b390aa69ff338fe1c748ae8a475ce46f3583575R311
After looking into this more, I think this change is still necessary. Say compaction iterations are set to happen every 30 minutes, but a large tenant has compaction runs that take more than 30 minutes. Yes, you could increase the compaction interval, but this change would prevent a new iteration from trying to compact blocks that are already being compacted.
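A minimal sketch of that bookkeeping, assuming the existing *Group type and its IDs() method from pkg/compact; the trackingGrouper name and its fields are hypothetical, not the PR's code:

```go
// trackingGrouper illustrates the idea: StartGroup records which block ULIDs
// are in flight so a later grouping pass can skip them, and CompleteGroup
// releases them again.
type trackingGrouper struct {
	mtx        sync.Mutex
	compacting map[ulid.ULID]struct{}
}

func (g *trackingGrouper) StartGroup(group *Group) {
	g.mtx.Lock()
	defer g.mtx.Unlock()
	for _, id := range group.IDs() {
		g.compacting[id] = struct{}{}
	}
}

func (g *trackingGrouper) CompleteGroup(group *Group) {
	g.mtx.Lock()
	defer g.mtx.Unlock()
	for _, id := range group.IDs() {
		delete(g.compacting, id)
	}
}

// isCompacting reports whether any block of the group is already in flight,
// so Groups() can drop such groups from the current iteration.
func (g *trackingGrouper) isCompacting(group *Group) bool {
	g.mtx.Lock()
	defer g.mtx.Unlock()
	for _, id := range group.IDs() {
		if _, ok := g.compacting[id]; ok {
			return true
		}
	}
	return false
}
```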
if groupConcurrencyArg == "--no-compact.enable-concurrent-group-compaction" {
	expectedOps = 478
} else {
	expectedOps = 480
This is the only metric affected; the new feature made a couple of extra get calls.
This looks really great. I think the really important parts are handled for compaction. Since you already ran this on your dev cluster, do you have any insight into the increased resource usage of the compactor component? Hypothetically speaking, if the usage is x2, we should recommend people adjust resources (and/or concurrency) accordingly. See this as extra 'helper' information.
Also a small, personally nitpicky question on the rand hash 👍
@wiardvanrij In our case we saw increased CPU and disk usage because we only have 1 large tenant and it could previously only use 1 thread at a time; enabling this feature allowed it to utilize multiple threads. If a user has multiple relatively small tenants, I don't think they will notice a difference in usage and might not even need this feature.
Thanks for this! This looks like something we definitely want, but I found some potential blockers. I think it's best if we reorganize planner/grouper/compactor into something that makes more sense. Essentially, our planner has to plan things accordingly. Unfortunately, we currently cannot assume the Planner interface can give you a properly planned compaction without a full overview of what is currently being compacted. Details in the comments. 🤗
Thanks for this work, I would love to collaborate on something that will work long term for both Cortex and Thanos.
if conf.enableConcurrentGroupCompaction {
	level.Info(logger).Log(
		"msg", "concurrent group compaction is enabled", "compact.enable-concurrent-group-compaction", fmt.Sprintf("%v", conf.enableConcurrentGroupCompaction),
No need for the parameter in the log line - let's avoid duplicating information.
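For example, something along these lines would carry the same information without repeating the flag value (a sketch of the suggested simplification, not the PR's final code):

```go
if conf.enableConcurrentGroupCompaction {
	// The flag is already known to be true inside this branch, so the
	// message alone is enough.
	level.Info(logger).Log("msg", "concurrent group compaction is enabled")
}
```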
}

func (g *DefaultGrouper) CompleteGroup(group *Group) {
	if !g.enableConcurrentGroupCompaction {
I don't think those are needed; if you have it disabled, we can still lock, no?
You probably want to make it more efficient, but I think a consistent flow might be superior here. WDYT?
// concurrentGroups returns the compaction groups for all blocks currently known to the syncer. This differs from
// the default grouping in that a single group will be split into a group per available plan.
func (g *DefaultGrouper) concurrentGroups(blocks map[ulid.ULID]*metadata.Meta) (res []*Group, err error) {
	groups := map[string]*Group{}
Again, I would reuse this flow for the normal grouper too. We don't care about an extra lock when it's not needed, as it's a marginal inefficiency compared to e.g. download time. The problem now is the non-reused code that is "almost" the same. Let's reduce complexity if we can.
	return res, nil
}

// Recursively plan a groups metas until there are no available plans and return a new group for each plan.
Suggested change:
- // Recursively plan a groups metas until there are no available plans and return a new group for each plan.
+ // splitGroupByPlans iteratively plans a group's metas until there are no available plans, and returns a new group for each plan.
// Recursively plan a groups metas until there are no available plans and return a new group for each plan.
// This allows a single group key to utilize group concurrency when there are multiple plans available to compact.
func (g *DefaultGrouper) splitGroupByPlans(group *Group) ([]*Group, error) {
Suggested change:
- func (g *DefaultGrouper) splitGroupByPlans(group *Group) ([]*Group, error) {
+ func (g *DefaultGrouper) splitGroupByPlans(group *Group) (groups []*Group, err error) {
This allows us to avoid the var and err declarations below.
if len(group.metasByMinTime) != 0 {
	toCompact, err = g.planner.Plan(context.Background(), group.metasByMinTime)
}
if err != nil {
Let's check the error close to the invocation.
if err != nil {
	return nil, errors.Wrap(err, "split group")
}
if len(groups) > 0 && len(toCompact) == 0 {
What if there is nothing to compact? Do we never get out of this loop? 🤔
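For reference, one possible shape of the split loop with an explicit exit condition when the planner returns nothing to compact. This is a sketch, not the PR's code: newSubGroup is a hypothetical helper, removePlanned is the helper sketched earlier, and the assumed imports are "context" and "github.com/pkg/errors".

```go
// splitGroupByPlans repeatedly asks the planner for a plan over the metas
// that are still unassigned and turns each plan into its own sub-group.
// The loop exits as soon as the planner returns an empty plan (or no metas
// remain), so it cannot spin forever when there is nothing left to compact.
func (g *DefaultGrouper) splitGroupByPlans(ctx context.Context, group *Group) ([]*Group, error) {
	var groups []*Group
	remaining := group.metasByMinTime
	for len(remaining) > 0 {
		toCompact, err := g.planner.Plan(ctx, remaining)
		if err != nil {
			return nil, errors.Wrap(err, "split group")
		}
		if len(toCompact) == 0 {
			// Nothing more can be planned for this group: stop iterating.
			break
		}
		sub, err := g.newSubGroup(group, toCompact) // hypothetical helper
		if err != nil {
			return nil, errors.Wrap(err, "new sub group")
		}
		groups = append(groups, sub)
		remaining = removePlanned(remaining, toCompact)
	}
	return groups, nil
}
```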
		newMetasByMinTime = append(newMetasByMinTime, m)
	}
}
group.metasByMinTime = newMetasByMinTime
Ok, this is something I am the most worried about.
I think this algorithm might be quite ineffective in practice. The reason is that compactions usually depend on each other. I would need to double check, but in my experience the default compaction planner is pretty naive.
For example, if it sees, let's say:
[2d][8h][8h][2h][2h][2h][2h][2h]
the first plan would be: [2d][8h][8h] (let me compact those: [2h][2h][2h][2h]) [2h].
If you take those out, the compactor will assume a gap and will concurrently compact [2d][8h][8h] <2d gap> [2h] into a 2d block: [2d] (let me compact those: [8h][8h] <2d gap>) [2h]. This is because the compaction algorithm was designed for a single group of Prometheus blocks, compacted one by one.
The consequences for Cortex are lower: you just recompact multiple times unnecessarily. For Thanos deployments that do not have vertical compaction enabled, it will simply fail the compactor (overlapping blocks).
That's why I believe we have to look at a smarter planner that gives us planned compactions we can attack concurrently, and that can give us feedback at any time (even while other compactions are running) on what to run.
Feel free to change the majority of the interface to accomplish this well for Cortex. We are happy in Thanos to default to the concurrent solution (with simply worker number = 1). So I wonder if the current grouper/planner/compactor split fits here. Feel free to rearrange those 3 APIs from ground zero. It's better than fiddling with an existing API that was not well prepared for concurrency at this level.
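Purely as a hypothetical illustration of what such a concurrency-aware planner could look like (nothing like this exists in Thanos; the interface name, signature, and semantics are assumptions):

```go
package compact

import (
	"context"

	"github.com/oklog/ulid"
	"github.com/thanos-io/thanos/pkg/block/metadata"
)

// ConcurrentPlanner sketches a planner that has the full picture: it is told
// which blocks are already being compacted and returns every plan that can
// safely run concurrently without producing overlaps.
type ConcurrentPlanner interface {
	// PlanConcurrent returns zero or more independent compaction plans for
	// metasByMinTime, planning around the blocks whose ULIDs are in inFlight
	// (and the output blocks those compactions will eventually produce).
	PlanConcurrent(
		ctx context.Context,
		metasByMinTime []*metadata.Meta,
		inFlight map[ulid.ULID]struct{},
	) (plans [][]*metadata.Meta, err error)
}
```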
Is this still relevant? If so, what is blocking it? Is there anything you can do to help move it forward? This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs.
Friendly ping @andrejbranch
Changes
Verification
We have a dev Cortex cluster using block storage that was 60 days behind, an ideal situation for testing this code. I was able to catch up 40 times faster, as I ran this with group concurrency set to 40. Everything looks good so far.