Skip to content

Commit

Permalink
Update README.md
Browse files Browse the repository at this point in the history
  • Loading branch information
lukas8219 authored Nov 17, 2023
1 parent c9720e4 commit 2f17202
Showing 1 changed file with 35 additions and 61 deletions.
96 changes: 35 additions & 61 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,18 +1,7 @@
# Last value caching exchange
# JSON filtering Exchange

This is a pretty simple implementation of a last value cache using
RabbitMQ's pluggable exchange types feature.

The last value cache is intended to solve problems like the following:
say I am using messaging to send notifications of some changing values
to clients; now, when a new client connects, it won't know the value
until it changes.

The last value exchange acts like a direct exchange (binding keys are
compared for equality with routing keys); but, it also keeps track of
the last value that was published with each routing key, and when a
queue is bound, it automatically enqueues the last value for the
binding key.
Plugin built for routing based upon JSON fields. Route your messages using JSON filters and operators such as `$or` and `$ne`.
Roadmap: Support overriding JSON filters so you can have you own implementation.

## Supported RabbitMQ Versions

Expand All @@ -24,6 +13,15 @@ Latest version of this plugin [requires Erlang 25.0 or later versions](https://w

## Installation

TL;DR:
```base
make
DIST_AS_EZ=yes make dist
```

Copy the .EZ file generated on `plugins` dir into your Broker plugin directory

Binary builds of this plugin from
the [Community Plugins page](https://www.rabbitmq.com/community-plugins.html).

Expand All @@ -37,58 +35,34 @@ You can build and install it like any other plugin (see

## Usage

To use the LVC exchange, with e.g., py-amqp:
To use the Json Filters exchange, here's an example with AMQPlib
```javascript
const { queue: emailProcessorQueue } = await channel.assertQueue('processor:email:queue');
const { queue: phoneProcessorQueue } = await channel.assertQueue('processor:phone:queue');
const { queue: generalProcessorQueue } = await channel.assertQueue('processor:contact:queue');

import amqplib.client_0_8 as amqp
ch = amqp.Connection().channel()
ch.exchange_declare("lvc", type="x-lvc")
ch.basic_publish(amqp.Message("value"),
exchange="lvc", routing_key="rabbit")
ch.queue_declare("q")
ch.queue_bind("q", "lvc", "rabbit")
print ch.basic_get("q").body
const { exchange } = await channel.assertExchange('processor:contact:exchange', 'json');

## Limitations
channel.bindQueue(emailProcessorQueue, exchange, '', { 'x-json-filters': JSON.stringify([{ contactType: 'email' }]) });
channel.bindQueue(phoneProcessorQueue, exchange, '', { 'x-json-filters': JSON.stringify([{ contactType: 'phone' }]) });
channel.bindQueue(generalProcessorQueue, exchange, '', { 'x-json-filters': JSON.stringify([{ $or: [{ contactType: 'email' }, { contactType: 'phone' }] }]) });
```

### "Recent value cache"

Message publishing in AMQP 0-9-1 is asynchronous by design and thus introduces natural race conditions
when there's more than one publisher. It is quite possible to see different
last-values but the same subsequent message stream, from different
clients.

This won't matter if you simply want to have a value to show until you
get an update. If it does matter, consider e.g. using sequence IDs so you
can notice out-of-order messages.

There's also a race in the pluggable exchanges hook, so that clients
can "see" the binding before the hook has been run; for the LVC, this
means that there's a possiblity that messages will get queued before
the last value. For this reason, I'm thinking of tagging the last
value messages so that clients can fast-forward to it, or ignore it,
if necessary.

### Values v. deltas
## Limitations
The JSON filtering is not well suited for production yet.
- Filters are only applied at Root-level of JSON for now (It is not clear wether we shall support N-level nested filtering)
-

One question that springs to mind when considering last value caches
is "what if I'm sending deltas rather than the whole value?". The
LVC exchange doesn't address this use case, but you could do it by
using two exchanges and posting full values to the LVC (from the
originating process -- presumably you'd be using deltas to save on
downstream bandwidth).
### Performance Limitation
We can expect a lower performance than the Headers Exchange, since this is actually headers exchange implementation ons Steroids.

### Direct exchanges only
### Supported operators
- PlainObject = an JSON object to match ALL the values specified
- $or = An array of PlainObject. Will match if any of those are true
- $ne = An array of PlainObject. Will match if none of those are true

The semantics of another kind of value-caching exchange (other than
fanout) aren't obvious. To choose one option though, say a
newly-bound queue was to be given all values that match its binding
key -- this would require every supported exchange type to supply a
reverse routing match procedure.
### What if I want to go into production w/ it?
It's up to you. Feel free to benchmark w/ real cases using RabbitMQ PerfTool. Enhancements are always welcome!

## Creating a Release

1. Update `broker_version_requirements` in `helpers.bzl` & `Makefile` (Optional)
1. Update the plugin version in `MODULE.bazel`
1. Push a tag (i.e. `v3.12.0`) with the matching version
1. Allow the Release workflow to run and create a draft release
1. Review and publish the release
You can nest operators.

0 comments on commit 2f17202

Please sign in to comment.