Skip to content

Commit

Permalink
Merge pull request #21 from salesforce/error-handling
Browse files Browse the repository at this point in the history
  • Loading branch information
twarit-waikar authored Mar 25, 2022
2 parents 7442dff + b3e1692 commit f8c8b2c
Show file tree
Hide file tree
Showing 18 changed files with 310 additions and 160 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ This tool uses C++11 and thus it should work with much older GCC versions. We ha
./generate_cache.sh Debug
```
Replace `Debug` with `Release` for a speed-optimized binary. Debug will run marginally slower (considering the tool is mostly bottlenecked by network I/O) but will contain debug symbols and allows a better debugging experience while working with a debugger.
Replace `Debug` with `Release` or `RelWithDebInfo` or `MinSizeRel` for a differently optimized binary. Debug will run marginally slower (considering the tool is mostly bottlenecked by network I/O) but will contain debug symbols and allows a better debugging experience while working with a debugger.
By default tracing is disabled in p4-fusion. It can be enabled by including `p` in the second argument while generating the CMake cache. If tracing is enabled, p4-fusion generates trace JSON files in the cloning directory. These files can be opened in the `about:tracing` window in Chromium web browsers to view the tracing data.
Expand Down
63 changes: 42 additions & 21 deletions p4-fusion/commands/change_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@

#include "thread_pool.h"

ChangeList::ChangeList(const std::string& clNumber, const std::string& clDescription, const std::string& userID, const int64_t& clTimestamp)
: number(clNumber)
, user(userID)
, description(clDescription)
, timestamp(clTimestamp)
{
}

void ChangeList::PrepareDownload()
{
ChangeList& cl = *this;
Expand All @@ -25,7 +33,7 @@ void ChangeList::PrepareDownload()
std::unique_lock<std::mutex> lock((*(cl.canDownloadMutex)));
*cl.canDownload = true;
}
cl.canDownloadCV->notify_one();
cl.canDownloadCV->notify_all();
});
}

Expand Down Expand Up @@ -71,32 +79,41 @@ void ChangeList::StartDownload(const std::string& depotPath, const int& printBat
cl.commitCV->notify_all();
}

// Empty the batches
if (printBatchFiles->size() > printBatch || i == cl.changedFiles.size() - 1)
// Clear the batches if it fits
if (printBatchFiles->size() == printBatch)
{
if (printBatchFiles->empty())
{
continue;
}

ThreadPool::GetSingleton()->AddJob([&cl, printBatchFiles, printBatchFileData](P4API* p4)
{
const PrintResult& printData = p4->PrintFiles(*printBatchFiles);

for (int i = 0; i < printBatchFiles->size(); i++)
{
printBatchFileData->at(i)->contents = std::move(printData.GetPrintData().at(i).contents);
}

(*cl.filesDownloaded) += printBatchFiles->size();

cl.commitCV->notify_all();
});
cl.Flush(printBatchFiles, printBatchFileData);

// We let go of the refs held by us and create new ones to queue the next batch
printBatchFiles = std::make_shared<std::vector<std::string>>();
printBatchFileData = std::make_shared<std::vector<FileData*>>();
// Now only the thread job has access to the older batch
}
}

// Flush any remaining files that were smaller in number than the total batch size
if (!printBatchFiles->empty())
{
cl.Flush(printBatchFiles, printBatchFileData);
}
});
}

void ChangeList::Flush(std::shared_ptr<std::vector<std::string>> printBatchFiles, std::shared_ptr<std::vector<FileData*>> printBatchFileData)
{
// Share ownership of this batch with the thread job
ThreadPool::GetSingleton()->AddJob([this, printBatchFiles, printBatchFileData](P4API* p4)
{
const PrintResult& printData = p4->PrintFiles(*printBatchFiles);

for (int i = 0; i < printBatchFiles->size(); i++)
{
printBatchFileData->at(i)->contents = std::move(printData.GetPrintData().at(i).contents);
}

(*filesDownloaded) += printBatchFiles->size();

commitCV->notify_all();
});
}

Expand All @@ -113,7 +130,11 @@ void ChangeList::Clear()
user.clear();
description.clear();
changedFiles.clear();

filesDownloaded.reset();
canDownload.reset();
canDownloadMutex.reset();
canDownloadCV.reset();
commitMutex.reset();
commitCV.reset();
}
24 changes: 17 additions & 7 deletions p4-fusion/commands/change_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,27 @@ struct ChangeList
std::string number;
std::string user;
std::string description;
long long timestamp;
int64_t timestamp = 0;
std::vector<FileData> changedFiles;
std::shared_ptr<std::atomic<int>> filesDownloaded;
std::shared_ptr<std::atomic<bool>> canDownload;
std::shared_ptr<std::mutex> canDownloadMutex;
std::shared_ptr<std::condition_variable> canDownloadCV;
std::shared_ptr<std::mutex> commitMutex;
std::shared_ptr<std::condition_variable> commitCV;

std::shared_ptr<std::atomic<int>> filesDownloaded = std::make_shared<std::atomic<int>>(-1);
std::shared_ptr<std::atomic<bool>> canDownload = std::make_shared<std::atomic<bool>>(false);
std::shared_ptr<std::mutex> canDownloadMutex = std::make_shared<std::mutex>();
std::shared_ptr<std::condition_variable> canDownloadCV = std::make_shared<std::condition_variable>();
std::shared_ptr<std::mutex> commitMutex = std::make_shared<std::mutex>();
std::shared_ptr<std::condition_variable> commitCV = std::make_shared<std::condition_variable>();

ChangeList(const std::string& number, const std::string& description, const std::string& user, const int64_t& timestamp);

ChangeList(const ChangeList& other) = default;
ChangeList& operator=(const ChangeList&) = delete;
ChangeList(ChangeList&&) = default;
ChangeList& operator=(ChangeList&&) = default;
~ChangeList() = default;

void PrepareDownload();
void StartDownload(const std::string& depotPath, const int& printBatch, const bool includeBinaries);
void Flush(std::shared_ptr<std::vector<std::string>> printBatchFiles, std::shared_ptr<std::vector<FileData*>> printBatchFileData);
void WaitForDownload();
void Clear();
};
18 changes: 5 additions & 13 deletions p4-fusion/commands/changes_result.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,9 @@

void ChangesResult::OutputStat(StrDict* varList)
{
m_Changes.push_back({});
ChangeList& data = m_Changes.back();

data.number = varList->GetVar("change")->Text();
data.description = varList->GetVar("desc")->Text();
data.user = varList->GetVar("user")->Text();
data.timestamp = varList->GetVar("time")->Atoi64();
data.filesDownloaded = std::make_shared<std::atomic<int>>(-1);
data.canDownload = std::make_shared<std::atomic<bool>>(false);
data.canDownloadMutex = std::make_shared<std::mutex>();
data.canDownloadCV = std::make_shared<std::condition_variable>();
data.commitMutex = std::make_shared<std::mutex>();
data.commitCV = std::make_shared<std::condition_variable>();
m_Changes.emplace_back(
varList->GetVar("change")->Text(),
varList->GetVar("desc")->Text(),
varList->GetVar("user")->Text(),
varList->GetVar("time")->Atoi64());
}
18 changes: 7 additions & 11 deletions p4-fusion/commands/info_result.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,15 @@
* For full license text, see the LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
*/
#include "info_result.h"
#include "utils/time_helpers.h"

InfoResult::InfoResult()
: m_TimezoneMinutes(0)
{
}

void InfoResult::OutputStat(StrDict* varList)
{
std::string serverDate = varList->GetVar("serverDate")->Text();

// ... serverDate 2021/09/06 04:49:28 -0700 PDT
// ^ ^
// 0 20
std::string timezone = serverDate.substr(20, 5);

int hours = std::stoi(timezone.substr(1, 2));
int minutes = std::stoi(timezone.substr(3, 2));
int sign = timezone[0] == '-' ? -1 : +1;

m_TimezoneMinutes = sign * (hours * 60 + minutes);
m_TimezoneMinutes = Time::GetTimezoneMinutes(serverDate);
}
2 changes: 2 additions & 0 deletions p4-fusion/commands/info_result.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ class InfoResult : public Result
int m_TimezoneMinutes;

public:
InfoResult();

int GetServerTimezoneMinutes() const { return m_TimezoneMinutes; };

void OutputStat(StrDict* varList) override;
Expand Down
13 changes: 11 additions & 2 deletions p4-fusion/commands/users_result.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,19 @@

void UsersResult::OutputStat(StrDict* varList)
{
UserID userID = varList->GetVar("User")->Text();
StrPtr* userIDPtr = varList->GetVar("User");
StrPtr* emailPtr = varList->GetVar("Email");

if (!userIDPtr || !emailPtr)
{
ERR("UserID or email not found for a Perforce user");
return;
}

UserID userID = userIDPtr->Text();
UserData userData;

userData.email = varList->GetVar("Email")->Text();
userData.email = emailPtr->Text();

StrPtr* fullNamePtr = varList->GetVar("FullName");
if (fullNamePtr)
Expand Down
35 changes: 18 additions & 17 deletions p4-fusion/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -181,39 +181,40 @@ int Main(int argc, char** argv)

PRINT("Creating " << networkThreads << " network threads");
ThreadPool::GetSingleton()->Initialize(networkThreads);
SUCCESS("Created " << ThreadPool::GetSingleton()->GetThreadCount() << " threads in thread pool");

int startupDownloadsCount = 0;
long long lastDownloadCL = -1;

// Go in the chronological order
for (size_t i = 0; i < changes.size(); i++)
size_t lastDownloadedCL = 0;
for (size_t currentCL = 0; currentCL < changes.size() && currentCL < lookAhead; currentCL++)
{
if (startupDownloadsCount == lookAhead)
{
break;
}
ChangeList& cl = changes.at(currentCL);

// Start running `p4 print` on changed files
ChangeList& cl = changes.at(i);
// Start gathering changed files with `p4 describe`
cl.PrepareDownload();

// Start running `p4 print` on changed files when the describe is finished
cl.StartDownload(depotPath, printBatch, includeBinaries);
startupDownloadsCount++;
lastDownloadCL = i;

lastDownloadedCL = currentCL;
}

SUCCESS("Queued first " << startupDownloadsCount << " CLs for downloading");
SUCCESS("Queued first " << startupDownloadsCount << " CLs up until CL " << changes.at(lastDownloadedCL).number << " for downloading");

int timezoneMinutes = p4.Info().GetServerTimezoneMinutes();
SUCCESS("Perforce server timezone is " << timezoneMinutes << " minutes");

// Map usernames to emails
const UsersResult& usersResult = p4.Users();
const std::unordered_map<UsersResult::UserID, UsersResult::UserData>& users = usersResult.GetUserEmails();

const std::unordered_map<UsersResult::UserID, UsersResult::UserData> users = std::move(p4.Users().GetUserEmails());
SUCCESS("Received userbase details from the Perforce server");

// Commit procedure start
Timer commitTimer;

PRINT("Last CL to start downloading is CL " << changes.at(lastDownloadedCL).number);

git.CreateIndex();
for (size_t i = 0; i < changes.size(); i++)
{
Expand Down Expand Up @@ -270,15 +271,15 @@ int Main(int argc, char** argv)

SUCCESS(
"CL " << cl.number << " --> Commit " << commitSHA
<< " with " << cl.changedFiles.size() << " files (" << i << "/" << changes.size() << "|" << lastDownloadCL - (long long)i << "). "
<< " with " << cl.changedFiles.size() << " files (" << i + 1 << "/" << changes.size() << "|" << lastDownloadedCL - (long long)i << "). "
<< "Elapsed " << commitTimer.GetTimeS() / 60.0f << " mins. "
<< ((commitTimer.GetTimeS() / 60.0f) / (float)(i + 1)) * (changes.size() - i - 1) << " mins left.");

// Start downloading the CL chronologically after the last CL that was previously downloaded, if there's still some left
if (lastDownloadCL + 1 < changes.size())
if (lastDownloadedCL + 1 < changes.size())
{
lastDownloadCL++;
ChangeList& downloadCL = changes.at(lastDownloadCL);
lastDownloadedCL++;
ChangeList& downloadCL = changes.at(lastDownloadedCL);
downloadCL.PrepareDownload();
downloadCL.StartDownload(depotPath, printBatch, includeBinaries);
}
Expand Down
23 changes: 14 additions & 9 deletions p4-fusion/p4_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,20 @@ std::string P4API::P4USER;
std::string P4API::P4CLIENT;
int P4API::CommandRetries = 1;
int P4API::CommandRefreshThreshold = 1;
std::mutex P4API::ReinitializationMutex;
std::mutex P4API::InitializationMutex;

P4API::P4API()
{
if (!Initialize())
{
ERR("Could not initialize P4API");
return;
// Helix Core C++ API seems to crash while making connections parallely.
// Although, this function is not currently accessed in parallel, it can
// be during retries.
std::unique_lock<std::mutex> lock(InitializationMutex);
if (!Initialize())
{
ERR("Could not initialize P4API");
return;
}
}

AddClientSpecView(ClientSpec.mapping);
Expand All @@ -43,6 +49,7 @@ bool P4API::Initialize()
m_ClientAPI.SetClient(P4CLIENT.c_str());
m_ClientAPI.SetProtocol("tag", "");
m_ClientAPI.Init(&e);

if (!CheckErrors(e, msg))
{
ERR("Could not initialize Helix Core C/C++ API");
Expand All @@ -54,23 +61,21 @@ bool P4API::Initialize()

bool P4API::Deinitialize()
{
std::unique_lock<std::mutex> lock(InitializationMutex);

Error e;
StrBuf msg;

m_ClientAPI.Final(&e);
CheckErrors(e, msg);

return true;
}

bool P4API::Reinitialize()
{
MTR_SCOPE("P4", __func__);

// Helix Core C++ API seems to crash while making connections parallely.
// The Initialize() function is immune to this because it is never run in
// parrallel, while Reinitialize() function can get in a situation where it is
// called in parallel.
std::unique_lock<std::mutex> lock(ReinitializationMutex);
bool status = Deinitialize() && Initialize();
return status;
}
Expand Down
7 changes: 4 additions & 3 deletions p4-fusion/p4_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

class P4API
{

ClientApi m_ClientAPI;
MapApi m_ClientMapping;
int m_Usage;
Expand All @@ -44,7 +43,9 @@ class P4API
static ClientResult::ClientSpecData ClientSpec;
static int CommandRetries;
static int CommandRefreshThreshold;
static std::mutex ReinitializationMutex;

// Helix Core C++ API seems to crash while making connections parallely.
static std::mutex InitializationMutex;

static bool InitializeLibraries();
static bool ShutdownLibraries();
Expand Down Expand Up @@ -73,7 +74,7 @@ class P4API
Result Sync(const std::string& path);
SyncResult GetFilesToSyncAtCL(const std::string& path, const std::string& cl);
PrintResult PrintFile(const std::string& filePathRevision);
PrintResult PrintFiles(const std::vector<std::string>& files);
PrintResult PrintFiles(const std::vector<std::string>& fileRevisions);
void UpdateClientSpec();
ClientResult Client();
UsersResult Users();
Expand Down
2 changes: 0 additions & 2 deletions p4-fusion/thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,6 @@ void ThreadPool::Initialize(int size)
}
}));
}

SUCCESS("Created " << size << " threads in thread pool");
}

ThreadPool::~ThreadPool()
Expand Down
Loading

0 comments on commit f8c8b2c

Please sign in to comment.