Remove wraptor library that is never used

This commit is contained in:
Michiel van Baak 2020-02-10 10:35:49 +01:00
parent e3ee77d6ea
commit ecefa41c93
21 changed files with 0 additions and 561 deletions

View File

@ -1,7 +0,0 @@
""" Wraptor
Provides a set of useful decorators and other wrap-like python utility functions
"""
__version__ = "0.6.0"

View File

@ -1,5 +0,0 @@
from wraptor.context.maybe import maybe
from wraptor.context.throttle import throttle
from wraptor.context.timer import timer
__all__ = ['maybe', 'throttle', 'timer']

View File

@ -1,27 +0,0 @@
import sys
import inspect
class _SkippedBlock(Exception):
pass
class maybe(object):
def __init__(self, predicate):
self.predicate = predicate
def __empty_fn(self, *args, **kwargs):
return None
def __enter__(self):
if not self.predicate():
sys.settrace(self.__empty_fn)
frame = inspect.currentframe(1)
frame.f_trace = self.__trace
def __trace(self, *args, **kwargs):
raise _SkippedBlock()
def __exit__(self, type, value, traceback):
if isinstance(value, _SkippedBlock):
sys.settrace(None)
return True
return False

View File

@ -1,26 +0,0 @@
from threading import Thread
from wraptor.context import maybe
def test_basic():
with maybe(lambda: False):
assert False
check = False
with maybe(lambda: True):
check = True
assert check
def test_threads():
def worker(arr, index):
for i in range(5):
with maybe(lambda: i == 3):
arr[index] = True
workers = 100
arr = [False for i in range(workers)]
threads = [Thread(target=worker, args=(arr, i)) for i in range(workers)]
[t.start() for t in threads]
[t.join() for t in threads]
assert all(arr)

View File

@ -1,17 +0,0 @@
import time
from wraptor.context import throttle
def test_basic():
arr = []
t = throttle(.1)
with t:
arr.append(1)
with t:
arr.append(1)
time.sleep(.2)
with t:
arr.append(1)
assert arr == [1, 1]

View File

@ -1,15 +0,0 @@
from wraptor.context import timer
import time
def test_basic():
with timer() as t:
time.sleep(0.1000000) # sleep 100 ms
assert t.interval >= 100
def test_params():
with timer('test') as t:
pass
assert t.name == 'test'
assert str(t).startswith('test')

View File

@ -1,16 +0,0 @@
import time
from wraptor.context import maybe
class throttle(maybe):
def __init__(self, seconds=1):
self.seconds = seconds
self.last_run = 0
def predicate():
now = time.time()
if now > self.last_run + self.seconds:
self.last_run = now
return True
return False
maybe.__init__(self, predicate)

View File

@ -1,18 +0,0 @@
import time
class timer(object):
__slots__ = ('name', 'interval', 'start', 'end')
def __init__(self, name=None):
self.name = name
def __enter__(self):
self.start = time.time() * 1e3
return self
def __exit__(self, *args):
self.end = time.time() * 1e3
self.interval = self.end - self.start
def __str__(self):
return "%s took %.03f ms" % (self.name, self.interval)

View File

@ -1,6 +0,0 @@
from wraptor.decorators.memoize import memoize
from wraptor.decorators.throttle import throttle
from wraptor.decorators.timeout import timeout, TimeoutException
from wraptor.decorators.exception_catcher import exception_catcher
__all__ = ['memoize', 'throttle', 'timeout', 'TimeoutException', 'exception_catcher']

View File

@ -1,29 +0,0 @@
from functools import wraps
import sys
import Queue
def exception_catcher(fn):
""" Catch exceptions raised by the decorated function.
Call check() to raise any caught exceptions.
"""
exceptions = Queue.Queue()
@wraps(fn)
def wrapped(*args, **kwargs):
try:
ret = fn(*args, **kwargs)
except Exception:
exceptions.put(sys.exc_info())
raise
return ret
def check():
try:
item = exceptions.get(block=False)
klass, value, tb = item
raise klass, value, tb
except Queue.Empty:
pass
setattr(wrapped, 'check', check)
return wrapped

View File

@ -1,70 +0,0 @@
from functools import wraps
import time
from hashlib import md5
import threading
class memoize(object):
""" Memoize the results of a function. Supports an optional timeout
for automatic cache expiration.
If the optional manual_flush argument is True, a function called
"flush_cache" will be added to the wrapped function. When
called, it will remove all the timed out values from the cache.
If you use this decorator as a class method, you must specify
instance_method=True otherwise you will have a single shared
cache for every instance of your class.
This decorator is thread safe.
"""
def __init__(self, timeout=None, manual_flush=False, instance_method=False):
self.timeout = timeout
self.manual_flush = manual_flush
self.instance_method = instance_method
self.cache = {}
self.cache_lock = threading.RLock()
def __call__(self, fn):
if self.instance_method:
@wraps(fn)
def rewrite_instance_method(instance, *args, **kwargs):
# the first time we are called we overwrite the method
# on the class instance with a new memoize instance
if hasattr(instance, fn.__name__):
bound_fn = fn.__get__(instance, instance.__class__)
new_memoizer = memoize(self.timeout, self.manual_flush)(bound_fn)
setattr(instance, fn.__name__, new_memoizer)
return getattr(instance, fn.__name__)(*args, **kwargs)
return rewrite_instance_method
def flush_cache():
with self.cache_lock:
for key in self.cache.keys():
if (time.time() - self.cache[key][1]) > self.timeout:
del(self.cache[key])
@wraps(fn)
def wrapped(*args, **kwargs):
kw = kwargs.items()
kw.sort()
key_str = repr((args, kw))
key = md5(key_str).hexdigest()
with self.cache_lock:
try:
result, cache_time = self.cache[key]
if self.timeout is not None and (time.time() - cache_time) > self.timeout:
raise KeyError
except KeyError:
result, _ = self.cache[key] = (fn(*args, **kwargs), time.time())
if not self.manual_flush and self.timeout is not None:
flush_cache()
return result
if self.manual_flush:
wrapped.flush_cache = flush_cache
return wrapped

View File

@ -1,18 +0,0 @@
from wraptor.decorators import timeout, throttle, memoize
import pytest
with_decorators = pytest.mark.parametrize("decorator", [
timeout, throttle, memoize
])
@with_decorators
def test_called_with_args(decorator):
test_args = [1, 2, [1, 2, 3], { 'asdf': 5 }]
test_kwargs = { 'a': 1, 'b': [1, 2, 3] }
@decorator()
def fn(*args, **kwargs):
assert tuple(test_args) == args
assert test_kwargs == kwargs
fn(*test_args, **test_kwargs)

View File

@ -1,16 +0,0 @@
from wraptor.decorators import exception_catcher
import threading
import pytest
def test_basic():
@exception_catcher
def work():
raise Exception()
t = threading.Thread(target=work)
t.start()
t.join()
with pytest.raises(Exception):
work.check()

View File

@ -1,147 +0,0 @@
import time
from wraptor.decorators import memoize
def test_basic_noargs():
arr = []
@memoize()
def fn():
arr.append(1)
fn()
fn()
assert arr == [1]
def test_basic_args():
arr = []
@memoize()
def fn(*args, **kwargs):
arr.append(1)
s_args = [1, 2, 3]
fn(*s_args)
fn(*s_args)
c_args = [[1], "asdjf", {'a': 5}]
fn(*c_args)
fn(*c_args)
kw_args = {'a': 234, 'b': [1, 2, "asdf"], 'c': [5, 6]}
kw_args_2 = {'a': 234, 'b': [1, 3, "asdf"], 'c': [5, 6]}
fn(*c_args, **kw_args)
fn(*c_args, **kw_args_2)
fn(*c_args, **kw_args)
fn(fn)
fn(fn)
assert arr == [1, 1, 1, 1, 1]
def test_timeout():
arr = []
@memoize(timeout=.1)
def fn(*args, **kwargs):
arr.append(1)
fn(1, 2, 3)
time.sleep(.2)
fn(1, 2, 3)
assert arr == [1, 1]
def test_auto_flush():
memoize_inst = memoize(timeout=.1)
@memoize_inst
def fn(*args, **kwargs):
pass
fn(1, 2, 3)
assert len(memoize_inst.cache.keys()) == 1
time.sleep(.2)
fn(1, 2, 3)
assert len(memoize_inst.cache.keys()) == 1
def test_manual_flush():
memoize_inst = memoize(timeout=.1, manual_flush=True)
@memoize_inst
def fn(*args, **kwargs):
pass
fn(1, 2, 3)
assert len(memoize_inst.cache.keys()) == 1
time.sleep(.2)
fn(3, 4, 5)
assert len(memoize_inst.cache.keys()) == 2
time.sleep(.2)
fn.flush_cache()
assert len(memoize_inst.cache.keys()) == 0
def test_class_method():
import random
memoizer = memoize(manual_flush=True, instance_method=True)
class foo(object):
@memoizer
def bar(self, *args):
return random.random()
x = foo()
x2 = foo()
assert x.bar('a', 'b') != x2.bar('a', 'b')
assert x.bar('a', 'b') == x.bar('a', 'b')
assert x.bar('a', 'b') != x.bar('a', 'd')
assert x2.bar('a', 'b') == x2.bar('a', 'b')
# the memoizer should have made private caches for each instance
assert len(memoizer.cache) == 0
# now make sure that they don't share caches
res1 = x.bar('a', 'b')
res2 = x2.bar('a', 'b')
x.bar.flush_cache()
assert res1 != x.bar('a', 'b')
assert res2 == x2.bar('a', 'b')
def test_instance_method_extended():
class foo(object):
def __init__(self):
self.i = 0
@memoize(instance_method=True)
def bar(self, instance):
assert self == instance
self.i += 1
return self.i
f = foo()
assert f.bar(f) == 1
assert f.bar(f) == 1
def test_fail_instance_method():
""" Test that memoize without instance_method creates a globally
shared memoize instance (shared by all instances of the class)
"""
memoizer = memoize(manual_flush=True)
class foo(object):
def __init__(self, x):
self._x = x
@memoizer
def bar(self):
return self._x
x = foo(1)
x2 = foo(2)
assert x.bar() != x2.bar()
# note that they share the same cache
assert len(memoizer.cache) == 2

View File

@ -1,51 +0,0 @@
import time
from wraptor.decorators import throttle
def test_basic():
arr = []
@throttle(.1)
def test():
arr.append(1)
test()
test()
time.sleep(.2)
test()
assert arr == [1, 1]
def test_fail_instance_method():
""" Test that throttle without instance_method creates a globally
shared throttle instance (shared by all instances of the class)
"""
arr = []
class foo(object):
@throttle(1)
def bar(self):
arr.append(1)
x = foo()
x2 = foo()
x.bar()
x2.bar()
# throttle
assert arr == [1]
def test_class_method():
arr = []
class foo(object):
@throttle(1, instance_method=True)
def bar(self):
arr.append(1)
x = foo()
x2 = foo()
x.bar()
x2.bar()
assert arr == [1, 1]

View File

@ -1,31 +0,0 @@
import time
from wraptor.decorators import timeout, TimeoutException
def test_basic():
@timeout(1)
def fn():
try:
time.sleep(2)
assert False
except TimeoutException:
pass
fn()
def test_catch_exception_outsize():
@timeout(1)
def fn():
time.sleep(2)
assert False
try:
fn()
except TimeoutException:
pass
def test_cancels_signal():
@timeout(1)
def fn():
pass
fn()
time.sleep(1)

View File

@ -1,31 +0,0 @@
from functools import wraps
from wraptor import context
class throttle(object):
""" Throttle a function to execute at most 1 time per <seconds> seconds
The function is executed on the forward edge.
"""
def __init__(self, seconds=1, instance_method=False):
self.throttler = context.throttle(seconds=seconds)
self.seconds = seconds
self.instance_method = instance_method
def __call__(self, fn):
if self.instance_method:
@wraps(fn)
def rewrite_instance_method(instance, *args, **kwargs):
# the first time we are called we overwrite the method
# on the class instance with a new memoize instance
if hasattr(instance, fn.__name__):
bound_fn = fn.__get__(instance, instance.__class__)
new_throttler = throttle(self.seconds)(bound_fn)
setattr(instance, fn.__name__, new_throttler)
return getattr(instance, fn.__name__)(*args, **kwargs)
return rewrite_instance_method
@wraps(fn)
def wrapped(*args, **kwargs):
with self.throttler:
return fn(*args, **kwargs)
return wrapped

View File

@ -1,31 +0,0 @@
from functools import wraps
import signal
class TimeoutException(Exception):
pass
class timeout(object):
""" Basic timeout decorator
* Uses signals, so this can only be used in the main thread of execution
* seconds must be a positive integer
Signal implementation based on http://code.activestate.com/recipes/307871-timing-out-function/
"""
def __init__(self, seconds=1):
self.seconds = seconds
def __call__(self, fn):
def sig_handler(signum, frame):
raise TimeoutException()
@wraps(fn)
def wrapped(*args, **kwargs):
old = signal.signal(signal.SIGALRM, sig_handler)
signal.alarm(self.seconds)
try:
result = fn(*args, **kwargs)
finally:
signal.signal(signal.SIGALRM, old)
signal.alarm(0)
return result
return wrapped