Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed issue #41 #47

Merged
merged 1 commit into from
Feb 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions PurpleExplorer/Helpers/ITopicHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ namespace PurpleExplorer.Helpers
public interface ITopicHelper
{
public Task<NamespaceInfo> GetNamespaceInfo(string connectionString);
public Task<IList<ServiceBusTopic>> GetTopics(string connectionString);
public Task<IList<ServiceBusTopic>> GetTopicsAndSubscriptions(string connectionString);
public Task<ServiceBusTopic> GetTopic(string connectionString, string topicPath, bool retrieveSubscriptions);
public Task<IList<ServiceBusSubscription>> GetSubscriptions(string connectionString, string topicPath);
public Task<ServiceBusSubscription> GetSubscription(string connectionString, string topicPath, string subscriptionName);
public Task<IList<Message>> GetDlqMessages(string connectionString, string topic, string subscription);
public Task<IList<Models.Message>> GetMessagesBySubscription(string connectionString, string topicName, string subscriptionName);
public Task SendMessage(string connectionString, string topicPath, string content);
public Task SendMessage(string connectionString, string topicPath, AzureMessage message);
public Task DeleteMessage(string connectionString, string topicPath, string subscriptionPath, Message message, bool isDlq);
public Task<SubscriptionRuntimeInfo> GetSubscriptionRuntimeInfo(string connectionString, string topicPath,
string subscriptionName);
public Task<long> PurgeMessages(string connectionString, string topicPath, string subscriptionPath, bool isDlq);
public Task<long> TransferDlqMessages(string connectionString, string topicPath, string subscriptionPath);
public Task ResubmitDlqMessage(string connectionString, string topicPath, string subscriptionPath,
Expand Down
37 changes: 24 additions & 13 deletions PurpleExplorer/Helpers/TopicHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,38 +16,49 @@ public class TopicHelper : ITopicHelper
{
private int _maxMessageCount = 100;

public async Task<IList<ServiceBusTopic>> GetTopics(string connectionString)
public async Task<IList<ServiceBusTopic>> GetTopicsAndSubscriptions(string connectionString)
{
IList<ServiceBusTopic> topics = new List<ServiceBusTopic>();
var client = new ManagementClient(connectionString);
var busTopics = await client.GetTopicsAsync();
await client.CloseAsync();

await Task.WhenAll(busTopics.Select(async t =>
await Task.WhenAll(busTopics.Select(async topic =>
{
var topicName = t.Path;
var subscriptions = await GetSubscriptions(connectionString, topicName);

var newTopic = new ServiceBusTopic
{
Name = topicName
};
var newTopic = new ServiceBusTopic(topic);

var subscriptions = await GetSubscriptions(connectionString, newTopic.Name);
newTopic.AddSubscriptions(subscriptions.ToArray());
topics.Add(newTopic);
}));

return topics;
}

public async Task<SubscriptionRuntimeInfo> GetSubscriptionRuntimeInfo(string connectionString,
string topicPath, string subscriptionName)
public async Task<ServiceBusTopic> GetTopic(string connectionString, string topicPath, bool retrieveSubscriptions)
{
ManagementClient client = new ManagementClient(connectionString);
var client = new ManagementClient(connectionString);
var busTopics = await client.GetTopicAsync(topicPath);
await client.CloseAsync();

var newTopic = new ServiceBusTopic(busTopics);

if (retrieveSubscriptions)
{
var subscriptions = await GetSubscriptions(connectionString, newTopic.Name);
newTopic.AddSubscriptions(subscriptions.ToArray());
}

return newTopic;
}

public async Task<ServiceBusSubscription> GetSubscription(string connectionString, string topicPath, string subscriptionName)
{
var client = new ManagementClient(connectionString);
var runtimeInfo = await client.GetSubscriptionRuntimeInfoAsync(topicPath, subscriptionName);
await client.CloseAsync();

return runtimeInfo;
return new ServiceBusSubscription(runtimeInfo);
}

public async Task<IList<ServiceBusSubscription>> GetSubscriptions(string connectionString, string topicPath)
Expand Down
26 changes: 20 additions & 6 deletions PurpleExplorer/Models/MessageCollection.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using DynamicData;
using ReactiveUI;

Expand All @@ -10,6 +11,7 @@ namespace PurpleExplorer.Models
/// </summary>
public abstract class MessageCollection : ReactiveObject
{
// These are needed to be set before fetching messages, in the second constructor
private long _messageCount;
private long _dlqCount;

Expand All @@ -19,13 +21,13 @@ public abstract class MessageCollection : ReactiveObject
public long MessageCount
{
get => _messageCount;
set => this.RaiseAndSetIfChanged(ref _messageCount, value);
private set => this.RaiseAndSetIfChanged(ref _messageCount, value);
}

public long DlqCount
{
get => _dlqCount;
set => this.RaiseAndSetIfChanged(ref _dlqCount, value);
private set => this.RaiseAndSetIfChanged(ref _dlqCount, value);
}

protected MessageCollection()
Expand All @@ -43,25 +45,37 @@ protected MessageCollection(long messageCount, long dlqCount) : this()
public void AddMessages(IEnumerable<Message> messages)
{
Messages.AddRange(messages);
_messageCount = Messages.Count;
MessageCount = Messages.Count;
}

public void RemoveMessage(string messageId)
{
Messages.Remove(Messages.Single(msg => msg.MessageId.Equals(messageId)));
MessageCount = Messages.Count;
}

public void ClearMessages()
{
Messages.Clear();
_messageCount = 0;
MessageCount = Messages.Count;
}

public void AddDlqMessages(IEnumerable<Message> dlqMessages)
{
DlqMessages.AddRange(dlqMessages);
_dlqCount = DlqMessages.Count;
DlqCount = DlqMessages.Count;
}

public void RemoveDlqMessage(string messageId)
{
DlqMessages.Remove(DlqMessages.Single(msg => msg.MessageId.Equals(messageId)));
DlqCount = DlqMessages.Count;
}

public void ClearDlqMessages()
{
DlqMessages.Clear();
_dlqCount = 0;
DlqCount = DlqMessages.Count;
}
}
}
8 changes: 8 additions & 0 deletions PurpleExplorer/Models/ServiceBusResource.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
using System;
using System.Collections.ObjectModel;

namespace PurpleExplorer.Models
{
public class ServiceBusResource
{
public string Name { get; set; }
public DateTime CreatedTime { get; set; }
public string ConnectionString { get; set; }
public ObservableCollection<ServiceBusQueue> Queues { get; private set; }
public ObservableCollection<ServiceBusTopic> Topics { get; private set; }
Expand All @@ -30,5 +32,11 @@ public void AddQueues(params ServiceBusQueue[] queues)
Queues.Add(queue);
}
}

public override bool Equals(object? obj)
{
var comparingResource = obj as ServiceBusResource;
return Name.Equals(comparingResource.Name) && CreatedTime.Equals(comparingResource.CreatedTime);
}
}
}
11 changes: 11 additions & 0 deletions PurpleExplorer/Models/ServiceBusTopic.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.ObjectModel;
using Microsoft.Azure.ServiceBus.Management;

namespace PurpleExplorer.Models
{
Expand All @@ -8,6 +9,16 @@ public class ServiceBusTopic
public ObservableCollection<ServiceBusSubscription> Subscriptions { get; private set; }
public ServiceBusResource ServiceBus { get; set; }

public ServiceBusTopic()
{
}

public ServiceBusTopic(TopicDescription topicDescription)
{
Name = topicDescription.Path;
}


public void AddSubscriptions(params ServiceBusSubscription[] subscriptions)
{
Subscriptions ??= new ObservableCollection<ServiceBusSubscription>();
Expand Down
44 changes: 36 additions & 8 deletions PurpleExplorer/ViewModels/MainWindowViewModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -183,18 +183,19 @@ await ModalWindowHelper.ShowModalWindow<ConnectionStringWindow, ConnectionString
LoggingService.Log("Connecting...");

var namespaceInfo = await _topicHelper.GetNamespaceInfo(ConnectionString);
var topics = await _topicHelper.GetTopics(ConnectionString);
var topics = await _topicHelper.GetTopicsAndSubscriptions(ConnectionString);
var queues = await _queueHelper.GetQueues(ConnectionString);

var newResource = new ServiceBusResource
var serviceBusResource = new ServiceBusResource
{
Name = namespaceInfo.Name,
ConnectionString = this.ConnectionString
CreatedTime = namespaceInfo.CreatedTime,
ConnectionString = ConnectionString
};

newResource.AddQueues(queues.ToArray());
newResource.AddTopics(topics.ToArray());
ConnectedServiceBuses.Add(newResource);
serviceBusResource.AddQueues(queues.ToArray());
serviceBusResource.AddTopics(topics.ToArray());
ConnectedServiceBuses.Add(serviceBusResource);
LoggingService.Log("Connected to Service Bus: " + namespaceInfo.Name);
}
catch (ArgumentException)
Expand Down Expand Up @@ -306,6 +307,20 @@ public void RefreshTabHeaders()
}
}

public async Task RefreshConnectedServiceBuses()
{
foreach (var serviceBusResource in ConnectedServiceBuses)
{
var topicsAndSubscriptions = await _topicHelper.GetTopicsAndSubscriptions(serviceBusResource.ConnectionString);
var serviceBusQueues = await _queueHelper.GetQueues(serviceBusResource.ConnectionString);

serviceBusResource.Topics.Clear();
serviceBusResource.Queues.Clear();
serviceBusResource.AddTopics(topicsAndSubscriptions.ToArray());
serviceBusResource.AddQueues(serviceBusQueues.ToArray());
}
}

public async void AddMessage()
{
var viewModal = new AddMessageWindowViewModal();
Expand Down Expand Up @@ -419,21 +434,34 @@ public async void PurgeMessages(string isDlqText)
var connectionString = CurrentSubscription.Topic.ServiceBus.ConnectionString;
purgedCount = await _topicHelper.PurgeMessages(connectionString, _currentTopic.Name,
_currentSubscription.Name, isDlq);

if (!isDlq)
CurrentSubscription.ClearMessages();
else
CurrentSubscription.ClearDlqMessages();
}

if (CurrentQueue != null)
{
var connectionString = CurrentQueue.ServiceBus.ConnectionString;
purgedCount = await _queueHelper.PurgeMessages(connectionString, _currentQueue.Name, isDlq);

if (!isDlq)
CurrentQueue.ClearMessages();
else
CurrentQueue.ClearDlqMessages();
}

LoggingService.Log($"Purged {purgedCount} messages in {purgingPath}");

// Refreshing messages
await FetchMessages();
}

public async Task Refresh()
{
await FetchMessages();
await RefreshConnectedServiceBuses();
RefreshTabHeaders();
await FetchMessages();
}

public async Task FetchMessages()
Expand Down
10 changes: 10 additions & 0 deletions PurpleExplorer/ViewModels/MessageDetailsWindowViewModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,22 @@ public async void DeleteMessage(Window window)
var connectionString = Subscription.Topic.ServiceBus.ConnectionString;
await _topicHelper.DeleteMessage(connectionString, Subscription.Topic.Name, Subscription.Name,
Message, Message.IsDlq);

if(!Message.IsDlq)
Subscription.RemoveMessage(Message.MessageId);
else
Subscription.RemoveDlqMessage(Message.MessageId);
}

if (Queue != null)
{
var connectionString = Queue.ServiceBus.ConnectionString;
await _queueHelper.DeleteMessage(connectionString, Queue.Name, Message, Message.IsDlq);

if(!Message.IsDlq)
Queue.RemoveMessage(Message.MessageId);
else
Queue.RemoveDlqMessage(Message.MessageId);
}

_loggingService.Log($"Message deleted, MessageId: {Message.MessageId}");
Expand Down