Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Notify client when Server's send buffer limit has been reached #201

Merged
merged 3 commits into from
Mar 27, 2023
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 58 additions & 53 deletions foxglove_bridge_base/include/foxglove_bridge/websocket_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,16 +82,16 @@ enum class StatusLevel : uint8_t {
Error = 2,
};

constexpr const char* StatusLevelToString(StatusLevel level) {
constexpr websocketpp::log::level StatusLevelToLogLevel(StatusLevel level) {
switch (level) {
case StatusLevel::Info:
return "INFO";
return APP;
case StatusLevel::Warning:
return "WARN";
return WARNING;
case StatusLevel::Error:
return "ERROR";
return RECOVERABLE;
default:
return "UNKNOWN";
return RECOVERABLE;
}
}

Expand Down Expand Up @@ -191,7 +191,8 @@ class Server final : public ServerInterface<ConnHandle> {
void sendJson(ConnHandle hdl, json&& payload);
void sendJsonRaw(ConnHandle hdl, const std::string& payload);
void sendBinary(ConnHandle hdl, const uint8_t* payload, size_t payloadSize);
void sendStatus(ConnHandle clientHandle, const StatusLevel level, const std::string& message);
void sendStatusAndLogMsg(ConnHandle clientHandle, const StatusLevel level,
const std::string& message);
void unsubscribeParamsWithoutSubscriptions(ConnHandle hdl,
const std::unordered_set<std::string>& paramNames);
bool isParameterSubscribed(const std::string& paramName) const;
Expand Down Expand Up @@ -523,13 +524,15 @@ inline void Server<ServerConfiguration>::sendBinary(ConnHandle hdl, const uint8_
}

template <typename ServerConfiguration>
inline void Server<ServerConfiguration>::sendStatus(ConnHandle clientHandle,
const StatusLevel level,
const std::string& message) {
inline void Server<ServerConfiguration>::sendStatusAndLogMsg(ConnHandle clientHandle,
const StatusLevel level,
const std::string& message) {
const std::string endpoint = remoteEndpointString(clientHandle);
const std::string logMessage =
"sendStatus(" + endpoint + ", " + StatusLevelToString(level) + ", " + message + ")";
_server.get_elog().write(RECOVERABLE, logMessage);
const std::string logMessage = endpoint + ": " + message;
const auto logLevel = StatusLevelToLogLevel(level);
auto logger = level == StatusLevel::Error ? _server.get_elog() : _server.get_alog();
logger.write(logLevel, logMessage);

sendJson(clientHandle, json{
{"op", "status"},
{"level", static_cast<uint8_t>(level)},
Expand All @@ -554,7 +557,8 @@ inline void Server<ServerConfiguration>::handleMessage(ConnHandle hdl, MessagePt
break;
}
} catch (std::exception const& ex) {
sendStatus(hdl, StatusLevel::Error, std::string{"Error parsing message: "} + ex.what());
sendStatusAndLogMsg(hdl, StatusLevel::Error,
std::string{"Error parsing message: "} + ex.what());
}
}

Expand All @@ -566,9 +570,9 @@ inline void Server<ServerConfiguration>::handleTextMessage(ConnHandle hdl, const
const auto requiredCapabilityIt = CAPABILITY_BY_CLIENT_OPERATION.find(op);
if (requiredCapabilityIt != CAPABILITY_BY_CLIENT_OPERATION.end() &&
!hasCapability(requiredCapabilityIt->second)) {
sendStatus(hdl, StatusLevel::Error,
"Operation '" + op + "' not supported as server capability '" +
requiredCapabilityIt->second + "' is missing");
sendStatusAndLogMsg(hdl, StatusLevel::Error,
"Operation '" + op + "' not supported as server capability '" +
requiredCapabilityIt->second + "' is missing");
return;
}

Expand Down Expand Up @@ -599,14 +603,14 @@ inline void Server<ServerConfiguration>::handleTextMessage(ConnHandle hdl, const
SubscriptionId subId = sub.at("id");
ChannelId channelId = sub.at("channelId");
if (findSubscriptionBySubId(subId) != clientInfo.subscriptionsByChannel.end()) {
sendStatus(hdl, StatusLevel::Error,
"Client subscription id " + std::to_string(subId) +
" was already used; ignoring subscription");
sendStatusAndLogMsg(hdl, StatusLevel::Error,
"Client subscription id " + std::to_string(subId) +
" was already used; ignoring subscription");
continue;
}
const auto& channelIt = _channels.find(channelId);
if (channelIt == _channels.end()) {
sendStatus(
sendStatusAndLogMsg(
hdl, StatusLevel::Warning,
"Channel " + std::to_string(channelId) + " is not available; ignoring subscription");
continue;
Expand All @@ -622,9 +626,9 @@ inline void Server<ServerConfiguration>::handleTextMessage(ConnHandle hdl, const
SubscriptionId subId = subIdJson;
const auto& sub = findSubscriptionBySubId(subId);
if (sub == clientInfo.subscriptionsByChannel.end()) {
sendStatus(hdl, StatusLevel::Warning,
"Client subscription id " + std::to_string(subId) +
" did not exist; ignoring unsubscription");
sendStatusAndLogMsg(hdl, StatusLevel::Warning,
"Client subscription id " + std::to_string(subId) +
" did not exist; ignoring unsubscription");
continue;
}
ChannelId chanId = sub->first;
Expand All @@ -644,16 +648,16 @@ inline void Server<ServerConfiguration>::handleTextMessage(ConnHandle hdl, const
for (const auto& chan : payload.at("channels")) {
ClientChannelId channelId = chan.at("id");
if (!isFirstPublication && clientPublications.find(channelId) != clientPublications.end()) {
sendStatus(hdl, StatusLevel::Error,
"Channel " + std::to_string(channelId) + " was already advertised");
sendStatusAndLogMsg(hdl, StatusLevel::Error,
"Channel " + std::to_string(channelId) + " was already advertised");
continue;
}

const auto topic = chan.at("topic").get<std::string>();
if (!isWhitelisted(topic, _options.clientTopicWhitelistPatterns)) {
sendStatus(hdl, StatusLevel::Error,
"Can't advertise channel " + std::to_string(channelId) + ", topic '" + topic +
"' not whitelisted");
sendStatusAndLogMsg(hdl, StatusLevel::Error,
"Can't advertise channel " + std::to_string(channelId) + ", topic '" +
topic + "' not whitelisted");
continue;
}
ClientAdvertisement advertisement{};
Expand All @@ -672,7 +676,7 @@ inline void Server<ServerConfiguration>::handleTextMessage(ConnHandle hdl, const
std::unique_lock<std::shared_mutex> clientChannelsLock(_clientChannelsMutex);
auto clientPublicationsIt = _clientChannels.find(hdl);
if (clientPublicationsIt == _clientChannels.end()) {
sendStatus(hdl, StatusLevel::Error, "Client has no advertised channels");
sendStatusAndLogMsg(hdl, StatusLevel::Error, "Client has no advertised channels");
break;
}

Expand Down Expand Up @@ -801,12 +805,12 @@ inline void Server<ServerConfiguration>::handleTextMessage(ConnHandle hdl, const
_handlers.subscribeConnectionGraphHandler(false);
}
} else {
sendStatus(hdl, StatusLevel::Error,
"Client was not subscribed to connection graph updates");
sendStatusAndLogMsg(hdl, StatusLevel::Error,
"Client was not subscribed to connection graph updates");
}
} break;
default: {
sendStatus(hdl, StatusLevel::Error, "Unrecognized client opcode \"" + op + "\"");
sendStatusAndLogMsg(hdl, StatusLevel::Error, "Unrecognized client opcode \"" + op + "\"");
} break;
}
}
Expand All @@ -815,7 +819,7 @@ template <typename ServerConfiguration>
inline void Server<ServerConfiguration>::handleBinaryMessage(ConnHandle hdl, const uint8_t* msg,
size_t length) {
if (length < 1) {
sendStatus(hdl, StatusLevel::Error, "Received an empty binary message");
sendStatusAndLogMsg(hdl, StatusLevel::Error, "Received an empty binary message");
return;
}

Expand All @@ -824,17 +828,18 @@ inline void Server<ServerConfiguration>::handleBinaryMessage(ConnHandle hdl, con
const auto requiredCapabilityIt = CAPABILITY_BY_CLIENT_BINARY_OPERATION.find(op);
if (requiredCapabilityIt != CAPABILITY_BY_CLIENT_BINARY_OPERATION.end() &&
!hasCapability(requiredCapabilityIt->second)) {
sendStatus(hdl, StatusLevel::Error,
"Binary operation '" + std::to_string(static_cast<int>(op)) +
"' not supported as server capability '" + requiredCapabilityIt->second +
"' is missing");
sendStatusAndLogMsg(hdl, StatusLevel::Error,
"Binary operation '" + std::to_string(static_cast<int>(op)) +
"' not supported as server capability '" + requiredCapabilityIt->second +
"' is missing");
return;
}

switch (op) {
case ClientBinaryOpcode::MESSAGE_DATA: {
if (length < 5) {
sendStatus(hdl, StatusLevel::Error, "Invalid message length " + std::to_string(length));
sendStatusAndLogMsg(hdl, StatusLevel::Error,
"Invalid message length " + std::to_string(length));
return;
}
const auto timestamp = std::chrono::duration_cast<std::chrono::nanoseconds>(
Expand All @@ -845,15 +850,15 @@ inline void Server<ServerConfiguration>::handleBinaryMessage(ConnHandle hdl, con

auto clientPublicationsIt = _clientChannels.find(hdl);
if (clientPublicationsIt == _clientChannels.end()) {
sendStatus(hdl, StatusLevel::Error, "Client has no advertised channels");
sendStatusAndLogMsg(hdl, StatusLevel::Error, "Client has no advertised channels");
return;
}

auto& clientPublications = clientPublicationsIt->second;
const auto& channelIt = clientPublications.find(channelId);
if (channelIt == clientPublications.end()) {
sendStatus(hdl, StatusLevel::Error,
"Channel " + std::to_string(channelId) + " is not advertised");
sendStatusAndLogMsg(hdl, StatusLevel::Error,
"Channel " + std::to_string(channelId) + " is not advertised");
return;
}

Expand All @@ -872,8 +877,8 @@ inline void Server<ServerConfiguration>::handleBinaryMessage(ConnHandle hdl, con
case ClientBinaryOpcode::SERVICE_CALL_REQUEST: {
ServiceRequest request;
if (length < request.size()) {
sendStatus(hdl, StatusLevel::Error,
"Invalid service call request length " + std::to_string(length));
sendStatusAndLogMsg(hdl, StatusLevel::Error,
"Invalid service call request length " + std::to_string(length));
return;
}

Expand All @@ -882,8 +887,9 @@ inline void Server<ServerConfiguration>::handleBinaryMessage(ConnHandle hdl, con
{
std::shared_lock<std::shared_mutex> lock(_servicesMutex);
if (_services.find(request.serviceId) == _services.end()) {
sendStatus(hdl, StatusLevel::Error,
"Service " + std::to_string(request.serviceId) + " is not advertised");
sendStatusAndLogMsg(
hdl, StatusLevel::Error,
"Service " + std::to_string(request.serviceId) + " is not advertised");
return;
}
}
Expand All @@ -893,8 +899,8 @@ inline void Server<ServerConfiguration>::handleBinaryMessage(ConnHandle hdl, con
}
} break;
default: {
sendStatus(hdl, StatusLevel::Error,
"Unrecognized client opcode " + std::to_string(uint8_t(op)));
sendStatusAndLogMsg(hdl, StatusLevel::Error,
"Unrecognized client opcode " + std::to_string(uint8_t(op)));
} break;
}
}
Expand Down Expand Up @@ -1057,12 +1063,11 @@ inline void Server<ServerConfiguration>::sendMessage(ConnHandle clientHandle, Ch

const auto bufferSizeinBytes = con->get_buffered_amount();
if (bufferSizeinBytes + payloadSize >= _options.sendBufferLimitBytes) {
FOXGLOVE_DEBOUNCE(
[this]() {
_server.get_elog().write(
WARNING, "Connection send buffer limit reached, messages will be dropped...");
},
2500);
const auto logFn = [this, clientHandle]() {
sendStatusAndLogMsg(clientHandle, StatusLevel::Warning,
"Connection send buffer limit reached, messages will be dropped...");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a user I'd find it more helpful if this said something like "Server send buffer full. Dropping message on topic /foo/bar".

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that this would be more helpful, but due to log debounce, you wouldn't see it for every topic for which messages are dropped

};
FOXGLOVE_DEBOUNCE(logFn, 2500);
return;
}

Expand Down