Execute the program with multi threads #6223

Merged
merged 9 commits into from
Dec 20, 2017

Yancey1989
Contributor

Fixed #6209

the `MultiCPUExecutor` will call `block.clone()` and `scope.clone()` to make
a list of blocks and scopes, the size equals the thread number, and then execute
the graph in each thread.
1. Collect the gradients and update the parameters
Member

@QiJune QiJune Dec 4, 2017

With multiple threads, we need only one copy of each parameter in CPU memory. This differs from the multi-GPU case, where each GPU holds the parameters in its own GPU memory.

Contributor Author

Got it: there would be a single copy of the parameters, and it could live in the global scope. On the other hand, each thread computes different gradients, so maybe we also need to create the gradients in multiple scopes.

MULTIPLE_NODE = 3
};

class ExecutionPlan {
Contributor

These definitions should be in #6078

Contributor Author

Sure, I will delete this description.


For the data parallelism, we need to pass the attribution `start` and `end`
index for the mini-batch, and this will be calculated in the optimizer step.
1. `multiCPUExecutor` execute the ExecutionPlan which the type is Multi CPU
Contributor

We don't need a multiCPUExecutor; just check trainer_count in the current Executor and do the copy if trainer_count > 1.

Contributor

Same as @typhoonzero. I don't think we need a MultiCPUExecutor: since every thread can access the same memory, I cannot see any benefit in splitting thread execution into separate scopes/blocks.
And what should happen when the math library has built-in OpenMP? Should we classify that as a single-thread executor or a MultiCPUExecutor?

Contributor Author

As in the comment #6223 (comment): as I understand it, each thread calculates the gradient independently, and the gradients share the same variable name, so maybe they should be created in different scopes. Or is my understanding wrong?

Contributor

@Yancey1989 you are right, scopes can have child scopes to resolve this.

Contributor

Definitely. At the very least, there should be different gradient variables (not only different names). A scope just groups them somewhere; for example, creating different gradient variables with a specific name prefix in the global scope is a viable solution. Whether or not to create a new Scope isn't the key issue for this concept.

I believe that our Executor should be able to run in a multi-threaded environment. We do not need the concept of a MultiCPUExecutor.
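
A minimal sketch of the arrangement discussed in this thread, assuming a hypothetical `Scope` class with parent lookup (this is not the actual Paddle `Scope` API, and the `W@GRAD` name is only illustrative). The parameter is stored once in the global scope, while each thread gets a child scope that holds its own gradient variable under the same name:

```python
class Scope:
    """Hypothetical variable scope that falls back to its parent on lookup."""

    def __init__(self, parent=None):
        self.parent = parent
        self.vars = {}

    def new_child(self):
        return Scope(parent=self)

    def set_var(self, name, value):
        self.vars[name] = value

    def find_var(self, name):
        # Search this scope first, then walk up the chain of parents.
        scope = self
        while scope is not None:
            if name in scope.vars:
                return scope.vars[name]
            scope = scope.parent
        raise KeyError(name)


global_scope = Scope()
global_scope.set_var("W", [0.5, -1.0])  # single shared copy of the parameter

# One child scope per thread; each holds its own gradient under the same name.
thread_scopes = [global_scope.new_child() for _ in range(2)]
for i, scope in enumerate(thread_scopes):
    scope.set_var("W@GRAD", [0.1 * (i + 1), 0.2 * (i + 1)])
```

The alternative mentioned above, distinct prefixed names such as `W@GRAD_0` and `W@GRAD_1` in the global scope, would avoid child scopes entirely; either way the gradients must be distinct variables.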

Contributor Author

@dzhwinter

> And how should it be when the math library has built-in openmp

I think that is independent of data parallelism; users could set the OpenMP thread count or the Executor thread count themselves.

> Create a new Scope or not isn't the key issue to this concept

Got it. If we need multiple variables with different prefixes, @helinwang's idea is nice: #6223 (comment)

Contributor Author

@dzhwinter

> I believe that our Executor should have the ability to run in multithread environment. We do not need the concept of MultiCPUExecutor.

That's a good idea: we can pass a thread_count parameter to the Executor, and the Executor will execute the ExecutionPlan with multiple threads. In addition, I think we also need to insert some sync Ops when splitting X and reducing dW.

@helinwang
Contributor

helinwang commented Dec 4, 2017

Thanks for the PR, graph is well done!

Several thoughts (CC: @typhoonzero @dzhwinter @QiJune @reyoung ):

  1. ExecutionPlan should never care about how many threads are running it. The number of threads is a property of the Executor. The Executor should be able to distribute the computation inside an ExecutionPlan to different threads automatically.
  2. We should only have one Executor implementation (e.g., no multiCPUExecutor), which can run any ExecutionPlan.
  3. The Executor should not know anything about different roles (e.g., whether it is running as a trainer or a pserver), thus the Executor should not know about trainer_count.

Other details:

Given that ExecutionPlan should not know how many threads are running it, the many places in this PR that describe what different threads should do are probably unnecessary.

Here is what I envision (only drew the forward pass to save space):

[Figure: screenshot, 2017-12-04 1:17:23 PM]

In the above image, the program on the left is converted to the program on the right for data parallelism (2x parallelism). X0, X1, Y0, and Y1 are all generated variable names; they cannot simply be named X and Y because a single ProgramDesc cannot contain multiple variables with the same name.

The converted program is still a single program, and it does not know how many threads will run it. The converted program can make better use of two threads, but an Executor with any number of threads can run it.
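
The conversion described above can be sketched roughly as follows. This is only an illustration under stated assumptions: a program is modeled as a plain list of `(op_type, inputs, outputs)` tuples, and `transpile_data_parallel` is a hypothetical helper, not part of Paddle. Variables derived from the split input get a generated per-slice name, while parameters such as `W` stay shared:

```python
def transpile_data_parallel(ops, split_vars, n):
    """Return a single program that processes n slices of each split variable."""
    # A variable is per-slice if it is split or computed from a per-slice variable.
    per_slice = set(split_vars)
    for _, inputs, outputs in ops:
        if any(v in per_slice for v in inputs):
            per_slice.update(outputs)

    def rename(name, i):
        return f"{name}{i}" if name in per_slice else name

    # First split each input, then emit one renamed copy of every op per slice.
    new_ops = [("split", [v], [rename(v, i) for i in range(n)]) for v in split_vars]
    for i in range(n):
        for op_type, inputs, outputs in ops:
            new_ops.append(
                (op_type, [rename(v, i) for v in inputs], [rename(v, i) for v in outputs])
            )
    return new_ops


program = [("fc", ["X", "W"], ["Y"])]
converted = transpile_data_parallel(program, split_vars=["X"], n=2)
```

The result is still one flat program; nothing in it records how many threads will execute it.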

@typhoonzero
Contributor

We need a way to sync threads; maybe using "Channels" is a good idea. For example, when we need to merge updated weights, each thread can push a signal message to a channel, and the Executor waits to receive n signals for each mini-batch.
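
As a rough illustration of this signalling idea, the sketch below uses Python's standard `queue.Queue` as a stand-in for a channel. The `worker` and `run_mini_batch` names are hypothetical and the gradient computation is elided:

```python
import queue
import threading


def worker(thread_id, done_channel):
    # ... compute this thread's share of the mini-batch here ...
    done_channel.put(thread_id)  # signal completion on the channel


def run_mini_batch(n_threads):
    done_channel = queue.Queue()
    threads = [
        threading.Thread(target=worker, args=(i, done_channel))
        for i in range(n_threads)
    ]
    for t in threads:
        t.start()
    # The coordinating side blocks until it has received n signals.
    finished = [done_channel.get() for _ in range(n_threads)]
    for t in threads:
        t.join()
    return sorted(finished)


signals = run_mini_batch(4)
```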

@Yancey1989
Contributor Author

Yancey1989 commented Dec 5, 2017

Thanks @helinwang, one ProgramDesc is a good idea! And I agree with points 1 and 2.
For point 3:

> The Executor should not know anything about different roles (e.g., whether it is running as a trainer or a pserver), thus the Executor should not know about trainer_count.

I'm confused about this point. If the Executor does not care about trainer_count, do you mean that the Executor just executes the graph, and another MultiThreadExecutor (or some other name) creates multiple threads and calls the Executor?

@typhoonzero

> We need a way to sync threads, maybe using "Channels" is a good idea. Like when we need to merge updated weights, each thread can push a signal message to a channel, the Executor is trying to get n signal for each mini-batch.

Channels are nice. How about implementing thread synchronization with operators, so that we can create the condition variable there? I think the benefits are:

  1. We can describe the thread synchronization in the graph.
  2. The Executor does not need to care about when to do the sync; that is defined by the graph.

@helinwang
Contributor

helinwang commented Dec 5, 2017

@Yancey1989

> I'm confused about this point. If the Executor does not care about trainer_count, do you mean that the Executor just executes the graph, and another MultiThreadExecutor (or some other name) creates multiple threads and calls the Executor?

Sorry, maybe I did not explain clearly. There will only be one Executor implementation, and the number of threads it runs can be configured. There will be one Executor for each trainer, so the Executor should not care about trainer_count.

@helinwang
Contributor

helinwang commented Dec 5, 2017

@typhoonzero

> We need a way to sync threads, maybe using "Channels" is a good idea. Like when we need to merge updated weights, each thread can push a signal message to a channel, the Executor is trying to get n signal for each mini-batch.

There will be only one Executor, and it can be configured to run multiple threads. Each Op will be scheduled to run only after all Ops it depends on have finished. Yes, to make sure that all Ops it depends on have finished, we need proper synchronization.
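
A minimal sketch of such dependency-driven scheduling, assuming Ops are modeled as plain Python callables and dependencies as a name-to-prerequisites mapping. `run_graph` is a hypothetical helper, not the Paddle Executor; it uses the standard `concurrent.futures` pool and submits an Op only once all of its prerequisites have completed:

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def run_graph(ops, deps, num_threads):
    """Run callables in `ops` on a pool, honoring `deps` (name -> prerequisite names)."""
    pending = {name: set(deps.get(name, ())) for name in ops}
    running = {}   # future -> op name
    finished = []  # op names in completion order
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        while pending or running:
            # Submit every op whose prerequisites have all finished.
            for name in [n for n, d in pending.items() if not d]:
                del pending[name]
                running[pool.submit(ops[name])] = name
            if not running:
                raise ValueError("unsatisfiable dependencies")
            done, _ = wait(list(running), return_when=FIRST_COMPLETED)
            for future in done:
                name = running.pop(future)
                future.result()  # re-raise any exception from the op
                finished.append(name)
                for d in pending.values():
                    d.discard(name)
    return finished


executed = []
ops = {
    name: (lambda n=name: executed.append(n))
    for name in ("split", "fc0", "fc1", "merge")
}
deps = {"fc0": ["split"], "fc1": ["split"], "merge": ["fc0", "fc1"]}
order = run_graph(ops, deps, num_threads=2)
```

The same graph runs correctly with `num_threads=1`; the thread count only affects how much of the independent work overlaps.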

@Yancey1989 Yancey1989 changed the title Add multi cpu design Execute the program with multi threads Dec 7, 2017
@@ -0,0 +1,49 @@
# Design Doc: Execute the Program with Multi Thread

In PaddlePaddle, the user could declare **wich** operators will be
Contributor

wich -> which

with parallel.do(thread_count=N):
y_predict = fluid.fc(input=x, size=1)
cost = fluid.layers.square_error_cost(input=y_predict, label=y)
avg_cost = fluid.layers.mean(x=cost)
Contributor

@helinwang helinwang Dec 11, 2017

I understand you have put a lot of time into thinking about the API, but as a user looking at the API I have several questions:

  • x is split by fluid.split(x, N); how about y_predict, y and avg_cost? Does the user need to split them too?
  • If avg_cost is split into N, can the user set thread_count to M in:
    with parallel.do(thread_count=M):
      sgd_optimizer.minimize(avg_cost)
    
  • Why should the user call fluid.merge_grad_avg(x, N)? What will happen if the user forgets to call it, and how can the user debug this problem?

From the above questions, I think the Python API for parallel.Do is very hard to design and hard for the user to understand. I briefly talked with @wangkuiyi last week, and we agreed that the Python API has lower priority than the auto transpiler.

So maybe we should postpone the Python API design for parallel.Do for now (maybe focus more on the transpiler in this PR?) and focus on the automatic program transpiler. That way the user doesn't even need to worry about the Python API for parallel.Do. If there are power users who want to use the Python API (I highly doubt it), we can add the Python API at that time.

Contributor Author

@Yancey1989 Yancey1989 Dec 12, 2017

> why should the user call fluid.merge_grad_avg(x, N), what will happen if the user forget to call it, and how can the user debug this problem?

Sorry, x should be w; this merges the dW that comes from all parallel threads (each thread runs a block in a scope).
And I think that in data parallelism the user should specify how to merge the gradients (or average the gradients by default?); this is the same as with multiple GPUs.

> So maybe we should postpone the Python API design for parallel.Do for now (maybe focus more on transpiler in this PR?),

Got it, will do. Maybe we can discuss the Python API in #6394.

Contributor

Thanks!


## Operator Kernel

- We need a global threadpool, and initialize the threadpool when the
Contributor

Since the OP could create the Executor (e.g., while OP), the Executor creation needs to be very lightweight. So perhaps the thread pool initialization should relate with Executor.

Contributor Author

@Yancey1989 Yancey1989 Dec 12, 2017

Do you mean initializing the ThreadPool in the Parallel Op? If so, I agree with you.

Contributor

I think ThreadPool will be a singleton (only one instance per process), so it could be initialized in the beginning when the application starts, or lazily when it gets used the first time.

Contributor Author

Sure, lazy initialization sounds great :)


- We need a global threadpool, and initialize the threadpool when the
Executor run the Parallel Op for the first time.
- The Op kernel will create an Executor instance, and send the blocks which number
Contributor

@helinwang helinwang Dec 11, 2017

By "Op kernel" do you mean the parallel.Do op? If so, I think it should just create N (N=thread_count) executors and run them (rather than create one executor and let it know N). How the synchronization is done depends on the answer to this question: #6394 (comment)

Sorry, I changed my mind about "there should only be a single instance of Executor", since we decided to separate the Executor and the thread pool in the last meeting. I think we can have as many executors as we want, and a single thread pool instance.

Contributor Author

I will follow #6394 (comment).

> I think we can have as many executor as we want, and a single thread pool instance.

I agree with that.
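
A minimal sketch of "many executors, one thread pool", under the assumption that a block is modeled as a Python callable and a scope as a dict. The `Executor` class and `parallel_do` function here are hypothetical illustrations, not the Paddle implementations:

```python
from concurrent.futures import ThreadPoolExecutor


class Executor:
    """Hypothetical lightweight executor: it owns no threads of its own."""

    def __init__(self, pool):
        self.pool = pool

    def run_async(self, block, scope):
        # Hand the block to the shared pool and return a future.
        return self.pool.submit(block, scope)


def parallel_do(blocks, scopes, pool):
    # One cheap Executor per block; all of them share the same pool.
    futures = [Executor(pool).run_async(b, s) for b, s in zip(blocks, scopes)]
    return [f.result() for f in futures]  # wait for every block to finish


def double_block(scope):
    scope["Y"] = [2 * x for x in scope["X"]]
    return scope["Y"]


shared_pool = ThreadPoolExecutor(max_workers=2)
scopes = [{"X": [1, 2]}, {"X": [3, 4]}]
results = parallel_do([double_block, double_block], scopes, shared_pool)
shared_pool.shutdown()
```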

@Yancey1989
Contributor Author

Update: automatically transpile the user-defined graph to a multi-thread graph.

Op graph to a multi-thread Op graph, and run `ParallelDo` Op to run the
multi-thread graph.

## Graph Converter
Contributor

This should be renamed to Transpiler.

Contributor Author

Done.


- `Multi-Thread Transpiler` will convert the graph to a multi-threads graph
which would be executed with multi-threads.
- `BlockingCounter` will `Init/Decrement` a condition variable, and Blocking `Wait`
Contributor

Naming it ConditionVariable should be fine. A ConditionVariable can store any type of "condition", not only a counter.

Contributor Author

@Yancey1989 Yancey1989 Dec 18, 2017

I think ConditionVariable is a great idea. How about adding this type to Variable in fluid? It also needs some Ops like condition_variable.wait()/notify_all()/notify_one(). How about adding a design doc for ConditionVariable first?
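
To make the relationship between the two names concrete, here is a rough sketch of a counter built on top of a condition variable, using Python's standard `threading.Condition`. The class and method names only mirror the `Init/Decrement/Wait` description in the design doc and are otherwise hypothetical:

```python
import threading


class BlockingCounter:
    """Counter that lets one thread block until `count` decrements have happened."""

    def __init__(self, count):
        self._count = count
        self._cond = threading.Condition()

    def decrement_count(self):
        with self._cond:
            if self._count <= 0:
                raise ValueError("counter decremented below zero")
            self._count -= 1
            if self._count == 0:
                self._cond.notify_all()  # wake every waiter

    def wait(self):
        with self._cond:
            # wait_for re-checks the predicate, so spurious wakeups are harmless.
            self._cond.wait_for(lambda: self._count == 0)


thread_count = 4
bc = BlockingCounter(thread_count)
results = []


def work(i):
    results.append(i * i)  # stand-in for running one block
    bc.decrement_count()


threads = [threading.Thread(target=work, args=(i,)) for i in range(thread_count)]
for t in threads:
    t.start()
bc.wait()  # returns only after all four workers have decremented
for t in threads:
    t.join()
```

A general ConditionVariable type would expose the `wait`/`notify_all` pair directly and leave the predicate (here, "count is zero") to the caller.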

- Use a list of block id as the input, and create multi Executor to run
these Blocks with multi-threads.
- Initialize a `BlockingCounter` instance and wait until all threads are done.
- `Split` Operator will split the Input Tensor into N slices.
Contributor

Can split and merge reuse existing operators, such as split_op for splitting and mean_op for merging?

Contributor Author

@Yancey1989 Yancey1989 Dec 18, 2017

Maybe we need some enhancements, such as how to split a LoDTensor, and Merge may include mean as well as other mathematical methods such as sum/max ..., but you're right, mean_op/split_op could be reused.
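
A minimal sketch of the split and merge steps on plain Python lists, to show where the choice of merge method comes in. The `split` and `merge` functions are hypothetical illustrations and do not address LoDTensor splitting, which needs sequence-aware boundaries:

```python
def split(batch, n):
    """Split a list of rows into n contiguous slices of nearly equal size."""
    base, extra = divmod(len(batch), n)
    slices, start = [], 0
    for i in range(n):
        end = start + base + (1 if i < extra else 0)
        slices.append(batch[start:end])
        start = end
    return slices


MERGE_FNS = {
    "sum": sum,
    "max": max,
    "mean": lambda xs: sum(xs) / len(xs),
}


def merge(grads, method="mean"):
    """Combine per-thread gradients element-wise with the chosen method."""
    fn = MERGE_FNS[method]
    return [fn(column) for column in zip(*grads)]


slices = split([1, 2, 3, 4, 5], 2)
per_thread_grads = [[1.0, 2.0], [3.0, 6.0]]
merged_mean = merge(per_thread_grads, "mean")
merged_sum = merge(per_thread_grads, "sum")
merged_max = merge(per_thread_grads, "max")
```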


After converted:

<img src="src/multi-threads/multi-threads@3x.png" width="1000">
Contributor

Can this figure also show the scopes these blocks use? And how do we ensure thread safety when doing the merge?

Contributor Author

I think the Merge Op should wait until all forward/backward Ops have completed.

- `BlockingCounter` will `Init/Decrement` a condition variable, and Blocking `Wait`
for the condition variable become `0`:
```cpp
BlockingCounter bc(thread_count);
Contributor

Is it the same as WaitGroup?

Contributor Author

Yes, they are the same, WaitGroup is here


<img src="src/multi-threads/multi-threads@3x.png" width="1000">

## Implement
Contributor

My question is the same as in today's meeting. How about having two kinds of threads, say an aggregating thread and computing threads? Then we can use a thread mutex and condition variable without designing them at the Op level.

Another question: if we keep the threads as in your post, which thread should run the merge Op (Block 0)? If the optimization step is very heavy, should the gradient-computing threads wait for it?

Third question: some optimization algorithms, such as LBFGS, need more than one pass; how should we distribute the computing threads and the optimizing thread?
LBFGS => http://pytorch.org/docs/master/optim.html#optimizer-step-closure

Contributor Author

@Yancey1989 Yancey1989 Dec 18, 2017

> My question is same with today's meeting. How about we have two kinds of thread? say aggregating thread and computing thread? Then we can use the thread mutex and conditional variable without design them in our op level

I think having an aggregating thread and computing threads is a great idea, but I'm not sure it will improve performance: we need to know when the dW from all threads has been calculated, which needs more context switches, and fewer threads will execute the computation. How about adding this optimization to the TODO list?

> Another question if we keep the thread same as your post, then which thread should run the merge op(Block 0)? If the optimization step is very heavy, does the computing gradient thread should wait for it?

I think we can improve performance by executing the optimizer with multiple threads, for example by distributing the parameters W0, W1, dW0, dW1 ... to different threads and executing the optimizer in those threads. I will add a TODO named "Execute the optimizer with multi-threads".

> Third question, some LBFGS optimize algorithm need more than one pass, how should we distribute the computing thread and optimizing thread?

Same as above: maybe we can improve the performance in the future, but for now we could execute the optimizer with a single thread.

}
bc.Wait();
```
- `ParallelDo` Operator
Contributor

As discussed in https://github.com/PaddlePaddle/Paddle/pull/6394/files#r157330955, we should design for a future in which we have an API for parallel synchronization. It seems that Go's preemptive model is similar to CSP, but deep learning communication is better suited to BSP, which needs a barrier.

But if we are considering developing a more "general programming language", for example enabling users to define data streams, using CSP is better.

Contributor Author

It's a good topic. I think the key point is that we need to choose between:

  1. general programming language: describe a program with the fluid API.
  2. model language: describe a model (data stream) with the fluid API.

And we could continue the discussion at https://github.com/PaddlePaddle/Paddle/pull/6394/files#r157330955
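
For readers unfamiliar with the BSP style mentioned above, the sketch below shows one way a barrier separates "compute gradients" from "apply the merged update" in each superstep. It uses Python's standard `threading.Barrier`; the `bsp_train` function, the constant stand-in gradients, and the 0.1 learning rate are all assumptions made only for illustration:

```python
import threading


def bsp_train(n_threads, n_steps):
    barrier = threading.Barrier(n_threads)
    partial = [0.0] * n_threads  # one gradient slot per thread
    weights = [0.0]
    history = []

    def worker(rank):
        for _ in range(n_steps):
            partial[rank] = float(rank + 1)  # stand-in for a computed gradient
            barrier.wait()                   # all gradients are written
            if rank == 0:
                # Exactly one thread applies the averaged update.
                weights[0] -= 0.1 * sum(partial) / n_threads
                history.append(weights[0])
            barrier.wait()                   # update is visible before the next step

    threads = [threading.Thread(target=worker, args=(r,)) for r in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return history


history = bsp_train(n_threads=2, n_steps=3)
```

In a CSP-style design the same coordination would instead be expressed by passing messages over channels, which is more flexible but leaves the barrier implicit.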

Contributor

@dzhwinter dzhwinter left a comment

The Multi-CPU design given is relevant for parallel.for. We have some discussions about optimizer design. I believe giving a fast implementation will lead us to a better result.

@Yancey1989 Yancey1989 merged commit 2d5ec16 into PaddlePaddle:develop Dec 20, 2017
@Yancey1989 Yancey1989 deleted the multi_cpu_design branch December 20, 2017 05:37
5 participants