1
0
Fork 0
mirror of https://github.com/evilhero/mylar synced 2024-12-21 23:32:23 +00:00

IMP: Updated deluge_client for v2 compatibility

This commit is contained in:
Barbeque Sauce 2019-12-02 13:30:27 -05:00 committed by evilhero
parent f8a8cb2a6b
commit 66897c1549
4 changed files with 443 additions and 217 deletions

View file

@ -1 +1 @@
from .client import DelugeRPCClient
from .client import DelugeRPCClient, FailedToReconnectException

View file

@ -2,6 +2,7 @@ import logging
import socket
import ssl
import struct
import warnings
import zlib
from .rencode import dumps, loads
@ -10,102 +11,265 @@ RPC_RESPONSE = 1
RPC_ERROR = 2
RPC_EVENT = 3
#MESSAGE_HEADER_SIZE = 5
MESSAGE_HEADER_SIZE = 5
READ_SIZE = 10
logger = logging.getLogger(__name__)
class ConnectionLostException(Exception):
class DelugeClientException(Exception):
"""Base exception for all deluge client exceptions"""
class ConnectionLostException(DelugeClientException):
pass
class CallTimeoutException(Exception):
class CallTimeoutException(DelugeClientException):
pass
class InvalidHeaderException(DelugeClientException):
pass
class FailedToReconnectException(DelugeClientException):
pass
class RemoteException(DelugeClientException):
pass
class DelugeRPCClient(object):
timeout = 20
def __init__(self, host, port, username, password):
def __init__(self, host, port, username, password, decode_utf8=False, automatic_reconnect=True):
self.host = host
self.port = port
self.username = username
self.password = password
self.deluge_version = None
# This is only applicable if deluge_version is 2
self.deluge_protocol_version = None
self.decode_utf8 = decode_utf8
if not self.decode_utf8:
warnings.warn('Using `decode_utf8=False` is deprecated, please set it to True.'
'The argument will be removed in a future release where it will be always True', DeprecationWarning)
self.automatic_reconnect = automatic_reconnect
self.request_id = 1
self.connected = False
self._create_socket()
def _create_socket(self, ssl_version=None):
if ssl_version is not None:
self._socket = ssl.wrap_socket(socket.socket(socket.AF_INET, socket.SOCK_STREAM), ssl_version=ssl_version)
else:
self._socket = ssl.wrap_socket(socket.socket(socket.AF_INET, socket.SOCK_STREAM))
self._socket.settimeout(self.timeout)
def connect(self):
"""
Connects to the Deluge instance
"""
self._connect()
logger.debug('Connected to Deluge, detecting daemon version')
self._detect_deluge_version()
logger.debug('Daemon version {} detected, logging in'.format(self.deluge_version))
if self.deluge_version == 2:
result = self.call('daemon.login', self.username, self.password, client_version='deluge-client')
else:
result = self.call('daemon.login', self.username, self.password)
logger.debug('Logged in with value %r' % result)
self.connected = True
def _connect(self):
logger.info('Connecting to %s:%s' % (self.host, self.port))
try:
self._socket.connect((self.host, self.port))
except ssl.SSLError as e:
if e.reason != 'UNSUPPORTED_PROTOCOL' or not hasattr(ssl, 'PROTOCOL_SSLv3'):
# Note: have not verified that we actually get errno 258 for this error
if (hasattr(ssl, 'PROTOCOL_SSLv3') and
(getattr(e, 'reason', None) == 'UNSUPPORTED_PROTOCOL' or e.errno == 258)):
logger.warning('Was unable to ssl handshake, trying to force SSLv3 (insecure)')
self._create_socket(ssl_version=ssl.PROTOCOL_SSLv3)
self._socket.connect((self.host, self.port))
else:
raise
logger.warning('Was unable to ssl handshake, trying to force SSLv3 (insecure)')
self._create_socket(ssl_version=ssl.PROTOCOL_SSLv3)
self._socket.connect((self.host, self.port))
logger.debug('Connected to Deluge, logging in')
result = self.call('daemon.login', self.username, self.password)
logger.debug('Logged in with value %r' % result)
self.connected = True
def disconnect(self):
"""
Disconnect from deluge
"""
if self.connected:
self._socket.close()
def call(self, method, *args, **kwargs):
"""
Calls an RPC function
"""
self._socket = None
self.connected = False
def _detect_deluge_version(self):
if self.deluge_version is not None:
return
self._send_call(1, None, 'daemon.info')
self._send_call(2, None, 'daemon.info')
self._send_call(2, 1, 'daemon.info')
result = self._socket.recv(1)
if result[:1] == b'D':
# This is a protocol deluge 2.0 was using before release
self.deluge_version = 2
self.deluge_protocol_version = None
# If we need the specific version of deluge 2, this is it.
daemon_version = self._receive_response(2, None, partial_data=result)
elif ord(result[:1]) == 1:
self.deluge_version = 2
self.deluge_protocol_version = 1
# If we need the specific version of deluge 2, this is it.
daemon_version = self._receive_response(2, 1, partial_data=result)
else:
self.deluge_version = 1
# Deluge 1 doesn't recover well from the bad request. Re-connect the socket.
self._socket.close()
self._create_socket()
self._connect()
def _send_call(self, deluge_version, protocol_version, method, *args, **kwargs):
self.request_id += 1
logger.debug('Calling reqid %s method %r with args:%r kwargs:%r' % (self.request_id, method, args, kwargs))
if method == 'daemon.login':
debug_args = list(args)
if len(debug_args) >= 2:
debug_args[1] = '<password hidden>'
logger.debug('Calling reqid %s method %r with args:%r kwargs:%r' % (self.request_id, method, debug_args, kwargs))
else:
logger.debug('Calling reqid %s method %r with args:%r kwargs:%r' % (self.request_id, method, args, kwargs))
req = ((self.request_id, method, args, kwargs), )
req = zlib.compress(dumps(req))
#self._socket.send('D' + struct.pack("!i", len(req))) # seems to be for the future !
if deluge_version == 2:
if protocol_version is None:
# This was a protocol for deluge 2 before they introduced protocol version numbers
self._socket.send(b'D' + struct.pack("!i", len(req)))
elif protocol_version == 1:
self._socket.send(struct.pack('!BI', protocol_version, len(req)))
else:
raise Exception('Deluge protocol version {} is not (yet) supported.'.format(protocol_version))
self._socket.send(req)
data = b''
def _receive_response(self, deluge_version, protocol_version, partial_data=b''):
expected_bytes = None
data = partial_data
while True:
try:
d = self._socket.recv(READ_SIZE)
except ssl.SSLError:
raise CallTimeoutException()
data += d
try:
data = zlib.decompress(data)
except zlib.error:
if not d:
raise ConnectionLostException()
continue
break
data = list(loads(data))
if deluge_version == 2:
if expected_bytes is None:
if len(data) < 5:
continue
header = data[:MESSAGE_HEADER_SIZE]
data = data[MESSAGE_HEADER_SIZE:]
if protocol_version is None:
if header[0] != b'D'[0]:
raise InvalidHeaderException('Expected D as first byte in reply')
elif ord(header[:1]) != protocol_version:
raise InvalidHeaderException(
'Expected protocol version ({}) as first byte in reply'.format(protocol_version)
)
if protocol_version is None:
expected_bytes = struct.unpack('!i', header[1:])[0]
else:
expected_bytes = struct.unpack('!I', header[1:])[0]
if len(data) >= expected_bytes:
data = zlib.decompress(data)
break
else:
try:
data = zlib.decompress(data)
except zlib.error:
if not d:
raise ConnectionLostException()
continue
break
data = list(loads(data, decode_utf8=self.decode_utf8))
msg_type = data.pop(0)
request_id = data.pop(0)
if msg_type == RPC_ERROR:
exception_type, exception_msg, traceback = data[0]
exception = type(str(exception_type), (Exception, ), {})
exception_msg = '%s\n\n%s' % (exception_msg, traceback)
if self.deluge_version == 2:
exception_type, exception_msg, _, traceback = data
# On deluge 2, exception arguments are sent as tuple
if self.decode_utf8:
exception_msg = ', '.join(exception_msg)
else:
exception_msg = b', '.join(exception_msg)
else:
exception_type, exception_msg, traceback = data[0]
if self.decode_utf8:
exception = type(str(exception_type), (RemoteException, ), {})
exception_msg = '%s\n%s' % (exception_msg,
traceback)
else:
exception = type(str(exception_type.decode('utf-8', 'ignore')), (RemoteException, ), {})
exception_msg = '%s\n%s' % (exception_msg.decode('utf-8', 'ignore'),
traceback.decode('utf-8', 'ignore'))
raise exception(exception_msg)
elif msg_type == RPC_RESPONSE:
retval = data[0]
return retval
def reconnect(self):
"""
Reconnect
"""
self.disconnect()
self._create_socket()
self.connect()
def call(self, method, *args, **kwargs):
"""
Calls an RPC function
"""
tried_reconnect = False
for _ in range(2):
try:
self._send_call(self.deluge_version, self.deluge_protocol_version, method, *args, **kwargs)
return self._receive_response(self.deluge_version, self.deluge_protocol_version)
except (socket.error, ConnectionLostException, CallTimeoutException):
if self.automatic_reconnect:
if tried_reconnect:
raise FailedToReconnectException()
else:
try:
self.reconnect()
except (socket.error, ConnectionLostException, CallTimeoutException):
raise FailedToReconnectException()
tried_reconnect = True
else:
raise
def __getattr__(self, item):
return RPCCaller(self.call, item)
class RPCCaller(object):
def __init__(self, caller, method=''):
self.caller = caller
self.method = method
def __getattr__(self, item):
return RPCCaller(self.caller, self.method+'.'+item)
def __call__(self, *args, **kwargs):
return self.caller(self.method, *args, **kwargs)

View file

@ -1,27 +1,3 @@
"""
rencode -- Web safe object pickling/unpickling.
Public domain, Connelly Barnes 2006-2007.
The rencode module is a modified version of bencode from the
BitTorrent project. For complex, heterogeneous data structures with
many small elements, r-encodings take up significantly less space than
b-encodings:
>>> len(rencode.dumps({'a':0, 'b':[1,2], 'c':99}))
13
>>> len(bencode.bencode({'a':0, 'b':[1,2], 'c':99}))
26
The rencode format is not standardized, and may change with different
rencode module versions, so you should check that you are using the
same rencode version throughout your project.
"""
__version__ = '1.0.2'
__all__ = ['dumps', 'loads']
# Original bencode module by Petru Paler, et al.
#
# Modifications by Connelly Barnes:
@ -62,23 +38,50 @@ __all__ = ['dumps', 'loads']
# (The rencode module is licensed under the above license as well).
#
import sys
"""
rencode -- Web safe object pickling/unpickling.
Public domain, Connelly Barnes 2006-2007.
The rencode module is a modified version of bencode from the
BitTorrent project. For complex, heterogeneous data structures with
many small elements, r-encodings take up significantly less space than
b-encodings:
>>> len(rencode.dumps({'a':0, 'b':[1,2], 'c':99}))
13
>>> len(bencode.bencode({'a':0, 'b':[1,2], 'c':99}))
26
The rencode format is not standardized, and may change with different
rencode module versions, so you should check that you are using the
same rencode version throughout your project.
"""
py3 = False
if sys.version_info >= (3, 0):
py3 = True
long = int
unicode = str
def int2byte(c):
if py3:
return bytes([c])
else:
return chr(c)
import struct
import sys
from threading import Lock
try:
from future_builtins import zip
except ImportError:
# Ignore on Py3.
pass
__version__ = ('Python', 1, 0, 4)
__all__ = ['dumps', 'loads']
py3 = sys.version_info[0] >= 3
if py3:
long = int # pylint: disable=redefined-builtin
unicode = str # pylint: disable=redefined-builtin
def int2byte(c):
return bytes([c])
else:
def int2byte(c):
return chr(c)
# Default number of bits for serialized floats, either 32 or 64 (also a parameter for dumps()).
DEFAULT_FLOAT_BITS = 32
@ -87,19 +90,19 @@ MAX_INT_LENGTH = 64
# The bencode 'typecodes' such as i, d, etc have been extended and
# relocated on the base-256 character set.
CHR_LIST = int2byte(59)
CHR_DICT = int2byte(60)
CHR_INT = int2byte(61)
CHR_INT1 = int2byte(62)
CHR_INT2 = int2byte(63)
CHR_INT4 = int2byte(64)
CHR_INT8 = int2byte(65)
CHR_LIST = int2byte(59)
CHR_DICT = int2byte(60)
CHR_INT = int2byte(61)
CHR_INT1 = int2byte(62)
CHR_INT2 = int2byte(63)
CHR_INT4 = int2byte(64)
CHR_INT8 = int2byte(65)
CHR_FLOAT32 = int2byte(66)
CHR_FLOAT64 = int2byte(44)
CHR_TRUE = int2byte(67)
CHR_FALSE = int2byte(68)
CHR_NONE = int2byte(69)
CHR_TERM = int2byte(127)
CHR_TRUE = int2byte(67)
CHR_FALSE = int2byte(68)
CHR_NONE = int2byte(69)
CHR_TERM = int2byte(127)
# Positive integers with value embedded in typecode.
INT_POS_FIXED_START = 0
@ -118,12 +121,13 @@ STR_FIXED_START = 128
STR_FIXED_COUNT = 64
# Lists with length embedded in typecode.
LIST_FIXED_START = STR_FIXED_START+STR_FIXED_COUNT
LIST_FIXED_START = STR_FIXED_START + STR_FIXED_COUNT
LIST_FIXED_COUNT = 64
# Whether strings should be decoded when loading
_decode_utf8 = False
def decode_int(x, f):
f += 1
newf = x.index(CHR_TERM, f)
@ -133,39 +137,46 @@ def decode_int(x, f):
n = int(x[f:newf])
except (OverflowError, ValueError):
n = long(x[f:newf])
if x[f:f+1] == '-':
if x[f:f + 1] == '-':
if x[f + 1:f + 2] == '0':
raise ValueError
elif x[f:f+1] == '0' and newf != f+1:
elif x[f:f + 1] == '0' and newf != f + 1:
raise ValueError
return (n, newf+1)
return (n, newf + 1)
def decode_intb(x, f):
f += 1
return (struct.unpack('!b', x[f:f+1])[0], f+1)
return (struct.unpack('!b', x[f:f + 1])[0], f + 1)
def decode_inth(x, f):
f += 1
return (struct.unpack('!h', x[f:f+2])[0], f+2)
return (struct.unpack('!h', x[f:f + 2])[0], f + 2)
def decode_intl(x, f):
f += 1
return (struct.unpack('!l', x[f:f+4])[0], f+4)
return (struct.unpack('!l', x[f:f + 4])[0], f + 4)
def decode_intq(x, f):
f += 1
return (struct.unpack('!q', x[f:f+8])[0], f+8)
return (struct.unpack('!q', x[f:f + 8])[0], f + 8)
def decode_float32(x, f):
f += 1
n = struct.unpack('!f', x[f:f+4])[0]
return (n, f+4)
n = struct.unpack('!f', x[f:f + 4])[0]
return (n, f + 4)
def decode_float64(x, f):
f += 1
n = struct.unpack('!d', x[f:f+8])[0]
return (n, f+8)
n = struct.unpack('!d', x[f:f + 8])[0]
return (n, f + 8)
def decode_string(x, f):
colon = x.index(b':', f)
@ -173,36 +184,42 @@ def decode_string(x, f):
n = int(x[f:colon])
except (OverflowError, ValueError):
n = long(x[f:colon])
if x[f] == '0' and colon != f+1:
if x[f] == '0' and colon != f + 1:
raise ValueError
colon += 1
s = x[colon:colon+n]
s = x[colon:colon + n]
if _decode_utf8:
s = s.decode('utf8')
return (s, colon+n)
return (s, colon + n)
def decode_list(x, f):
r, f = [], f+1
while x[f:f+1] != CHR_TERM:
v, f = decode_func[x[f:f+1]](x, f)
r, f = [], f + 1
while x[f:f + 1] != CHR_TERM:
v, f = decode_func[x[f:f + 1]](x, f)
r.append(v)
return (tuple(r), f + 1)
def decode_dict(x, f):
r, f = {}, f+1
while x[f:f+1] != CHR_TERM:
k, f = decode_func[x[f:f+1]](x, f)
r[k], f = decode_func[x[f:f+1]](x, f)
r, f = {}, f + 1
while x[f:f + 1] != CHR_TERM:
k, f = decode_func[x[f:f + 1]](x, f)
r[k], f = decode_func[x[f:f + 1]](x, f)
return (r, f + 1)
def decode_true(x, f):
return (True, f+1)
return (True, f + 1)
def decode_false(x, f):
return (False, f+1)
return (False, f + 1)
def decode_none(x, f):
return (None, f+1)
return (None, f + 1)
decode_func = {}
decode_func[b'0'] = decode_string
@ -215,72 +232,81 @@ decode_func[b'6'] = decode_string
decode_func[b'7'] = decode_string
decode_func[b'8'] = decode_string
decode_func[b'9'] = decode_string
decode_func[CHR_LIST ] = decode_list
decode_func[CHR_DICT ] = decode_dict
decode_func[CHR_INT ] = decode_int
decode_func[CHR_INT1 ] = decode_intb
decode_func[CHR_INT2 ] = decode_inth
decode_func[CHR_INT4 ] = decode_intl
decode_func[CHR_INT8 ] = decode_intq
decode_func[CHR_LIST] = decode_list
decode_func[CHR_DICT] = decode_dict
decode_func[CHR_INT] = decode_int
decode_func[CHR_INT1] = decode_intb
decode_func[CHR_INT2] = decode_inth
decode_func[CHR_INT4] = decode_intl
decode_func[CHR_INT8] = decode_intq
decode_func[CHR_FLOAT32] = decode_float32
decode_func[CHR_FLOAT64] = decode_float64
decode_func[CHR_TRUE ] = decode_true
decode_func[CHR_FALSE ] = decode_false
decode_func[CHR_NONE ] = decode_none
decode_func[CHR_TRUE] = decode_true
decode_func[CHR_FALSE] = decode_false
decode_func[CHR_NONE] = decode_none
def make_fixed_length_string_decoders():
def make_decoder(slen):
def f(x, f):
s = x[f+1:f+1+slen]
s = x[f + 1:f + 1 + slen]
if _decode_utf8:
s = s.decode("utf8")
return (s, f+1+slen)
s = s.decode('utf8')
return (s, f + 1 + slen)
return f
for i in range(STR_FIXED_COUNT):
decode_func[int2byte(STR_FIXED_START+i)] = make_decoder(i)
decode_func[int2byte(STR_FIXED_START + i)] = make_decoder(i)
make_fixed_length_string_decoders()
def make_fixed_length_list_decoders():
def make_decoder(slen):
def f(x, f):
r, f = [], f+1
for i in range(slen):
v, f = decode_func[x[f:f+1]](x, f)
r, f = [], f + 1
for _ in range(slen):
v, f = decode_func[x[f:f + 1]](x, f)
r.append(v)
return (tuple(r), f)
return f
for i in range(LIST_FIXED_COUNT):
decode_func[int2byte(LIST_FIXED_START+i)] = make_decoder(i)
decode_func[int2byte(LIST_FIXED_START + i)] = make_decoder(i)
make_fixed_length_list_decoders()
def make_fixed_length_int_decoders():
def make_decoder(j):
def f(x, f):
return (j, f+1)
return (j, f + 1)
return f
for i in range(INT_POS_FIXED_COUNT):
decode_func[int2byte(INT_POS_FIXED_START+i)] = make_decoder(i)
decode_func[int2byte(INT_POS_FIXED_START + i)] = make_decoder(i)
for i in range(INT_NEG_FIXED_COUNT):
decode_func[int2byte(INT_NEG_FIXED_START+i)] = make_decoder(-1-i)
decode_func[int2byte(INT_NEG_FIXED_START + i)] = make_decoder(-1 - i)
make_fixed_length_int_decoders()
def make_fixed_length_dict_decoders():
def make_decoder(slen):
def f(x, f):
r, f = {}, f+1
for j in range(slen):
k, f = decode_func[x[f:f+1]](x, f)
r[k], f = decode_func[x[f:f+1]](x, f)
r, f = {}, f + 1
for _ in range(slen):
k, f = decode_func[x[f:f + 1]](x, f)
r[k], f = decode_func[x[f:f + 1]](x, f)
return (r, f)
return f
for i in range(DICT_FIXED_COUNT):
decode_func[int2byte(DICT_FIXED_START+i)] = make_decoder(i)
decode_func[int2byte(DICT_FIXED_START + i)] = make_decoder(i)
make_fixed_length_dict_decoders()
def loads(x, decode_utf8=False):
global _decode_utf8
_decode_utf8 = decode_utf8
@ -292,11 +318,12 @@ def loads(x, decode_utf8=False):
raise ValueError
return r
def encode_int(x, r):
if 0 <= x < INT_POS_FIXED_COUNT:
r.append(int2byte(INT_POS_FIXED_START+x))
r.append(int2byte(INT_POS_FIXED_START + x))
elif -INT_NEG_FIXED_COUNT <= x < 0:
r.append(int2byte(INT_NEG_FIXED_START-1-x))
r.append(int2byte(INT_NEG_FIXED_START - 1 - x))
elif -128 <= x < 128:
r.extend((CHR_INT1, struct.pack('!b', x)))
elif -32768 <= x < 32768:
@ -308,35 +335,42 @@ def encode_int(x, r):
else:
s = str(x)
if py3:
s = bytes(s, "ascii")
s = bytes(s, 'ascii')
if len(s) >= MAX_INT_LENGTH:
raise ValueError('overflow')
r.extend((CHR_INT, s, CHR_TERM))
def encode_float32(x, r):
r.extend((CHR_FLOAT32, struct.pack('!f', x)))
def encode_float64(x, r):
r.extend((CHR_FLOAT64, struct.pack('!d', x)))
def encode_bool(x, r):
r.append({False: CHR_FALSE, True: CHR_TRUE}[bool(x)])
def encode_none(x, r):
r.append(CHR_NONE)
def encode_string(x, r):
if len(x) < STR_FIXED_COUNT:
r.extend((int2byte(STR_FIXED_START + len(x)), x))
else:
s = str(len(x))
if py3:
s = bytes(s, "ascii")
s = bytes(s, 'ascii')
r.extend((s, b':', x))
def encode_unicode(x, r):
encode_string(x.encode("utf8"), r)
encode_string(x.encode('utf8'), r)
def encode_list(x, r):
if len(x) < LIST_FIXED_COUNT:
@ -349,7 +383,8 @@ def encode_list(x, r):
encode_func[type(i)](i, r)
r.append(CHR_TERM)
def encode_dict(x,r):
def encode_dict(x, r):
if len(x) < DICT_FIXED_COUNT:
r.append(int2byte(DICT_FIXED_START + len(x)))
for k, v in x.items():
@ -362,6 +397,7 @@ def encode_dict(x,r):
encode_func[type(v)](v, r)
r.append(CHR_TERM)
encode_func = {}
encode_func[int] = encode_int
encode_func[long] = encode_int
@ -375,14 +411,14 @@ encode_func[bool] = encode_bool
lock = Lock()
def dumps(x, float_bits=DEFAULT_FLOAT_BITS):
"""
Dump data structure to str.
Here float_bits is either 32 or 64.
"""
lock.acquire()
try:
with lock:
if float_bits == 32:
encode_func[float] = encode_float32
elif float_bits == 64:
@ -391,39 +427,41 @@ def dumps(x, float_bits=DEFAULT_FLOAT_BITS):
raise ValueError('Float bits (%d) is not 32 or 64' % float_bits)
r = []
encode_func[type(x)](x, r)
finally:
lock.release()
return b''.join(r)
def test():
f1 = struct.unpack('!f', struct.pack('!f', 25.5))[0]
f2 = struct.unpack('!f', struct.pack('!f', 29.3))[0]
f3 = struct.unpack('!f', struct.pack('!f', -0.6))[0]
L = (({b'a':15, b'bb':f1, b'ccc':f2, b'':(f3,(),False,True,b'')},(b'a',10**20),tuple(range(-100000,100000)),b'b'*31,b'b'*62,b'b'*64,2**30,2**33,2**62,2**64,2**30,2**33,2**62,2**64,False,False, True, -1, 2, 0),)
assert loads(dumps(L)) == L
d = dict(zip(range(-100000,100000),range(-100000,100000)))
d.update({b'a':20, 20:40, 40:41, f1:f2, f2:f3, f3:False, False:True, True:False})
L = (d, {}, {5:6}, {7:7,True:8}, {9:10, 22:39, 49:50, 44: b''})
assert loads(dumps(L)) == L
L = (b'', b'a'*10, b'a'*100, b'a'*1000, b'a'*10000, b'a'*100000, b'a'*1000000, b'a'*10000000)
assert loads(dumps(L)) == L
L = tuple([dict(zip(range(n),range(n))) for n in range(100)]) + (b'b',)
assert loads(dumps(L)) == L
L = tuple([dict(zip(range(n),range(-n,0))) for n in range(100)]) + (b'b',)
assert loads(dumps(L)) == L
L = tuple([tuple(range(n)) for n in range(100)]) + (b'b',)
assert loads(dumps(L)) == L
L = tuple([b'a'*n for n in range(1000)]) + (b'b',)
assert loads(dumps(L)) == L
L = tuple([b'a'*n for n in range(1000)]) + (None,True,None)
assert loads(dumps(L)) == L
assert loads(dumps(None)) == None
assert loads(dumps({None:None})) == {None:None}
assert 1e-10<abs(loads(dumps(1.1))-1.1)<1e-6
assert 1e-10<abs(loads(dumps(1.1,32))-1.1)<1e-6
assert abs(loads(dumps(1.1,64))-1.1)<1e-12
assert loads(dumps("Hello World!!"), decode_utf8=True)
ld = (({b'a': 15, b'bb': f1, b'ccc': f2, b'': (f3, (), False, True, b'')}, (b'a', 10**20),
tuple(range(-100000, 100000)), b'b' * 31, b'b' * 62, b'b' * 64, 2**30, 2**33, 2**62,
2**64, 2**30, 2**33, 2**62, 2**64, False, False, True, -1, 2, 0),)
assert loads(dumps(ld)) == ld
d = dict(zip(range(-100000, 100000), range(-100000, 100000)))
d.update({b'a': 20, 20: 40, 40: 41, f1: f2, f2: f3, f3: False, False: True, True: False})
ld = (d, {}, {5: 6}, {7: 7, True: 8}, {9: 10, 22: 39, 49: 50, 44: b''})
assert loads(dumps(ld)) == ld
ld = (b'', b'a' * 10, b'a' * 100, b'a' * 1000, b'a' * 10000, b'a' * 100000, b'a' * 1000000, b'a' * 10000000)
assert loads(dumps(ld)) == ld
ld = tuple([dict(zip(range(n), range(n))) for n in range(100)]) + (b'b',)
assert loads(dumps(ld)) == ld
ld = tuple([dict(zip(range(n), range(-n, 0))) for n in range(100)]) + (b'b',)
assert loads(dumps(ld)) == ld
ld = tuple([tuple(range(n)) for n in range(100)]) + (b'b',)
assert loads(dumps(ld)) == ld
ld = tuple([b'a' * n for n in range(1000)]) + (b'b',)
assert loads(dumps(ld)) == ld
ld = tuple([b'a' * n for n in range(1000)]) + (None, True, None)
assert loads(dumps(ld)) == ld
assert loads(dumps(None)) is None
assert loads(dumps({None: None})) == {None: None}
assert 1e-10 < abs(loads(dumps(1.1)) - 1.1) < 1e-6
assert 1e-10 < abs(loads(dumps(1.1, 32)) - 1.1) < 1e-6
assert abs(loads(dumps(1.1, 64)) - 1.1) < 1e-12
assert loads(dumps('Hello World!!'), decode_utf8=True)
try:
import psyco
psyco.bind(dumps)
@ -433,4 +471,4 @@ except ImportError:
if __name__ == '__main__':
test()
test()

View file

@ -1,41 +1,65 @@
import os
import sys
from unittest import TestCase
import pytest
from .client import DelugeRPCClient
from .client import DelugeRPCClient, RemoteException
class TestDelugeClient(TestCase):
def setUp(self):
if sys.version_info > (3,):
long = int
@pytest.fixture
def client(request):
if sys.platform.startswith('win'):
auth_path = os.path.join(os.getenv('APPDATA'), 'deluge', 'auth')
else:
auth_path = os.path.expanduser("~/.config/deluge/auth")
with open(auth_path, 'rb') as f:
filedata = f.read().decode("utf-8").split('\n')[0].split(':')
self.username, self.password = filedata[:2]
self.ip = '127.0.0.1'
self.port = 58846
self.client = DelugeRPCClient(self.ip, self.port, self.username, self.password)
def tearDown(self):
try:
self.client.disconnect()
except:
pass
def test_connect(self):
self.client.connect()
def test_call_method(self):
self.client.connect()
self.assertIsInstance(self.client.call('core.get_free_space'), int)
def test_call_method_arguments(self):
self.client.connect()
self.assertIsInstance(self.client.call('core.get_free_space', '/'), int)
def test_call_method_exception(self):
self.client.connect()
try:
self.client.call('core.get_free_space', '1', '2')
except Exception as e:
self.assertEqual('deluge_client.client', e.__module__)
with open(auth_path, 'rb') as f:
filedata = f.read().decode("utf-8").split('\n')[0].split(':')
username, password = filedata[:2]
ip = '127.0.0.1'
port = 58846
kwargs = {'decode_utf8': True}
if hasattr(request, 'param'):
kwargs.update(request.param)
client = DelugeRPCClient(ip, port, username, password, **kwargs)
client.connect()
yield client
try:
client.disconnect()
except:
pass
def test_connect(client):
assert client.connected
def test_call_method(client):
assert isinstance(client.call('core.get_free_space'), (int, long))
def test_call_method_arguments(client):
assert isinstance(client.call('core.get_free_space', '/'), (int, long))
@pytest.mark.parametrize('client',
[{'decode_utf8': True}, {'decode_utf8': False}],
ids=['decode_utf8_on', 'decode_utf8_off'],
indirect=True)
def test_call_method_exception(client):
with pytest.raises(RemoteException) as ex_info:
client.call('core.get_free_space', '1', '2')
assert ('takes at most 2 arguments' in str(ex_info.value) or
'takes from 1 to 2 positional arguments' in str(ex_info.value)) # deluge 2.0
def test_attr_caller(client):
assert isinstance(client.core.get_free_space(), (int, long))
assert isinstance(client.core.get_free_space('/'), (int, long))