from __future__ import annotations

# built-in
from collections import defaultdict
from itertools import zip_longest
from typing import Any, Sequence, TypeVar

# app
from .base import Base as _Base, BaseSimilarity as _BaseSimilarity
from .types import SimFunc, TestFunc


try:
    # external
    import numpy
except ImportError:
    numpy = None  # type: ignore[assignment]


__all__ = [
    'Hamming', 'MLIPNS',
    'Levenshtein', 'DamerauLevenshtein',
    'Jaro', 'JaroWinkler', 'StrCmp95',
    'NeedlemanWunsch', 'Gotoh', 'SmithWaterman',

    'hamming', 'mlipns',
    'levenshtein', 'damerau_levenshtein',
    'jaro', 'jaro_winkler', 'strcmp95',
    'needleman_wunsch', 'gotoh', 'smith_waterman',
]
T = TypeVar('T')


class Hamming(_Base):
    """
    Compute the Hamming distance between two or more sequences.
    The Hamming distance is the number of differing items in ordered sequences.

    https://en.wikipedia.org/wiki/Hamming_distance
    """

    def __init__(
        self,
        qval: int = 1,
        test_func: TestFunc | None = None,
        truncate: bool = False,
        external: bool = True,
    ) -> None:
        self.qval = qval
        self.test_func = test_func or self._ident
        self.truncate = truncate
        self.external = external

    def __call__(self, *sequences: Sequence[object]) -> int:
        sequences = self._get_sequences(*sequences)

        result = self.quick_answer(*sequences)
        if result is not None:
            assert isinstance(result, int)
            return result

        _zip = zip if self.truncate else zip_longest
        return sum(not self.test_func(*es) for es in _zip(*sequences))
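

# Usage sketch: the values below assume the pure-Python path above (no
# external backend) and the default equality test_func.
#     Hamming()('karolin', 'kathrin')             # -> 3 differing positions
#     Hamming()('karol', 'karolin')               # -> 2 (missing tail counted as mismatches)
#     Hamming(truncate=True)('karol', 'karolin')  # -> 0 (extra tail ignored)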


class Levenshtein(_Base):
    """
    Compute the absolute Levenshtein distance between the two sequences.
    The Levenshtein distance is the minimum number of edit operations necessary
    for transforming one sequence into the other. The edit operations allowed are:

        * deletion: ABC -> BC, AC, AB
        * insertion: ABC -> ABCD, EABC, AEBC..
        * substitution: ABC -> ABE, ADC, FBC..

    https://en.wikipedia.org/wiki/Levenshtein_distance
    TODO: https://gist.github.com/kylebgorman/1081951/9b38b7743a3cb5167ab2c6608ac8eea7fc629dca
    """

    def __init__(
        self,
        qval: int = 1,
        test_func: TestFunc | None = None,
        external: bool = True,
    ) -> None:
        self.qval = qval
        self.test_func = test_func or self._ident
        self.external = external

    def _recursive(self, s1: Sequence[T], s2: Sequence[T]) -> int:
        # TODO: more than 2 sequences support
        if not s1 or not s2:
            return len(s1) + len(s2)

        if self.test_func(s1[-1], s2[-1]):
            return self(s1[:-1], s2[:-1])

        # deletion/insertion
        d = min(
            self(s1[:-1], s2),
            self(s1, s2[:-1]),
        )
        # substitution
        s = self(s1[:-1], s2[:-1])
        return min(d, s) + 1

    def _cycled(self, s1: Sequence[T], s2: Sequence[T]) -> int:
        """
        source:
        https://github.com/jamesturk/jellyfish/blob/master/jellyfish/_jellyfish.py#L18
        """
        rows = len(s1) + 1
        cols = len(s2) + 1
        prev = None
        cur: Any
        if numpy:
            cur = numpy.arange(cols)
        else:
            cur = range(cols)

        for r in range(1, rows):
            prev, cur = cur, [r] + [0] * (cols - 1)
            for c in range(1, cols):
                deletion = prev[c] + 1
                insertion = cur[c - 1] + 1
                dist = self.test_func(s1[r - 1], s2[c - 1])
                edit = prev[c - 1] + (not dist)
                cur[c] = min(edit, deletion, insertion)
        return int(cur[-1])

    def __call__(self, s1: Sequence[T], s2: Sequence[T]) -> int:
        s1, s2 = self._get_sequences(s1, s2)

        result = self.quick_answer(s1, s2)
        if result is not None:
            assert isinstance(result, int)
            return result

        return self._cycled(s1, s2)
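

# Usage sketch (pure-Python path): the classic textbook case.
#     Levenshtein()('kitten', 'sitting')    # -> 3 (substitute k->s, e->i, insert g)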


class DamerauLevenshtein(_Base):
    """
    Compute the absolute Damerau-Levenshtein distance between the two sequences.
    The Damerau-Levenshtein distance is the minimum number of edit operations necessary
    for transforming one sequence into the other. The edit operations allowed are:

        * deletion: ABC -> BC, AC, AB
        * insertion: ABC -> ABCD, EABC, AEBC..
        * substitution: ABC -> ABE, ADC, FBC..
        * transposition: ABC -> ACB, BAC

    If `restricted=False`, it will calculate unrestricted distance,
    where the same character can be touched more than once.
    So the distance between BA and ACB is 2: BA -> AB -> ACB.

    https://en.wikipedia.org/wiki/Damerau%E2%80%93Levenshtein_distance
    """

    def __init__(
        self,
        qval: int = 1,
        test_func: TestFunc | None = None,
        external: bool = True,
        restricted: bool = True,
    ) -> None:
        self.qval = qval
        self.test_func = test_func or self._ident
        self.external = external
        self.restricted = restricted

    def _numpy(self, s1: Sequence[T], s2: Sequence[T]) -> int:
        # TODO: doesn't pass tests, needs improvement
        d = numpy.zeros([len(s1) + 1, len(s2) + 1], dtype=int)

        # matrix
        for i in range(-1, len(s1) + 1):
            d[i][-1] = i + 1
        for j in range(-1, len(s2) + 1):
            d[-1][j] = j + 1

        for i, cs1 in enumerate(s1):
            for j, cs2 in enumerate(s2):
                cost = int(not self.test_func(cs1, cs2))
                # ^ 0 if equal, 1 otherwise

                d[i][j] = min(
                    d[i - 1][j] + 1,         # deletion
                    d[i][j - 1] + 1,         # insertion
                    d[i - 1][j - 1] + cost,  # substitution
                )

                # transposition
                if not i or not j:
                    continue
                if not self.test_func(cs1, s2[j - 1]):
                    continue
                d[i][j] = min(
                    d[i][j],
                    d[i - 2][j - 2] + cost,
                )

        return d[len(s1) - 1][len(s2) - 1]

    def _pure_python_unrestricted(self, s1: Sequence[T], s2: Sequence[T]) -> int:
        """https://en.wikipedia.org/wiki/Damerau%E2%80%93Levenshtein_distance
        """
        d: dict[tuple[int, int], int] = {}
        da: dict[T, int] = {}

        len1 = len(s1)
        len2 = len(s2)

        maxdist = len1 + len2
        d[-1, -1] = maxdist

        # matrix
        for i in range(len(s1) + 1):
            d[i, -1] = maxdist
            d[i, 0] = i
        for j in range(len(s2) + 1):
            d[-1, j] = maxdist
            d[0, j] = j

        for i, cs1 in enumerate(s1, start=1):
            db = 0
            for j, cs2 in enumerate(s2, start=1):
                i1 = da.get(cs2, 0)
                j1 = db
                if self.test_func(cs1, cs2):
                    cost = 0
                    db = j
                else:
                    cost = 1

                d[i, j] = min(
                    d[i - 1, j - 1] + cost,                       # substitution
                    d[i, j - 1] + 1,                              # insertion
                    d[i - 1, j] + 1,                              # deletion
                    d[i1 - 1, j1 - 1] + (i - i1) - 1 + (j - j1),  # transposition
                )
            da[cs1] = i

        return d[len1, len2]

    def _pure_python_restricted(self, s1: Sequence[T], s2: Sequence[T]) -> int:
        """
        https://www.guyrutenberg.com/2008/12/15/damerau-levenshtein-distance-in-python/
        """
        d: dict[tuple[int, int], int] = {}

        # matrix
        for i in range(-1, len(s1) + 1):
            d[i, -1] = i + 1
        for j in range(-1, len(s2) + 1):
            d[-1, j] = j + 1

        for i, cs1 in enumerate(s1):
            for j, cs2 in enumerate(s2):
                cost = int(not self.test_func(cs1, cs2))
                # ^ 0 if equal, 1 otherwise

                d[i, j] = min(
                    d[i - 1, j] + 1,         # deletion
                    d[i, j - 1] + 1,         # insertion
                    d[i - 1, j - 1] + cost,  # substitution
                )

                # transposition
                if not i or not j:
                    continue
                if not self.test_func(cs1, s2[j - 1]):
                    continue
                if not self.test_func(s1[i - 1], cs2):
                    continue
                d[i, j] = min(
                    d[i, j],
                    d[i - 2, j - 2] + cost,
                )

        return d[len(s1) - 1, len(s2) - 1]

    def __call__(self, s1: Sequence[T], s2: Sequence[T]) -> int:
        s1, s2 = self._get_sequences(s1, s2)

        result = self.quick_answer(s1, s2)
        if result is not None:
            return result  # type: ignore[return-value]

        # if numpy:
        #     return self._numpy(s1, s2)
        # else:
        if self.restricted:
            return self._pure_python_restricted(s1, s2)
        return self._pure_python_unrestricted(s1, s2)
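

# Usage sketch (pure-Python path): a transposition counts as one edit, and
# `restricted` decides whether a transposed pair may be edited again.
#     DamerauLevenshtein()('ab', 'ba')                   # -> 1
#     DamerauLevenshtein(restricted=True)('ca', 'abc')   # -> 3 (optimal string alignment)
#     DamerauLevenshtein(restricted=False)('ca', 'abc')  # -> 2 (unrestricted distance)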


class JaroWinkler(_BaseSimilarity):
    """
    Computes the Jaro-Winkler measure between two strings.
    The Jaro-Winkler measure is designed to capture cases where two strings
    have a low Jaro score but share a prefix, and thus are likely to match.

    https://en.wikipedia.org/wiki/Jaro%E2%80%93Winkler_distance
    https://github.com/Yomguithereal/talisman/blob/master/src/metrics/jaro.js
    https://github.com/Yomguithereal/talisman/blob/master/src/metrics/jaro-winkler.js
    """

    def __init__(
        self,
        long_tolerance: bool = False,
        winklerize: bool = True,
        qval: int = 1,
        external: bool = True,
    ) -> None:
        self.qval = qval
        self.long_tolerance = long_tolerance
        self.winklerize = winklerize
        self.external = external

    def maximum(self, *sequences: Sequence[object]) -> int:
        return 1

    def __call__(self, s1: Sequence[T], s2: Sequence[T], prefix_weight: float = 0.1) -> float:
        s1, s2 = self._get_sequences(s1, s2)

        result = self.quick_answer(s1, s2)
        if result is not None:
            return result

        s1_len = len(s1)
        s2_len = len(s2)

        if not s1_len or not s2_len:
            return 0.0

        min_len = min(s1_len, s2_len)
        search_range = max(s1_len, s2_len)
        search_range = (search_range // 2) - 1
        if search_range < 0:
            search_range = 0

        s1_flags = [False] * s1_len
        s2_flags = [False] * s2_len

        # looking only within search range, count & flag matched pairs
        common_chars = 0
        for i, s1_ch in enumerate(s1):
            low = max(0, i - search_range)
            hi = min(i + search_range, s2_len - 1)
            for j in range(low, hi + 1):
                if not s2_flags[j] and s2[j] == s1_ch:
                    s1_flags[i] = s2_flags[j] = True
                    common_chars += 1
                    break

        # short circuit if no characters match
        if not common_chars:
            return 0.0

        # count transpositions
        k = trans_count = 0
        for i, s1_f in enumerate(s1_flags):
            if s1_f:
                for j in range(k, s2_len):
                    if s2_flags[j]:
                        k = j + 1
                        break
                if s1[i] != s2[j]:
                    trans_count += 1
        trans_count //= 2

        # adjust for similarities in nonmatched characters
        weight = common_chars / s1_len + common_chars / s2_len
        weight += (common_chars - trans_count) / common_chars
        weight /= 3

        # stop to boost if strings are not similar
        if not self.winklerize:
            return weight
        if weight <= 0.7:
            return weight

        # winkler modification
        # adjust for up to first 4 chars in common
        j = min(min_len, 4)
        i = 0
        while i < j and s1[i] == s2[i]:
            i += 1
        if i:
            weight += i * prefix_weight * (1.0 - weight)

        # optionally adjust for long strings
        # after agreeing beginning chars, at least two or more must agree and
        # agreed characters must be > half of remaining characters
        if not self.long_tolerance or min_len <= 4:
            return weight
        if common_chars <= i + 1 or 2 * common_chars < min_len + i:
            return weight
        tmp = (common_chars - i - 1) / (s1_len + s2_len - i * 2 + 2)
        weight += (1.0 - weight) * tmp
        return weight
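

# Usage sketch (pure-Python path): the Winkler prefix boost is applied only
# when the Jaro score exceeds 0.7.
#     JaroWinkler()('MARTHA', 'MARHTA')    # -> ~0.961 (Jaro ~0.944 plus prefix boost)
#     JaroWinkler()('abcd', 'wxyz')        # -> 0.0 (no characters in common)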


class Jaro(JaroWinkler):
    def __init__(
        self,
        long_tolerance: bool = False,
        qval: int = 1,
        external: bool = True,
    ) -> None:
        super().__init__(
            long_tolerance=long_tolerance,
            winklerize=False,
            qval=qval,
            external=external,
        )


class NeedlemanWunsch(_BaseSimilarity):
    """
    Computes the Needleman-Wunsch measure between two strings.
    The Needleman-Wunsch measure generalizes the Levenshtein distance and considers global
    alignment between two strings. Specifically, it is computed by assigning
    a score to each alignment between the two input strings and choosing the
    score of the best alignment, that is, the maximal score.
    An alignment between two strings is a set of correspondences between their
    characters, allowing for gaps.

    https://en.wikipedia.org/wiki/Needleman%E2%80%93Wunsch_algorithm
    """

    def __init__(
        self,
        gap_cost: float = 1.0,
        sim_func: SimFunc = None,
        qval: int = 1,
        external: bool = True,
    ) -> None:
        self.qval = qval
        self.gap_cost = gap_cost
        if sim_func:
            self.sim_func = sim_func
        else:
            self.sim_func = self._ident
        self.external = external

    def minimum(self, *sequences: Sequence[object]) -> float:
        return -max(map(len, sequences)) * self.gap_cost

    def maximum(self, *sequences: Sequence[object]) -> float:
        return max(map(len, sequences))

    def distance(self, *sequences: Sequence[object]) -> float:
        """Get distance between sequences
        """
        return -1 * self.similarity(*sequences)

    def normalized_distance(self, *sequences: Sequence[object]) -> float:
        """Get distance from 0 to 1
        """
        minimum = self.minimum(*sequences)
        maximum = self.maximum(*sequences)
        if maximum == 0:
            return 0
        return (self.distance(*sequences) - minimum) / (maximum - minimum)

    def normalized_similarity(self, *sequences: Sequence[object]) -> float:
        """Get similarity from 0 to 1
        """
        minimum = self.minimum(*sequences)
        maximum = self.maximum(*sequences)
        if maximum == 0:
            return 1
        return (self.similarity(*sequences) - minimum) / (maximum * 2)

    def __call__(self, s1: Sequence[T], s2: Sequence[T]) -> float:
        if not numpy:
            raise ImportError('Please, install numpy for Needleman-Wunsch measure')

        s1, s2 = self._get_sequences(s1, s2)

        # result = self.quick_answer(s1, s2)
        # if result is not None:
        #     return result * self.maximum(s1, s2)

        dist_mat = numpy.zeros(
            (len(s1) + 1, len(s2) + 1),
            dtype=float,
        )
        # DP initialization
        for i in range(len(s1) + 1):
            dist_mat[i, 0] = -(i * self.gap_cost)
        # DP initialization
        for j in range(len(s2) + 1):
            dist_mat[0, j] = -(j * self.gap_cost)
        # Needleman-Wunsch DP calculation
        for i, c1 in enumerate(s1, 1):
            for j, c2 in enumerate(s2, 1):
                match = dist_mat[i - 1, j - 1] + self.sim_func(c1, c2)
                delete = dist_mat[i - 1, j] - self.gap_cost
                insert = dist_mat[i, j - 1] - self.gap_cost
                dist_mat[i, j] = max(match, delete, insert)
        return dist_mat[dist_mat.shape[0] - 1, dist_mat.shape[1] - 1]
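

# Usage sketch: with the default identity sim_func a match scores 1, a
# mismatch 0, and every gap costs `gap_cost`, so the result is a global
# alignment score (possibly negative), not a distance. Requires numpy.
#     NeedlemanWunsch()('GATTACA', 'GCATGCU')    # default: gap_cost=1, identity matches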


class SmithWaterman(_BaseSimilarity):
    """
    Computes the Smith-Waterman measure between two strings.
    The Smith-Waterman algorithm performs local sequence alignment;
    that is, it determines similar regions between two strings.
    Instead of looking at the total sequence, the Smith-Waterman algorithm compares
    segments of all possible lengths and optimizes the similarity measure.

    https://en.wikipedia.org/wiki/Smith%E2%80%93Waterman_algorithm
    https://github.com/Yomguithereal/talisman/blob/master/src/metrics/smith-waterman.js
    """

    def __init__(
        self,
        gap_cost: float = 1.0,
        sim_func: SimFunc = None,
        qval: int = 1,
        external: bool = True,
    ) -> None:
        self.qval = qval
        self.gap_cost = gap_cost
        self.sim_func = sim_func or self._ident
        self.external = external

    def maximum(self, *sequences: Sequence[object]) -> int:
        return min(map(len, sequences))

    def __call__(self, s1: Sequence[T], s2: Sequence[T]) -> float:
        if not numpy:
            raise ImportError('Please, install numpy for Smith-Waterman measure')

        s1, s2 = self._get_sequences(s1, s2)

        result = self.quick_answer(s1, s2)
        if result is not None:
            return result

        dist_mat = numpy.zeros(
            (len(s1) + 1, len(s2) + 1),
            dtype=float,
        )
        for i, sc1 in enumerate(s1, start=1):
            for j, sc2 in enumerate(s2, start=1):
                # The score for substituting the letter a[i - 1] for b[j - 1].
                # Generally low for mismatch, high for match.
                match = dist_mat[i - 1, j - 1] + self.sim_func(sc1, sc2)
                # The scores for introducing extra letters in one of the strings
                # (or by symmetry, deleting them from the other).
                delete = dist_mat[i - 1, j] - self.gap_cost
                insert = dist_mat[i, j - 1] - self.gap_cost
                dist_mat[i, j] = max(0, match, delete, insert)
        return dist_mat[dist_mat.shape[0] - 1, dist_mat.shape[1] - 1]
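

# Usage sketch: cell values are clamped at zero, so the result is the score of
# the best local alignment (0 when nothing aligns). Requires numpy.
#     SmithWaterman()('GGTTGACTA', 'TGTTACGG')    # best local alignment score >= 0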


class Gotoh(NeedlemanWunsch):
    """Gotoh score
    Gotoh's algorithm is essentially Needleman-Wunsch with affine gap
    penalties:
    https://www.cs.umd.edu/class/spring2003/cmsc838t/papers/gotoh1982.pdf
    """

    def __init__(
        self,
        gap_open: int = 1,
        gap_ext: float = 0.4,
        sim_func: SimFunc = None,
        qval: int = 1,
        external: bool = True,
    ) -> None:
        self.qval = qval
        self.gap_open = gap_open
        self.gap_ext = gap_ext
        if sim_func:
            self.sim_func = sim_func
        else:
            self.sim_func = self._ident
        self.external = external

    def minimum(self, *sequences: Sequence[object]) -> int:
        return -min(map(len, sequences))

    def maximum(self, *sequences: Sequence[object]) -> int:
        return min(map(len, sequences))

    def __call__(self, s1: Sequence[T], s2: Sequence[T]) -> float:
        if not numpy:
            raise ImportError('Please, install numpy for Gotoh measure')

        s1, s2 = self._get_sequences(s1, s2)

        # result = self.quick_answer(s1, s2)
        # if result is not None:
        #     return result * self.maximum(s1, s2)

        len_s1 = len(s1)
        len_s2 = len(s2)
        d_mat = numpy.zeros((len_s1 + 1, len_s2 + 1), dtype=float)
        p_mat = numpy.zeros((len_s1 + 1, len_s2 + 1), dtype=float)
        q_mat = numpy.zeros((len_s1 + 1, len_s2 + 1), dtype=float)

        d_mat[0, 0] = 0
        p_mat[0, 0] = float('-inf')
        q_mat[0, 0] = float('-inf')
        for i in range(1, len_s1 + 1):
            d_mat[i, 0] = float('-inf')
            p_mat[i, 0] = -self.gap_open - self.gap_ext * (i - 1)
            q_mat[i, 0] = float('-inf')
            q_mat[i, 1] = -self.gap_open
        for j in range(1, len_s2 + 1):
            d_mat[0, j] = float('-inf')
            p_mat[0, j] = float('-inf')
            p_mat[1, j] = -self.gap_open
            q_mat[0, j] = -self.gap_open - self.gap_ext * (j - 1)

        for i, sc1 in enumerate(s1, start=1):
            for j, sc2 in enumerate(s2, start=1):
                sim_val = self.sim_func(sc1, sc2)
                d_mat[i, j] = max(
                    d_mat[i - 1, j - 1] + sim_val,
                    p_mat[i - 1, j - 1] + sim_val,
                    q_mat[i - 1, j - 1] + sim_val,
                )
                p_mat[i, j] = max(
                    d_mat[i - 1, j] - self.gap_open,
                    p_mat[i - 1, j] - self.gap_ext,
                )
                q_mat[i, j] = max(
                    d_mat[i, j - 1] - self.gap_open,
                    q_mat[i, j - 1] - self.gap_ext,
                )

        i, j = (n - 1 for n in d_mat.shape)
        return max(d_mat[i, j], p_mat[i, j], q_mat[i, j])
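

# Usage sketch: opening a gap costs `gap_open` and extending it costs
# `gap_ext`, so one long gap is cheaper than several short ones. Requires numpy.
#     Gotoh()('GATTACA', 'GCATGCU')                         # default affine penalties
#     Gotoh(gap_open=2, gap_ext=0.5)('GATTACA', 'GCATGCU')  # stiffer gap opening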


class StrCmp95(_BaseSimilarity):
    """strcmp95 similarity

    http://cpansearch.perl.org/src/SCW/Text-JaroWinkler-0.1/strcmp95.c
    """
    sp_mx: tuple[tuple[str, str], ...] = (
        ('A', 'E'), ('A', 'I'), ('A', 'O'), ('A', 'U'), ('B', 'V'), ('E', 'I'),
        ('E', 'O'), ('E', 'U'), ('I', 'O'), ('I', 'U'), ('O', 'U'), ('I', 'Y'),
        ('E', 'Y'), ('C', 'G'), ('E', 'F'), ('W', 'U'), ('W', 'V'), ('X', 'K'),
        ('S', 'Z'), ('X', 'S'), ('Q', 'C'), ('U', 'V'), ('M', 'N'), ('L', 'I'),
        ('Q', 'O'), ('P', 'R'), ('I', 'J'), ('2', 'Z'), ('5', 'S'), ('8', 'B'),
        ('1', 'I'), ('1', 'L'), ('0', 'O'), ('0', 'Q'), ('C', 'K'), ('G', 'J'),
    )

    def __init__(self, long_strings: bool = False, external: bool = True) -> None:
        self.long_strings = long_strings
        self.external = external

    def maximum(self, *sequences: Sequence[object]) -> int:
        return 1

    @staticmethod
    def _in_range(char) -> bool:
        return 0 < ord(char) < 91

    def __call__(self, s1: str, s2: str) -> float:
        s1 = s1.strip().upper()
        s2 = s2.strip().upper()

        result = self.quick_answer(s1, s2)
        if result is not None:
            return result

        len_s1 = len(s1)
        len_s2 = len(s2)

        adjwt = defaultdict(int)

        # Initialize the adjwt array on the first call to the function only.
        # The adjwt array is used to give partial credit for characters that
        # may be errors due to known phonetic or character recognition errors.
        # A typical example is to match the letter "O" with the number "0"
        for c1, c2 in self.sp_mx:
            adjwt[c1, c2] = 3
            adjwt[c2, c1] = 3

        if len_s1 > len_s2:
            search_range = len_s1
            minv = len_s2
        else:
            search_range = len_s2
            minv = len_s1

        # Blank out the flags
        s1_flag = [0] * search_range
        s2_flag = [0] * search_range
        search_range = max(0, search_range // 2 - 1)

        # Looking only within the search range, count and flag the matched pairs.
        num_com = 0
        yl1 = len_s2 - 1
        for i, sc1 in enumerate(s1):
            lowlim = max(i - search_range, 0)
            hilim = min(i + search_range, yl1)
            for j in range(lowlim, hilim + 1):
                if s2_flag[j] == 0 and s2[j] == sc1:
                    s2_flag[j] = 1
                    s1_flag[i] = 1
                    num_com += 1
                    break

        # If no characters in common - return
        if num_com == 0:
            return 0.0

        # Count the number of transpositions
        k = n_trans = 0
        for i, sc1 in enumerate(s1):
            if not s1_flag[i]:
                continue
            for j in range(k, len_s2):
                if s2_flag[j] != 0:
                    k = j + 1
                    break
            if sc1 != s2[j]:
                n_trans += 1
        n_trans = n_trans // 2

        # Adjust for similarities in unmatched characters
        n_simi = 0
        if minv > num_com:
            for i in range(len_s1):
                if s1_flag[i] != 0:
                    continue
                if not self._in_range(s1[i]):
                    continue
                for j in range(len_s2):
                    if s2_flag[j] != 0:
                        continue
                    if not self._in_range(s2[j]):
                        continue
                    if (s1[i], s2[j]) not in adjwt:
                        continue
                    n_simi += adjwt[s1[i], s2[j]]
                    s2_flag[j] = 2
                    break
        num_sim = n_simi / 10.0 + num_com

        # Main weight computation
        weight = num_sim / len_s1 + num_sim / len_s2
        weight += (num_com - n_trans) / num_com
        weight = weight / 3.0

        # Continue to boost the weight if the strings are similar
        if weight <= 0.7:
            return weight

        # Adjust for having up to the first 4 characters in common
        j = min(minv, 4)
        i = 0
        for sc1, sc2 in zip(s1, s2):
            if i >= j:
                break
            if sc1 != sc2:
                break
            if sc1.isdigit():
                break
            i += 1
        if i:
            weight += i * 0.1 * (1.0 - weight)

        # Optionally adjust for long strings.

        # After agreeing beginning chars, at least two more must agree and
        # the agreeing characters must be > .5 of remaining characters.
        if not self.long_strings:
            return weight
        if minv <= 4:
            return weight
        if num_com <= i + 1 or 2 * num_com < minv + i:
            return weight
        if s1[0].isdigit():
            return weight
        res = (num_com - i - 1) / (len_s1 + len_s2 - i * 2 + 2)
        weight += (1.0 - weight) * res
        return weight
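

# Usage sketch: strcmp95 upper-cases its input and gives partial credit for
# the character pairs listed in sp_mx (e.g. 'O' vs '0'), which makes it
# tolerant of OCR-style confusions; long_strings enables an extra boost.
#     StrCmp95()('MARTHA', 'MARHTA')                    # close to the Jaro-Winkler score
#     StrCmp95(long_strings=True)('dixon', 'dicksonx')  # long-string adjustment may apply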


class MLIPNS(_BaseSimilarity):
    """
    Compute the Hamming-based MLIPNS similarity between two sequences.
    MLIPNS is a binary measure: it returns 1 if the sequences are considered
    matching (the share of mismatching items stays within `threshold` after
    discarding at most `maxmismatches` mismatches) and 0 otherwise.

    http://www.sial.iias.spb.su/files/386-386-1-PB.pdf
    https://github.com/Yomguithereal/talisman/blob/master/src/metrics/mlipns.js
    """

    def __init__(
        self, threshold: float = 0.25,
        maxmismatches: int = 2,
        qval: int = 1,
        external: bool = True,
    ) -> None:
        self.qval = qval
        self.threshold = threshold
        self.maxmismatches = maxmismatches
        self.external = external

    def maximum(self, *sequences: Sequence[object]) -> int:
        return 1

    def __call__(self, *sequences: Sequence[object]) -> float:
        sequences = self._get_sequences(*sequences)

        result = self.quick_answer(*sequences)
        if result is not None:
            return result

        mismatches = 0
        ham = Hamming()(*sequences)
        maxlen = max(map(len, sequences))
        while all(sequences) and mismatches <= self.maxmismatches:
            if not maxlen:
                return 1
            if 1 - (maxlen - ham) / maxlen <= self.threshold:
                return 1
            mismatches += 1
            ham -= 1
            maxlen -= 1

        if not maxlen:
            return 1
        return 0
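

# Usage sketch (pure-Python path): MLIPNS is a yes/no filter rather than a
# ranking score.
#     MLIPNS()('Tomato', 'Tamato')    # -> 1 (within the default mismatch budget)
#     MLIPNS()('Niall', 'Neil')       # -> 0 (too many mismatches)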


hamming = Hamming()
levenshtein = Levenshtein()
damerau = damerau_levenshtein = DamerauLevenshtein()
jaro = Jaro()
jaro_winkler = JaroWinkler()
needleman_wunsch = NeedlemanWunsch()
smith_waterman = SmithWaterman()
gotoh = Gotoh()
strcmp95 = StrCmp95()
mlipns = MLIPNS()