
calling thread.stop() can cause AttributeError #833

Closed
cooraz opened this issue Feb 8, 2017 · 10 comments

Comments


cooraz commented Feb 8, 2017

I am using the publish/subscribe part of redis with the built-in thread to receive events. However, it sometimes fails when calling stop().

I am using a 'with' statement to start and stop the thread, as shown below:

import redis


class redis_handler:
    def __init__(self, port=0):
        self.__thread = None
        if port > 0:
            self.__r = redis.StrictRedis(host='127.0.0.1', port=port)
        else:
            self.__r = redis.StrictRedis(host='127.0.0.1')
        self.__p = self.__r.pubsub()

    def __enter__(self):
        self.__p.subscribe(**{"dummy": self.redis_subscribe_cb})
        self.__thread = self.__p.run_in_thread(sleep_time=0.05)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.__thread.stop()

    def redis_subscribe_cb(self, message):
        print "event received:", message

Inside the 'with' statement I normally wait for a specific event, but sometimes it fails when exiting the 'with' statement. The traceback when this happens is:

redis_test/redis_handler.py:28: in __exit__
    self.__thread.stop()
/usr/lib/python2.7/site-packages/redis/client.py:2354: in stop
    self.pubsub.punsubscribe()
/usr/lib/python2.7/site-packages/redis/client.py:2213: in punsubscribe
    return self.execute_command('PUNSUBSCRIBE', *args)
/usr/lib/python2.7/site-packages/redis/client.py:2161: in execute_command
    self._execute(connection, connection.send_command, *args)
/usr/lib/python2.7/site-packages/redis/client.py:2165: in _execute
    return command(*args)
/usr/lib/python2.7/site-packages/redis/connection.py:563: in send_command
    self.send_packed_command(self.pack_command(*args))
/usr/lib/python2.7/site-packages/redis/connection.py:543: in send_packed_command
    self._sock.sendall(item)
E   AttributeError: 'NoneType' object has no attribute 'sendall'

It seems like _sock is being cleared while send_packed_command is running.
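To picture the failure mode, here is a minimal, self-contained sketch (`FakeConnection` and `FakeSocket` are made-up stand-ins, not redis-py classes) that produces the same AttributeError once one thread has cleared the socket out from under a concurrent sender:

```python
class FakeSocket(object):
    def sendall(self, data):
        pass


class FakeConnection(object):
    """Made-up stand-in for a connection object; not the real redis-py class."""
    def __init__(self):
        self._sock = FakeSocket()

    def disconnect(self):
        # roughly what the worker thread does on teardown
        self._sock = None

    def send_packed_command(self, item):
        # if disconnect() ran first, self._sock is None here
        self._sock.sendall(item)


conn = FakeConnection()
conn.disconnect()  # simulate the worker thread winning the race
try:
    conn.send_packed_command(b"PUNSUBSCRIBE")
except AttributeError as e:
    print(e)  # 'NoneType' object has no attribute 'sendall'
```

In the real code the two steps run in different threads, so whether the send or the disconnect happens first is pure timing, which matches the intermittent failures above.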


rolette commented Feb 22, 2017

It's another incarnation of #732. There's a PR that fixes it, but it hasn't been merged yet.


cooraz commented Apr 6, 2017

I have implemented the fix on my clients, and it seems to work quite well on the Linux clients. However, it can still happen on the Windows clients:

redis_test\redis_handler.py:29: in __exit__
    self.__thread.stop()
C:\Python27\lib\site-packages\redis\client.py:2354: in stop
    self.pubsub.punsubscribe()
C:\Python27\lib\site-packages\redis\client.py:2213: in punsubscribe
    return self.execute_command('PUNSUBSCRIBE', *args)
C:\Python27\lib\site-packages\redis\client.py:2161: in execute_command
    self._execute(connection, connection.send_command, *args)
C:\Python27\lib\site-packages\redis\client.py:2165: in _execute
    return command(*args)
C:\Python27\lib\site-packages\redis\connection.py:580: in send_command
    self.send_packed_command(self.pack_command(*args))
C:\Python27\lib\site-packages\redis\connection.py:560: in send_packed_command
    self._sock.sendall(item)
E   AttributeError: 'NoneType' object has no attribute 'sendall'


rolette commented Apr 6, 2017

Interesting. I don't have a Windows client to test with, but if you come up with a fix, I'm happy to merge it into my PR for #732 (assuming it still hasn't been merged).


cooraz commented Jun 12, 2017

I have minimal example code that reproduces this on a Windows client right now.
One thread simulates a client publishing messages,
and one thread simulates a server that both sends and listens for messages.

RedisTest.py:

import SyncServer
import time
import threading

sync = SyncServer.SyncServer()


class RunnerThread(threading.Thread):
    def __init__(self, sync):
        threading.Thread.__init__(self, name="PThread")
        self.sync = sync

    def run(self):
        while True:
            time.sleep(0.1)
            self.sync.publish_event("Thread", "Test")


sync.add_client("Thread")

thread1 = RunnerThread(sync)
thread1.start()

while True:
    with sync as s:
        s.waitForClients("Test")
    sync.clear()
    time.sleep(0.1)

SyncServer.py:

import redis
import time
import logging


class SyncServer:
    def __init__(self, port=0):
        self.received_events = []
        self.clients = []
        self.__thread = None
        self.last_event = None
        self.last_name = None
        self.resend_last = False
        if port != 0:
            self.__r = redis.StrictRedis(host='172.29.15.191', port=port)
        else:
            self.__r = redis.StrictRedis(host='172.29.15.191')
        self.__p = self.__r.pubsub()

    def __enter__(self):
        for c in self.clients:
            self.__p.subscribe(**{c: self.redis_subscribe_cb})
        self.__thread = self.__p.run_in_thread(sleep_time=0.05)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        time.sleep(0.1)
        self.__thread.stop()

    def redis_subscribe_cb(self, message):
        logging.debug("Received msg. {0}".format(message))
        self.received_events.append([message['channel'], message['data']])

    def wait_for_all(self, clientList, event):
        found = False
        while not found:
            found = True
            for c in clientList:
                if [c, str(event)] not in self.received_events:
                    found = False
            if self.resend_last:
                self.__r.publish(self.last_name, self.last_event)
        self.resend_last = False
        return found

    def waitForClients(self, event):
        return self.wait_for_all(self.clients, event)

    def add_client(self, name):
        self.clients.append(name)

    def publish_event(self, name, event):
        self.__r.publish(name, event)
        self.last_event = event
        self.last_name = name
        self.resend_last = True

    def clear(self):
        self.received_events = []

This seems to replicate the error pretty consistently on my Windows client (Windows Server 2008).

After adding some logging, it looks like the main thread and the subscriber thread have a race condition on the connection.
I have a log here:

2017-06-12 13:44:45,721 MainThread connection.py:417 (init): Sock init
2017-06-12 13:44:45,721 MainThread connection.py:567 (send_packed_command): sock is disconnected
2017-06-12 13:44:45,721 MainThread connection.py:461 (connect): sock is connected
2017-06-12 13:44:45,721 MainThread connection.py:572 (send_packed_command): total items: 1
2017-06-12 13:44:45,721 MainThread connection.py:574 (send_packed_command): item is being send
2017-06-12 13:44:46,065 PThread connection.py:417 (init): Sock init
2017-06-12 13:44:46,065 PThread connection.py:567 (send_packed_command): sock is disconnected
2017-06-12 13:44:46,065 PThread connection.py:461 (connect): sock is connected
2017-06-12 13:44:46,065 PThread connection.py:572 (send_packed_command): total items: 1
2017-06-12 13:44:46,065 PThread connection.py:574 (send_packed_command): item is being send
2017-06-12 13:44:46,065 WorkerThread SyncServer.py:31 (redis_subscribe_cb): Received msg. {'pattern': None, 'type': 'message', 'channel': 'Thread', 'data': 'Test'}
2017-06-12 13:44:46,065 MainThread connection.py:572 (send_packed_command): total items: 1
2017-06-12 13:44:46,065 MainThread connection.py:574 (send_packed_command): item is being send
2017-06-12 13:44:46,065 WorkerThread SyncServer.py:31 (redis_subscribe_cb): Received msg. {'pattern': None, 'type': 'message', 'channel': 'Thread', 'data': 'Test'}
2017-06-12 13:44:46,174 PThread connection.py:572 (send_packed_command): total items: 1
2017-06-12 13:44:46,174 PThread connection.py:574 (send_packed_command): item is being send
2017-06-12 13:44:46,174 MainThread connection.py:572 (send_packed_command): total items: 1
2017-06-12 13:44:46,174 WorkerThread SyncServer.py:31 (redis_subscribe_cb): Received msg. {'pattern': None, 'type': 'message', 'channel': 'Thread', 'data': 'Test'}
2017-06-12 13:44:46,174 MainThread connection.py:574 (send_packed_command): item is being send
2017-06-12 13:44:46,174 MainThread connection.py:572 (send_packed_command): total items: 1
2017-06-12 13:44:46,174 WorkerThread connection.py:561 (disconnect): sock closed
2017-06-12 13:44:46,174 MainThread connection.py:574 (send_packed_command): item is being send

It seems like the WorkerThread is closing the connection while send_packed_command is using it. To avoid this, I tried adding a mutex protecting disconnect and send_packed_command in connection.py.
It clearly helps, but after a while an AttributeError occurs on the connection variable in client.py (execute_command).
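The mutex approach looked roughly like this sketch (a simplified stand-in, not the actual redis-py connection code; `LockedConnection` and its members are made up):

```python
import threading


class LockedConnection(object):
    """Simplified connection where disconnect and send are serialized by a lock."""
    def __init__(self):
        self._sock = object()          # placeholder for a real socket
        self._lock = threading.Lock()  # serialize all access to _sock

    def disconnect(self):
        with self._lock:
            self._sock = None

    def send_packed_command(self, item):
        with self._lock:
            if self._sock is None:
                # fail with a meaningful error instead of AttributeError
                raise RuntimeError("socket closed before send")
            # real code would call self._sock.sendall(item) here
            return len(item)


conn = LockedConnection()
print(conn.send_packed_command(b"PING"))  # 4
conn.disconnect()
try:
    conn.send_packed_command(b"PING")
except RuntimeError as e:
    print(e)  # socket closed before send
```

This closes the window between the None check and the send, but as noted above it does not help when a different object (the connection attribute itself) is set to None by the other thread.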

I will keep you updated if I find a more permanent solution.


cooraz commented Jun 13, 2017

OK, it seems I have found the cause of the second issue. First of all, the traceback:

Traceback (most recent call last):
  File "C:/RedisStresser/RedisTest.py", line 27, in <module>
    s.waitForClients("Test")
  File "C:\RedisStresser\SyncServer.py", line 28, in __exit__
    self.__thread.stop()
  File "C:\Python27\lib\site-packages\redis\client.py", line 2366, in stop
    self.pubsub.punsubscribe()
  File "C:\Python27\lib\site-packages\redis\client.py", line 2224, in punsubscribe
    return self.execute_command('PUNSUBSCRIBE', *args)
  File "C:\Python27\lib\site-packages\redis\client.py", line 2171, in execute_command
    self._execute(connection, connection.send_command, *args)
AttributeError: 'NoneType' object has no attribute 'send_command'

And here is what happens: when the 'with' clause in MainThread finishes, it calls self.__thread.stop(), which calls unsubscribe() and punsubscribe() in client.py. unsubscribe() goes fine, but because I have no pattern subscriptions, WorkerThread exits its while loop and resets the connection before punsubscribe() is executed, leaving the connection variable as None.

The logs to back it up:

2017-06-13 09:52:19,486 MainThread client.py:2252 (unsubscribe): Unsubscribing
2017-06-13 09:52:19,486 MainThread client.py:2170 (execute_command): Execute pubsub cmd
2017-06-13 09:52:19,486 WorkerThread SyncServer.py:31 (redis_subscribe_cb): Received msg. {'pattern': None, 'type': 'message', 'channel': 'Thread', 'data': 'Test'}
2017-06-13 09:52:19,486 MainThread client.py:2221 (punsubscribe): pUnsubscribing
2017-06-13 09:52:19,486 WorkerThread client.py:2104 (reset): Connection reset
2017-06-13 09:52:19,486 MainThread client.py:2170 (execute_command): Execute pubsub cmd

The proposed fix is to improve the thread's stop() method so it only unsubscribes channels/patterns if there are any:

class PubSubWorkerThread(threading.Thread):
    def __init__(self, pubsub, sleep_time):
        super(PubSubWorkerThread, self).__init__()
        self.pubsub = pubsub
        self.sleep_time = sleep_time
        self._running = False

    def run(self):
        if self._running:
            return
        self._running = True
        pubsub = self.pubsub
        sleep_time = self.sleep_time
        while pubsub.subscribed:
            pubsub.get_message(ignore_subscribe_messages=True,
                               timeout=sleep_time)
        pubsub.close()
        self._running = False

    def stop(self):
        # stopping simply unsubscribes from all channels and patterns.
        # the unsubscribe responses that are generated will short circuit
        # the loop in run(), calling pubsub.close() to clean up the connection
        if self.pubsub.channels:
            self.pubsub.unsubscribe()
        if self.pubsub.patterns:
            self.pubsub.punsubscribe()

cooraz added a commit to cooraz/redis-py that referenced this issue Jun 13, 2017

cooraz commented Jun 13, 2017

Actually, the latter seems to be the key issue. I have now removed the mutex from connection.py and implemented only the fix in stop().
My RedisTest has been running for a few hours without faults, whereas it would normally hit an AttributeError within a matter of minutes before the fix.

I have deployed this fix on my four Windows clients, just to give it some exercise.

Btw, I replicated the error on my Windows Server 2008 clients; on my Windows 10 PC, however, I haven't been able to reproduce it.

@rolette I committed the fix on my fork, happy to hear your thoughts on it.


cooraz commented Jun 13, 2017

Quick update:
I got an exception on my linux clients now:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 811, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/site-packages/redis/client.py", line 2345, in run
    timeout=sleep_time)
  File "/usr/lib/python2.7/site-packages/redis/client.py", line 2262, in get_message
    return self.handle_message(response, ignore_subscribe_messages)
  File "/usr/lib/python2.7/site-packages/redis/client.py", line 2271, in handle_message
    message_type = nativestr(response[0])
TypeError: 'long' object has no attribute '__getitem__'


Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 811, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/site-packages/redis/client.py", line 2345, in run
    timeout=sleep_time)
  File "/usr/lib/python2.7/site-packages/redis/client.py", line 2262, in get_message
    return self.handle_message(response, ignore_subscribe_messages)
  File "/usr/lib/python2.7/site-packages/redis/client.py", line 2271, in handle_message
    message_type = nativestr(response[0])
TypeError: 'long' object has no attribute '__getitem__'

This is actually an error I saw earlier and made a small quick-and-dirty fix for, so I could debug the other issue (I was fairly sure it happened because I was stressing the redis server).

The fix is simply to check that the response received is not a long:

    def get_message(self, ignore_subscribe_messages=False, timeout=0):
        """
        Get the next message if one is available, otherwise None.

        If timeout is specified, the system will wait for `timeout` seconds
        before returning. Timeout should be specified as a floating point
        number.
        """
        response = self.parse_response(block=False, timeout=timeout)
        if response and not isinstance(response, long):
            return self.handle_message(response, ignore_subscribe_messages)
        return None

It seems the redis server sent back an unexpected integer reply. Either way, I don't believe the code should crash because of that.
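As an aside, `long` only exists on Python 2; a version-agnostic form of the same guard (my own sketch, not redis-py code; `safe_handle` is a made-up helper) can test for any integer type via numbers.Integral:

```python
import numbers


def safe_handle(response):
    """Return the response only if it looks like a pub/sub message list;
    swallow None and bare integer replies instead of crashing."""
    if response is None or isinstance(response, numbers.Integral):
        return None
    return response


print(safe_handle(None))  # None
print(safe_handle(3))     # None  (integer reply is ignored)
print(safe_handle(['message', 'Thread', 'Test']))
```

On Python 2, `isinstance(x, numbers.Integral)` is true for both int and long, so the same check covers the traceback above.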

I might look into it if I get some time. My redis server is running 3.2.5, so the first step would be to upgrade it to the latest version, I guess.


rolette commented Jun 13, 2017

I saw your update earlier this morning and realized that this is not the same issue as #732, but it follows the same pattern. Most of the stack trace is the same, but it's coming from a different path.

I've never used the pub/sub bits in redis-py, but based on your log messages, it appears that two different threads are trying to use the same connection. If the main thread is trying to reset the connection while the worker thread is still in its polling loop, you'll run into the same sort of issues I fixed on the connection pool in my PR for #732.

The change you checked in to have stop() skip over unsubscribing will help in some cases, but the underlying problem is still there for anyone that has active subscriptions.
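One way to sidestep the shared-connection race entirely (a generic sketch of the flag-based pattern, not redis-py's actual implementation; `Worker` and its members are made up) is to have stop() only set a flag, and let the worker thread itself do all the teardown on the connection it owns:

```python
import threading
import time


class Worker(threading.Thread):
    """The worker owns its 'connection'; stop() only flips an event."""
    def __init__(self):
        super(Worker, self).__init__()
        self._stop_event = threading.Event()
        self.cleaned_up = False

    def run(self):
        while not self._stop_event.is_set():
            # poll for messages here; sleep stands in for get_message()
            time.sleep(0.01)
        # only this thread ever touches the connection, so there is
        # no race on teardown
        self.cleaned_up = True

    def stop(self):
        self._stop_event.set()  # never touches the connection directly


w = Worker()
w.start()
w.stop()
w.join(timeout=2)
print(w.cleaned_up)  # True
```

Since the main thread never issues commands on the pub/sub connection, neither the half-closed socket nor the None connection attribute can be observed mid-send.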


cooraz commented Jun 14, 2017

Yeah, you are right, it is not the same issue as #732; I saw that when I started to dig into it.

The way the worker thread is stopped is by removing any active subscriptions, causing it to exit its loop.
The main thread calls stop() to unsubscribe all active subscriptions; it doesn't reset the connection itself.

But I actually don't skip unsubscribing. I check whether any active subscriptions exist in either channels or patterns and unsubscribe accordingly.
The issue was that stop() called unsubscribe() first and punsubscribe() after. If I only have channel subscriptions, the worker thread exits its loop right after the unsubscribe() call and resets the connection before the main thread calls punsubscribe(), which is unnecessary anyway since no pattern subscriptions are active.

@andymccurdy (Contributor) commented

This issue was fixed in version 3.2.0.
