mirror of
https://github.com/evilhero/mylar
synced 2024-12-26 17:46:52 +00:00
263 lines
8.7 KiB
Python
Executable file
263 lines
8.7 KiB
Python
Executable file
import threading
|
|
import time
|
|
import unittest
|
|
|
|
import cherrypy
|
|
from cherrypy._cpcompat import get_daemon, set
|
|
from cherrypy.process import wspbus
|
|
|
|
|
|
msg = "Listener %d on channel %s: %s."
|
|
|
|
|
|
class PublishSubscribeTests(unittest.TestCase):
|
|
|
|
def get_listener(self, channel, index):
|
|
def listener(arg=None):
|
|
self.responses.append(msg % (index, channel, arg))
|
|
return listener
|
|
|
|
def test_builtin_channels(self):
|
|
b = wspbus.Bus()
|
|
|
|
self.responses, expected = [], []
|
|
|
|
for channel in b.listeners:
|
|
for index, priority in enumerate([100, 50, 0, 51]):
|
|
b.subscribe(channel, self.get_listener(channel, index), priority)
|
|
|
|
for channel in b.listeners:
|
|
b.publish(channel)
|
|
expected.extend([msg % (i, channel, None) for i in (2, 1, 3, 0)])
|
|
b.publish(channel, arg=79347)
|
|
expected.extend([msg % (i, channel, 79347) for i in (2, 1, 3, 0)])
|
|
|
|
self.assertEqual(self.responses, expected)
|
|
|
|
def test_custom_channels(self):
|
|
b = wspbus.Bus()
|
|
|
|
self.responses, expected = [], []
|
|
|
|
custom_listeners = ('hugh', 'louis', 'dewey')
|
|
for channel in custom_listeners:
|
|
for index, priority in enumerate([None, 10, 60, 40]):
|
|
b.subscribe(channel, self.get_listener(channel, index), priority)
|
|
|
|
for channel in custom_listeners:
|
|
b.publish(channel, 'ah so')
|
|
expected.extend([msg % (i, channel, 'ah so') for i in (1, 3, 0, 2)])
|
|
b.publish(channel)
|
|
expected.extend([msg % (i, channel, None) for i in (1, 3, 0, 2)])
|
|
|
|
self.assertEqual(self.responses, expected)
|
|
|
|
def test_listener_errors(self):
|
|
b = wspbus.Bus()
|
|
|
|
self.responses, expected = [], []
|
|
channels = [c for c in b.listeners if c != 'log']
|
|
|
|
for channel in channels:
|
|
b.subscribe(channel, self.get_listener(channel, 1))
|
|
# This will break since the lambda takes no args.
|
|
b.subscribe(channel, lambda: None, priority=20)
|
|
|
|
for channel in channels:
|
|
self.assertRaises(wspbus.ChannelFailures, b.publish, channel, 123)
|
|
expected.append(msg % (1, channel, 123))
|
|
|
|
self.assertEqual(self.responses, expected)
|
|
|
|
|
|
class BusMethodTests(unittest.TestCase):
|
|
|
|
def log(self, bus):
|
|
self._log_entries = []
|
|
def logit(msg, level):
|
|
self._log_entries.append(msg)
|
|
bus.subscribe('log', logit)
|
|
|
|
def assertLog(self, entries):
|
|
self.assertEqual(self._log_entries, entries)
|
|
|
|
def get_listener(self, channel, index):
|
|
def listener(arg=None):
|
|
self.responses.append(msg % (index, channel, arg))
|
|
return listener
|
|
|
|
def test_start(self):
|
|
b = wspbus.Bus()
|
|
self.log(b)
|
|
|
|
self.responses = []
|
|
num = 3
|
|
for index in range(num):
|
|
b.subscribe('start', self.get_listener('start', index))
|
|
|
|
b.start()
|
|
try:
|
|
# The start method MUST call all 'start' listeners.
|
|
self.assertEqual(set(self.responses),
|
|
set([msg % (i, 'start', None) for i in range(num)]))
|
|
# The start method MUST move the state to STARTED
|
|
# (or EXITING, if errors occur)
|
|
self.assertEqual(b.state, b.states.STARTED)
|
|
# The start method MUST log its states.
|
|
self.assertLog(['Bus STARTING', 'Bus STARTED'])
|
|
finally:
|
|
# Exit so the atexit handler doesn't complain.
|
|
b.exit()
|
|
|
|
def test_stop(self):
|
|
b = wspbus.Bus()
|
|
self.log(b)
|
|
|
|
self.responses = []
|
|
num = 3
|
|
for index in range(num):
|
|
b.subscribe('stop', self.get_listener('stop', index))
|
|
|
|
b.stop()
|
|
|
|
# The stop method MUST call all 'stop' listeners.
|
|
self.assertEqual(set(self.responses),
|
|
set([msg % (i, 'stop', None) for i in range(num)]))
|
|
# The stop method MUST move the state to STOPPED
|
|
self.assertEqual(b.state, b.states.STOPPED)
|
|
# The stop method MUST log its states.
|
|
self.assertLog(['Bus STOPPING', 'Bus STOPPED'])
|
|
|
|
def test_graceful(self):
|
|
b = wspbus.Bus()
|
|
self.log(b)
|
|
|
|
self.responses = []
|
|
num = 3
|
|
for index in range(num):
|
|
b.subscribe('graceful', self.get_listener('graceful', index))
|
|
|
|
b.graceful()
|
|
|
|
# The graceful method MUST call all 'graceful' listeners.
|
|
self.assertEqual(set(self.responses),
|
|
set([msg % (i, 'graceful', None) for i in range(num)]))
|
|
# The graceful method MUST log its states.
|
|
self.assertLog(['Bus graceful'])
|
|
|
|
def test_exit(self):
|
|
b = wspbus.Bus()
|
|
self.log(b)
|
|
|
|
self.responses = []
|
|
num = 3
|
|
for index in range(num):
|
|
b.subscribe('stop', self.get_listener('stop', index))
|
|
b.subscribe('exit', self.get_listener('exit', index))
|
|
|
|
b.exit()
|
|
|
|
# The exit method MUST call all 'stop' listeners,
|
|
# and then all 'exit' listeners.
|
|
self.assertEqual(set(self.responses),
|
|
set([msg % (i, 'stop', None) for i in range(num)] +
|
|
[msg % (i, 'exit', None) for i in range(num)]))
|
|
# The exit method MUST move the state to EXITING
|
|
self.assertEqual(b.state, b.states.EXITING)
|
|
# The exit method MUST log its states.
|
|
self.assertLog(['Bus STOPPING', 'Bus STOPPED', 'Bus EXITING', 'Bus EXITED'])
|
|
|
|
def test_wait(self):
|
|
b = wspbus.Bus()
|
|
|
|
def f(method):
|
|
time.sleep(0.2)
|
|
getattr(b, method)()
|
|
|
|
for method, states in [('start', [b.states.STARTED]),
|
|
('stop', [b.states.STOPPED]),
|
|
('start', [b.states.STARTING, b.states.STARTED]),
|
|
('exit', [b.states.EXITING]),
|
|
]:
|
|
threading.Thread(target=f, args=(method,)).start()
|
|
b.wait(states)
|
|
|
|
# The wait method MUST wait for the given state(s).
|
|
if b.state not in states:
|
|
self.fail("State %r not in %r" % (b.state, states))
|
|
|
|
def test_block(self):
|
|
b = wspbus.Bus()
|
|
self.log(b)
|
|
|
|
def f():
|
|
time.sleep(0.2)
|
|
b.exit()
|
|
def g():
|
|
time.sleep(0.4)
|
|
threading.Thread(target=f).start()
|
|
threading.Thread(target=g).start()
|
|
threads = [t for t in threading.enumerate() if not get_daemon(t)]
|
|
self.assertEqual(len(threads), 3)
|
|
|
|
b.block()
|
|
|
|
# The block method MUST wait for the EXITING state.
|
|
self.assertEqual(b.state, b.states.EXITING)
|
|
# The block method MUST wait for ALL non-main, non-daemon threads to finish.
|
|
threads = [t for t in threading.enumerate() if not get_daemon(t)]
|
|
self.assertEqual(len(threads), 1)
|
|
# The last message will mention an indeterminable thread name; ignore it
|
|
self.assertEqual(self._log_entries[:-1],
|
|
['Bus STOPPING', 'Bus STOPPED',
|
|
'Bus EXITING', 'Bus EXITED',
|
|
'Waiting for child threads to terminate...'])
|
|
|
|
def test_start_with_callback(self):
|
|
b = wspbus.Bus()
|
|
self.log(b)
|
|
try:
|
|
events = []
|
|
def f(*args, **kwargs):
|
|
events.append(("f", args, kwargs))
|
|
def g():
|
|
events.append("g")
|
|
b.subscribe("start", g)
|
|
b.start_with_callback(f, (1, 3, 5), {"foo": "bar"})
|
|
# Give wait() time to run f()
|
|
time.sleep(0.2)
|
|
|
|
# The callback method MUST wait for the STARTED state.
|
|
self.assertEqual(b.state, b.states.STARTED)
|
|
# The callback method MUST run after all start methods.
|
|
self.assertEqual(events, ["g", ("f", (1, 3, 5), {"foo": "bar"})])
|
|
finally:
|
|
b.exit()
|
|
|
|
def test_log(self):
|
|
b = wspbus.Bus()
|
|
self.log(b)
|
|
self.assertLog([])
|
|
|
|
# Try a normal message.
|
|
expected = []
|
|
for msg in ["O mah darlin'"] * 3 + ["Clementiiiiiiiine"]:
|
|
b.log(msg)
|
|
expected.append(msg)
|
|
self.assertLog(expected)
|
|
|
|
# Try an error message
|
|
try:
|
|
foo
|
|
except NameError:
|
|
b.log("You are lost and gone forever", traceback=True)
|
|
lastmsg = self._log_entries[-1]
|
|
if "Traceback" not in lastmsg or "NameError" not in lastmsg:
|
|
self.fail("Last log message %r did not contain "
|
|
"the expected traceback." % lastmsg)
|
|
else:
|
|
self.fail("NameError was not raised as expected.")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|