-
Notifications
You must be signed in to change notification settings - Fork 5.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Execute the program with multi threads #6223
Conversation
doc/design/refactor/multi_device.md
Outdated
the `MultiCPUExecutor` will call `block.clone()` and `scope.clone()` to make | ||
a list of blocks and scopes, the size equals the thread number, and then execute | ||
the graph in each thread. | ||
1. Collect the gradients and update the parameters |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In multi-threads, we need only one copy parameter in CPU memory. It's different from multi-GPUs, where each GPU will hold parameter in its own GPU memory.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it, there would be one parameters and it could be on the global scope, for anther, the gradients for each thread is different, so maybe we also need to create the gradients on multi scope.
doc/design/refactor/multi_device.md
Outdated
MULTIPLE_NODE = 3 | ||
}; | ||
|
||
class ExecutionPlan { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These definitions should be in #6078
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, delete this description.
doc/design/refactor/multi_device.md
Outdated
|
||
For the data parallelism, we need to pass the attribution `start` and `end` | ||
index for the mini-batch, and this will be calculated in the optimizer step. | ||
1. `multiCPUExecutor` execute the ExecutionPlan which the type is Multi CPU |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need a multiCPUExecutor
, just check trainer_count
in the current Executor
and do the copy if trainer_count > 1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same with @typhoonzero . I think that we do not need a MultiCPUExecutor
, since every thread can visit the same memory, I can not see any benefit to split thread execution into separated scope/block.
And how should it be when the math library has built-in openmp. Should we classified it as single thread executor
or MultiCPUExecutor
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As the comment: #6223 (comment), for my understand, each thread will calculate the gradient independently, and the graident has the same variable name, so maybe they should be create on different scope, or maybe my understanding has something wrong?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Yancey1989 you are right, scopes can have child scopes to resolve this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Definitely, at least, there should be different gradients variables(not only different name). Scope is just grouping them in somewhere -- for example, created different gradients variables with specific name prefix in global scope is a visible solution. Create a new Scope
or not isn't the key issue to this concept.
I believe that our Executor
should have the ability to run in multithread enviroment. We do not need the concept of MultiCPUExecutor
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And how should it be when the math library has built-in openmp
I think it's independent with the data parallelism, users could set the thread count of openmp or the threads of executor by themselves.
Create a new Scope or not isn't the key issue to this concept
Got it, and if we need the multiple variable with different prefix, The idea of @helinwang is nice, #6223 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe that our Executor should have the ability to run in multithread enviroment. We do not need the concept of MultiCPUExecutor.
It's a good idea, we can pass a parameter thread_count
to Executor
, and the Executor
will execute the ExecutionPlan with multi threads. For another, I think we also need to insert some sync Op while Split X
and Reduce dW
.
Thanks for the PR, graph is well done! Several thoughts (CC: @typhoonzero @dzhwinter @QiJune @reyoung ):
Other details: Given that ExecutionPlan should know about how many thread is running it, there are many places in this PR describing what different thread should do, we probably don't need them. Here is what I envision (only drew the forward pass to save space): In the above image, the program at left is converted to the program at right for data parallelism (2x parallelism). X0, X1, Y0, Y1 are all generated variable names, they are not named with X and Y because in one The converted program is still a single program, it does not know how many threads run them. The converted program can better utilize two threads, but we can use the Executor of any number of threads to run it. |
We need a way to sync threads, maybe using "Channels" is a good idea. Like when we need to merge updated weights, each thread can push a signal message to a channel, the |
Thanks @helinwang , one ProgramDesc is a good idea! And aggree with 1 and 2.
I'm confusion about this point, if the Executor does not care the
The
|
Sorry maybe I did not explain clearly. There will only be one executor implementation, and how many thread it runs can be configured. There will be one executor for each trainer, so executor should not care trainer_count. |
There will be only one executor, the executor can be configured to run multiple threads. each OP will be scheduled to run only after all OPs it depends on is finished. Yes, to make sure "all OPs it depends on is finished", we need proper synchronization. |
doc/design/refactor/multi_device.md
Outdated
@@ -0,0 +1,49 @@ | |||
# Design Doc: Execute the Program with Multi Thread | |||
|
|||
In PaddlePaddle, the user could declare **wich** operators will be |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wich -> which
doc/design/refactor/multi_device.md
Outdated
with parallel.do(thread_count=N): | ||
y_predict = fluid.fc(input=x, size=1) | ||
cost = fluid.layers.square_error_cost(input=y_predict, label=y) | ||
avg_cost = fluid.layers.mean(x=cost) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand you have put a lot of time thinking about the API, but as a user looking at the API I have several questions:
x
is split byfluid.split(x, N)
, how abouty_predict
,y
andavg_cost
, do the user need to split it?- If the
avg_cost
is split intoN
, can the user setthread_count
toM
in:with parallel.do(thread_count=M): sgd_optimizer.minimize(avg_cost)
- why should the user call
fluid.merge_grad_avg(x, N)
, what will happen if the user forget to call it, and how can the user debug this problem?
From the above questions, I think the Python API for parallel.Do
is very hard to design and hard to understand for the user. I briefly talked with @wangkuiyi last week, and we agreed that the Python API is lower priority comparing to the auto transpiler.
So maybe we should postpone the Python API design for parallel.Do
for now (maybe focus more on transpiler in this PR?), and focus on the automatic program transpiler. In this way the user don't even need to worry about the Python API for parallel.Do
. If there are the power user who want to use the Python API (I highly doubt it), we can add the Python API at that time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why should the user call fluid.merge_grad_avg(x, N), what will happen if the user forget to call it, and how can the user debug this problem?
Sorry, x
should be w
, this will merge the dW comes from all parallel threads(each thread will run a block on a scope).
And I think in data parallelism, use should specify(or average_gradients by default?) how to merge the gradients, this is the same with multi GPU.
So maybe we should postpone the Python API design for parallel.Do for now (maybe focus more on transpiler in this PR?),
Got it, and will do that, maybe we can discuss the Python API in #6394 .
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
doc/design/refactor/multi_device.md
Outdated
|
||
## Operator Kernel | ||
|
||
- We need a global threadpool, and initialize the threadpool when the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since the OP could create the Executor (e.g., while OP), the Executor creation needs to be very lightweight. So perhaps the thread pool initialization should relate with Executor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You mean initialize the ThreadPool in Parallel Op? If that I agree with you.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think ThreadPool will be a singleton (only one instance per process), so it could be initialized in the beginning when the application starts, or lazily when it gets used the first time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, lazily is great, :)
doc/design/refactor/multi_device.md
Outdated
|
||
- We need a global threadpool, and initialize the threadpool when the | ||
Executor run the Parallel Op for the first time. | ||
- The Op kernel will create an Executor instance, and send the blocks which number |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By "Op kernel" do you mean the parallel.Do
op? If so, I think it should just create N (N=thread_count
) executors, and run them (rather than create one executor and letting it know N). How the synchronization is done depends on the answer to this question: #6394 (comment)
Sorry I changed my mind about "there should only be a single instance of Executor", since we decided to separate Executor and thread pool in the last meeting. I think we can have as many executor as we want, and a single thread pool instance.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will follow #6394 (comment).
I think we can have as many executor as we want, and a single thread pool instance.
Agree with that.
Update: auto transpiler the user-defined graph to multi-thread graph. |
doc/design/refactor/multi_device.md
Outdated
Op graph to a multi-thread Op graph, and run `ParallelDo` Op to run the | ||
multi-thread graph. | ||
|
||
## Graph Converter |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should rename to a Transpiler
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
doc/design/refactor/multi_device.md
Outdated
|
||
- `Multi-Thread Transpiler` will convert the graph to a multi-threads graph | ||
which would be executed with multi-threads. | ||
- `BlockingCounter` will `Init/Decrement` a condition variable, and Blocking `Wait` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Name to ConditionVariable
should be fine. ConditionVariable
can store any type of "condition", not only counter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the ConditionVariable
is a great idea, how about adding this type in Variable in fluid? And it also needs some Op like condition_varaible.wait()/notify_all()/notify_one()
. How about adding a design doc for Condition Variable
firstly?
doc/design/refactor/multi_device.md
Outdated
- Use a list of block id as the input, and create multi Executor to run | ||
these Blocks with multi-threads. | ||
- Initialize a `BlockingCounter` instance and wait until all threads are done. | ||
- `Split` Operator will split the Input Tensor into N slices. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can split and merge reuse current operators like split_op and mean_op to merge?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we need to do something enhancement such as how to split a LodTensor, and Merge maybe including mean and other mathematical methods such as sum/max ..., but you're right, mean_op/split_op
would be reused.
doc/design/refactor/multi_device.md
Outdated
|
||
After converted: | ||
|
||
<img src="src/multi-threads/multi-threads@3x.png" width="1000"> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this figure also show the scopes these blocks use? And when how to ensure thread-safety when doing the merge?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the Merge
Op should waiting for all forward/backward Ops completed.
doc/design/refactor/multi_device.md
Outdated
- `BlockingCounter` will `Init/Decrement` a condition variable, and Blocking `Wait` | ||
for the condition variable become `0`: | ||
```cpp | ||
BlockingCounter bc(thread_count); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it same with WaitGroup
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, they are the same, WaitGroup is here
doc/design/refactor/multi_device.md
Outdated
|
||
<img src="src/multi-threads/multi-threads@3x.png" width="1000"> | ||
|
||
## Implement |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My question is same with today's meeting. How about we have two kinds of thread? say aggregating thread and computing thread? Then we can use the thread mutex and conditional variable without design them in our op level.
Another question if we keep the thread same as your post, then which thread should run the merge op(Block 0)? If the optimization step is very heavy, does the computing gradient thread should wait for it?
Third question, some LBFGS
optimize algorithm need more than one pass, how should we distribute the computing thread and optimizing thread?
LBFGS = > http://pytorch.org/docs/master/optim.html#optimizer-step-closure
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My question is same with today's meeting. How about we have two kinds of thread? say aggregating thread and computing thread? Then we can use the thread mutex and conditional variable without design them in our op level
I think aggregating thread and computing thread
is a great idea, but I'm not sure it will improve performance, because we need to know when the dW
from all threads were been calculated, this also need more context switch, and fewer threads will execute the computing, how about adding this optimize to TODO?
Another question if we keep the thread same as your post, then which thread should run the merge op(Block 0)? If the optimization step is very heavy, does the computing gradient thread should wait for it?
I think we can improve the performance by executing the optimizer with multi-threads, such as distribute multi parameters W0, W1, dW0, dW1 ...
to the different thread and execute the optimizer in the different thread. And I will add TODO named Execute the optimizer with multi-threads.
Third question, some LBFGS optimize algorithm need more than one pass, how should we distribute the computing thread and optimizing thread?
The same as above, maybe we can improve the performance for future, but maybe we could execute the optimizer with a single thread.
} | ||
bc.Wait(); | ||
``` | ||
- `ParallelDo` Operator |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In /~https://github.com/PaddlePaddle/Paddle/pull/6394/files#r157330955, design for the future that we should have an API for parallel synchronization. It seems that Go's preemptive is similar to CSP
, but deep learning communications are more suitable to BSP
which needs a Barrier
.
But, if we are considering developing a more "general programming language", like enabling users to define data streams, using CSP
is better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a good topic, I think the key point is that we need to make a choice from
- general programming language: describe a program with fluid API.
- model language: describe a model (data stream) with fluid API.
And we could continue the discussion at /~https://github.com/PaddlePaddle/Paddle/pull/6394/files#r157330955
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Multi-CPU design given is relevant for parallel.for
. We have some discussions about optimizer design. I believe giving a fast implementation will lead us to a better result.
Fixed #6209