Queue declare fails for specific values of x-expires #97

Open
sphuber opened this issue Nov 16, 2020 · 3 comments

sphuber commented Nov 16, 2020

Consider the following simple script:

#!/usr/bin/env python
import asyncio
import aiormq

X_EXPIRES = 32767

async def main():
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost:5672/")

    # Creating a channel
    channel = await connection.channel()
    queue = await channel.queue_declare('test2', arguments={'x-expires': X_EXPIRES})

    # Sending the message
    await channel.basic_publish(b'Hello World!', routing_key='hello')
    print(" [x] Sent 'Hello World!'")


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

With X_EXPIRES = 32767 this works fine. However, increasing it by one to X_EXPIRES = 32768 raises an exception with the following stack trace:

Traceback (most recent call last):
  File "/home/aiida/.virtualenvs/aiida_dev/lib/python3.7/site-packages/aiormq/base.py", line 25, in __inner
    return await self.task
concurrent.futures._base.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "./test_rmq.py", line 20, in <module>
    loop.run_until_complete(main())
  File "/usr/lib/python3.7/asyncio/base_events.py", line 587, in run_until_complete
    return future.result()
  File "./test_rmq.py", line 12, in main
    queue = await channel.queue_declare('test2', arguments={'x-expires': 32768})
  File "/home/aiida/.virtualenvs/aiida_dev/lib/python3.7/site-packages/aiormq/channel.py", line 713, in queue_declare
    timeout=timeout,
  File "/home/aiida/.virtualenvs/aiida_dev/lib/python3.7/site-packages/aiormq/base.py", line 168, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "/home/aiida/.virtualenvs/aiida_dev/lib/python3.7/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/usr/lib/python3.7/asyncio/tasks.py", line 630, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/home/aiida/.virtualenvs/aiida_dev/lib/python3.7/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/usr/lib/python3.7/asyncio/tasks.py", line 630, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/home/aiida/.virtualenvs/aiida_dev/lib/python3.7/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/home/aiida/.virtualenvs/aiida_dev/lib/python3.7/site-packages/aiormq/connection.py", line 423, in __close_writer
    await writer.wait_closed()
  File "/usr/lib/python3.7/asyncio/streams.py", line 323, in wait_closed
    await self._protocol._closed
  File "/home/aiida/.virtualenvs/aiida_dev/lib/python3.7/site-packages/aiormq/connection.py", line 375, in __reader
    weight, channel, frame = await self.__receive_frame()
  File "/home/aiida/.virtualenvs/aiida_dev/lib/python3.7/site-packages/aiormq/connection.py", line 327, in __receive_frame
    frame_header = await self.reader.readexactly(1)
  File "/usr/lib/python3.7/asyncio/streams.py", line 679, in readexactly
    await self._wait_for_data('readexactly')
  File "/usr/lib/python3.7/asyncio/streams.py", line 473, in _wait_for_data
    await self._waiter
  File "/usr/lib/python3.7/asyncio/selector_events.py", line 814, in _read_ready__data_received
    data = self._sock.recv(self.max_size)
ConnectionResetError: [Errno 104] Connection reset by peer

It looks like this may be caused by an incorrect interpretation or cast of the integer value: the limit corresponds to 2^15 == 32768, so perhaps the value is handled as a signed two-byte integer. The behavior changes again at 2^16 == 65536: X_EXPIRES = 65535 fails, but X_EXPIRES = 65536 works. So the range [32768, 65535], i.e. 2^15 <= X_EXPIRES < 2^16, seems to cause the exception.
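
To make the boundaries above concrete, here is a minimal sketch using only the standard library's struct module. It is not pamqp's actual encoder; the helper name smallest_int_format and the list of candidate widths are assumptions for illustration. It only shows that the failing range is exactly the set of values that overflow a signed 16-bit integer but still fit an unsigned 16-bit one.

```python
import struct


def smallest_int_format(value):
    """Hypothetical helper: name the narrowest fixed-width integer
    format (from a small illustrative list) that can hold `value`."""
    candidates = [
        ("signed 16-bit", ">h"),
        ("unsigned 16-bit", ">H"),
        ("signed 32-bit", ">i"),
    ]
    for name, fmt in candidates:
        try:
            # struct.pack raises struct.error when the value is out of range
            struct.pack(fmt, value)
        except struct.error:
            continue
        return name
    return "wider than 32-bit"


for x_expires in (32767, 32768, 65535, 65536):
    print(x_expires, "->", smallest_int_format(x_expires))
```

If an encoder picks the narrowest type like this, only values in [32768, 65535] would be sent as an unsigned 16-bit field, which would match the observed failures if the broker does not accept that field type.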

Note that this only seems to happen with RabbitMQ 3.5. I cannot reproduce this with RabbitMQ 3.6.
Have you got an idea why this might be happening, and would it be possible to create a patch for this?

mosquito (Owner) commented

Please provide your RabbitMQ version and try a newer one. A similar problem was found in pamqp before.

sphuber (Author) commented Nov 16, 2020

My RabbitMQ version is 3.5.7. As I mentioned, I can confirm that this bug does not occur with RabbitMQ 3.6. I realize that 3.5 has been EOL since 31 October 2016, but it is still the default version on Ubuntu 16.04, which is an LTS release with support until 2024.

sphuber (Author) commented Nov 16, 2020

Update: adding the following at the top of the script will make things work:

import pamqp.encode
pamqp.encode.support_deprecated_rabbitmq()

This suggestion came from a comment on a related issue on aio-pika by @gmr.
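
For scripts that may run in environments with different pamqp versions, the workaround can be applied defensively before connecting. This is a minimal sketch: the helper name enable_legacy_rabbitmq_support is hypothetical, and the only pamqp call used is the support_deprecated_rabbitmq function quoted above.

```python
def enable_legacy_rabbitmq_support():
    """Hypothetical helper: switch pamqp to its compatibility mode for
    old RabbitMQ versions, if pamqp and the toggle are available.

    Returns True if the toggle was called, False otherwise.
    """
    try:
        import pamqp.encode
    except ImportError:
        # pamqp is not installed in this environment
        return False

    toggle = getattr(pamqp.encode, "support_deprecated_rabbitmq", None)
    if toggle is None:
        # This pamqp version does not expose the compatibility switch
        return False

    toggle()
    return True


# Call this once, before opening the aiormq connection
enabled = enable_legacy_rabbitmq_support()
print("legacy RabbitMQ support enabled:", enabled)
```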

By the way, this made me realize that aiormq does not explicitly declare pamqp as a direct dependency in setup.py even though it does use it extensively. Do you want me to open another issue for this?
Edit: I must have misread setup.py, because it does already declare an explicit dependency on pamqp.
