added textdistance dependency

This commit is contained in:
morpheus65535 2024-05-02 14:39:57 -04:00
parent 85866fc063
commit 13343c432d
24 changed files with 3148 additions and 0 deletions

View File

@ -0,0 +1 @@
pip

View File

@ -0,0 +1,7 @@
Copyright 2018 @orsinium
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View File

@ -0,0 +1,402 @@
Metadata-Version: 2.1
Name: textdistance
Version: 4.6.2
Summary: Compute distance between the two texts.
Home-page: https://github.com/orsinium/textdistance
Download-URL: https://github.com/orsinium/textdistance/tarball/master
Author: orsinium
Author-email: gram@orsinium.dev
License: MIT
Keywords: distance between text strings sequences iterators
Classifier: Development Status :: 5 - Production/Stable
Classifier: Environment :: Plugins
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python
Classifier: Topic :: Scientific/Engineering :: Human Machine Interfaces
Requires-Python: >=3.5
Description-Content-Type: text/markdown
License-File: LICENSE
Provides-Extra: dameraulevenshtein
Requires-Dist: rapidfuzz >=2.6.0 ; extra == 'dameraulevenshtein'
Requires-Dist: jellyfish ; extra == 'dameraulevenshtein'
Requires-Dist: pyxDamerauLevenshtein ; extra == 'dameraulevenshtein'
Provides-Extra: hamming
Requires-Dist: Levenshtein ; extra == 'hamming'
Requires-Dist: rapidfuzz >=2.6.0 ; extra == 'hamming'
Requires-Dist: jellyfish ; extra == 'hamming'
Requires-Dist: distance ; extra == 'hamming'
Provides-Extra: jaro
Requires-Dist: rapidfuzz >=2.6.0 ; extra == 'jaro'
Requires-Dist: Levenshtein ; extra == 'jaro'
Provides-Extra: jarowinkler
Requires-Dist: rapidfuzz >=2.6.0 ; extra == 'jarowinkler'
Requires-Dist: jellyfish ; extra == 'jarowinkler'
Provides-Extra: levenshtein
Requires-Dist: rapidfuzz >=2.6.0 ; extra == 'levenshtein'
Requires-Dist: Levenshtein ; extra == 'levenshtein'
Provides-Extra: all
Requires-Dist: jellyfish ; extra == 'all'
Requires-Dist: numpy ; extra == 'all'
Requires-Dist: Levenshtein ; extra == 'all'
Requires-Dist: pyxDamerauLevenshtein ; extra == 'all'
Requires-Dist: rapidfuzz >=2.6.0 ; extra == 'all'
Requires-Dist: distance ; extra == 'all'
Requires-Dist: pylev ; extra == 'all'
Requires-Dist: py-stringmatching ; extra == 'all'
Requires-Dist: tabulate ; extra == 'all'
Provides-Extra: benchmark
Requires-Dist: jellyfish ; extra == 'benchmark'
Requires-Dist: numpy ; extra == 'benchmark'
Requires-Dist: Levenshtein ; extra == 'benchmark'
Requires-Dist: pyxDamerauLevenshtein ; extra == 'benchmark'
Requires-Dist: rapidfuzz >=2.6.0 ; extra == 'benchmark'
Requires-Dist: distance ; extra == 'benchmark'
Requires-Dist: pylev ; extra == 'benchmark'
Requires-Dist: py-stringmatching ; extra == 'benchmark'
Requires-Dist: tabulate ; extra == 'benchmark'
Provides-Extra: benchmarks
Requires-Dist: jellyfish ; extra == 'benchmarks'
Requires-Dist: numpy ; extra == 'benchmarks'
Requires-Dist: Levenshtein ; extra == 'benchmarks'
Requires-Dist: pyxDamerauLevenshtein ; extra == 'benchmarks'
Requires-Dist: rapidfuzz >=2.6.0 ; extra == 'benchmarks'
Requires-Dist: distance ; extra == 'benchmarks'
Requires-Dist: pylev ; extra == 'benchmarks'
Requires-Dist: py-stringmatching ; extra == 'benchmarks'
Requires-Dist: tabulate ; extra == 'benchmarks'
Provides-Extra: common
Requires-Dist: jellyfish ; extra == 'common'
Requires-Dist: numpy ; extra == 'common'
Requires-Dist: Levenshtein ; extra == 'common'
Requires-Dist: pyxDamerauLevenshtein ; extra == 'common'
Requires-Dist: rapidfuzz >=2.6.0 ; extra == 'common'
Provides-Extra: extra
Requires-Dist: jellyfish ; extra == 'extra'
Requires-Dist: numpy ; extra == 'extra'
Requires-Dist: Levenshtein ; extra == 'extra'
Requires-Dist: pyxDamerauLevenshtein ; extra == 'extra'
Requires-Dist: rapidfuzz >=2.6.0 ; extra == 'extra'
Provides-Extra: extras
Requires-Dist: jellyfish ; extra == 'extras'
Requires-Dist: numpy ; extra == 'extras'
Requires-Dist: Levenshtein ; extra == 'extras'
Requires-Dist: pyxDamerauLevenshtein ; extra == 'extras'
Requires-Dist: rapidfuzz >=2.6.0 ; extra == 'extras'
Provides-Extra: lint
Requires-Dist: twine ; extra == 'lint'
Requires-Dist: mypy ; extra == 'lint'
Requires-Dist: isort ; extra == 'lint'
Requires-Dist: flake8 ; extra == 'lint'
Requires-Dist: types-tabulate ; extra == 'lint'
Requires-Dist: flake8-blind-except ; extra == 'lint'
Requires-Dist: flake8-bugbear ; extra == 'lint'
Requires-Dist: flake8-commas ; extra == 'lint'
Requires-Dist: flake8-logging-format ; extra == 'lint'
Requires-Dist: flake8-mutable ; extra == 'lint'
Requires-Dist: flake8-pep3101 ; extra == 'lint'
Requires-Dist: flake8-quotes ; extra == 'lint'
Requires-Dist: flake8-string-format ; extra == 'lint'
Requires-Dist: flake8-tidy-imports ; extra == 'lint'
Requires-Dist: pep8-naming ; extra == 'lint'
Provides-Extra: test
Requires-Dist: hypothesis ; extra == 'test'
Requires-Dist: isort ; extra == 'test'
Requires-Dist: numpy ; extra == 'test'
Requires-Dist: pytest ; extra == 'test'
# TextDistance
![TextDistance logo](logo.png)
[![Build Status](https://travis-ci.org/life4/textdistance.svg?branch=master)](https://travis-ci.org/life4/textdistance) [![PyPI version](https://img.shields.io/pypi/v/textdistance.svg)](https://pypi.python.org/pypi/textdistance) [![Status](https://img.shields.io/pypi/status/textdistance.svg)](https://pypi.python.org/pypi/textdistance) [![License](https://img.shields.io/pypi/l/textdistance.svg)](LICENSE)
**TextDistance** -- a Python library for computing the distance between two or more sequences using many algorithms.
Features:
- 30+ algorithms
- Pure python implementation
- Simple usage
- Comparison of more than two sequences
- Some algorithms have more than one implementation in one class.
- Optional numpy usage for maximum speed.
## Algorithms
### Edit based
| Algorithm | Class | Functions |
|-------------------------------------------------------------------------------------------|----------------------|------------------------|
| [Hamming](https://en.wikipedia.org/wiki/Hamming_distance) | `Hamming` | `hamming` |
| [MLIPNS](http://www.sial.iias.spb.su/files/386-386-1-PB.pdf) | `MLIPNS` | `mlipns` |
| [Levenshtein](https://en.wikipedia.org/wiki/Levenshtein_distance) | `Levenshtein` | `levenshtein` |
| [Damerau-Levenshtein](https://en.wikipedia.org/wiki/Damerau%E2%80%93Levenshtein_distance) | `DamerauLevenshtein` | `damerau_levenshtein` |
| [Jaro-Winkler](https://en.wikipedia.org/wiki/Jaro%E2%80%93Winkler_distance) | `JaroWinkler` | `jaro_winkler`, `jaro` |
| [Strcmp95](http://cpansearch.perl.org/src/SCW/Text-JaroWinkler-0.1/strcmp95.c) | `StrCmp95` | `strcmp95` |
| [Needleman-Wunsch](https://en.wikipedia.org/wiki/Needleman%E2%80%93Wunsch_algorithm) | `NeedlemanWunsch` | `needleman_wunsch` |
| [Gotoh](http://bioinfo.ict.ac.cn/~dbu/AlgorithmCourses/Lectures/LOA/Lec6-Sequence-Alignment-Affine-Gaps-Gotoh1982.pdf) | `Gotoh` | `gotoh` |
| [Smith-Waterman](https://en.wikipedia.org/wiki/Smith%E2%80%93Waterman_algorithm) | `SmithWaterman` | `smith_waterman` |
### Token based
| Algorithm | Class | Functions |
|-------------------------------------------------------------------------------------------|----------------------|---------------|
| [Jaccard index](https://en.wikipedia.org/wiki/Jaccard_index) | `Jaccard` | `jaccard` |
| [Sørensen–Dice coefficient](https://en.wikipedia.org/wiki/S%C3%B8rensen%E2%80%93Dice_coefficient) | `Sorensen` | `sorensen`, `sorensen_dice`, `dice` |
| [Tversky index](https://en.wikipedia.org/wiki/Tversky_index) | `Tversky` | `tversky` |
| [Overlap coefficient](https://en.wikipedia.org/wiki/Overlap_coefficient) | `Overlap` | `overlap` |
| [Tanimoto distance](https://en.wikipedia.org/wiki/Jaccard_index#Tanimoto_similarity_and_distance) | `Tanimoto` | `tanimoto` |
| [Cosine similarity](https://en.wikipedia.org/wiki/Cosine_similarity) | `Cosine` | `cosine` |
| [Monge-Elkan](https://www.academia.edu/200314/Generalized_Monge-Elkan_Method_for_Approximate_Text_String_Comparison) | `MongeElkan` | `monge_elkan` |
| [Bag distance](https://github.com/Yomguithereal/talisman/blob/master/src/metrics/bag.js) | `Bag` | `bag` |
### Sequence based
| Algorithm | Class | Functions |
|-----------|-------|-----------|
| [longest common subsequence similarity](https://en.wikipedia.org/wiki/Longest_common_subsequence_problem) | `LCSSeq` | `lcsseq` |
| [longest common substring similarity](https://docs.python.org/2/library/difflib.html#difflib.SequenceMatcher) | `LCSStr` | `lcsstr` |
| [Ratcliff-Obershelp similarity](https://en.wikipedia.org/wiki/Gestalt_Pattern_Matching) | `RatcliffObershelp` | `ratcliff_obershelp` |
### Compression based
[Normalized compression distance](https://en.wikipedia.org/wiki/Normalized_compression_distance#Normalized_compression_distance) with different compression algorithms.
Classic compression algorithms:
| Algorithm | Class | Function |
|----------------------------------------------------------------------------|-------------|--------------|
| [Arithmetic coding](https://en.wikipedia.org/wiki/Arithmetic_coding) | `ArithNCD` | `arith_ncd` |
| [RLE](https://en.wikipedia.org/wiki/Run-length_encoding) | `RLENCD` | `rle_ncd` |
| [BWT RLE](https://en.wikipedia.org/wiki/Burrows%E2%80%93Wheeler_transform) | `BWTRLENCD` | `bwtrle_ncd` |
Normal compression algorithms:
| Algorithm | Class | Function |
|----------------------------------------------------------------------------|--------------|---------------|
| Square Root | `SqrtNCD` | `sqrt_ncd` |
| [Entropy](https://en.wikipedia.org/wiki/Entropy_(information_theory)) | `EntropyNCD` | `entropy_ncd` |
Work in progress algorithms that compare two strings as array of bits:
| Algorithm | Class | Function |
|--------------------------------------------|-----------|------------|
| [BZ2](https://en.wikipedia.org/wiki/Bzip2) | `BZ2NCD` | `bz2_ncd` |
| [LZMA](https://en.wikipedia.org/wiki/LZMA) | `LZMANCD` | `lzma_ncd` |
| [ZLib](https://en.wikipedia.org/wiki/Zlib) | `ZLIBNCD` | `zlib_ncd` |
See [blog post](https://articles.life4web.ru/other/ncd/) for more details about NCD.
### Phonetic
| Algorithm | Class | Functions |
|------------------------------------------------------------------------------|----------|-----------|
| [MRA](https://en.wikipedia.org/wiki/Match_rating_approach) | `MRA` | `mra` |
| [Editex](https://anhaidgroup.github.io/py_stringmatching/v0.3.x/Editex.html) | `Editex` | `editex` |
### Simple
| Algorithm | Class | Functions |
|---------------------|------------|------------|
| Prefix similarity | `Prefix` | `prefix` |
| Postfix similarity | `Postfix` | `postfix` |
| Length distance | `Length` | `length` |
| Identity similarity | `Identity` | `identity` |
| Matrix similarity | `Matrix` | `matrix` |
## Installation
### Stable
Only pure python implementation:
```bash
pip install textdistance
```
With extra libraries for maximum speed:
```bash
pip install "textdistance[extras]"
```
With all libraries (required for [benchmarking](#benchmarks) and [testing](#running-tests)):
```bash
pip install "textdistance[benchmark]"
```
With algorithm specific extras:
```bash
pip install "textdistance[Hamming]"
```
Algorithms with available extras: `DamerauLevenshtein`, `Hamming`, `Jaro`, `JaroWinkler`, `Levenshtein`.
### Dev
Via pip:
```bash
pip install -e git+https://github.com/life4/textdistance.git#egg=textdistance
```
Or clone repo and install with some extras:
```bash
git clone https://github.com/life4/textdistance.git
pip install -e ".[benchmark]"
```
## Usage
All algorithms have 2 interfaces:
1. Class with algorithm-specific params for customizing.
1. Class instance with default params for quick and simple usage.
All algorithms have some common methods:
1. `.distance(*sequences)` -- calculate distance between sequences.
1. `.similarity(*sequences)` -- calculate similarity for sequences.
1. `.maximum(*sequences)` -- maximum possible value for distance and similarity. For any sequence: `distance + similarity == maximum`.
1. `.normalized_distance(*sequences)` -- normalized distance between sequences. The return value is a float between 0 and 1, where 0 means equal, and 1 totally different.
1. `.normalized_similarity(*sequences)` -- normalized similarity for sequences. The return value is a float between 0 and 1, where 0 means totally different, and 1 equal.
Most common init arguments:
1. `qval` -- q-value for splitting sequences into q-grams. Possible values:
- 1 (default) -- compare sequences by chars.
- 2 or more -- transform sequences to q-grams.
- None -- split sequences by words.
1. `as_set` -- for token-based algorithms:
- True -- `t` and `ttt` are equal.
- False (default) -- `t` and `ttt` are different.
## Examples
For example, [Hamming distance](https://en.wikipedia.org/wiki/Hamming_distance):
```python
import textdistance
textdistance.hamming('test', 'text')
# 1
textdistance.hamming.distance('test', 'text')
# 1
textdistance.hamming.similarity('test', 'text')
# 3
textdistance.hamming.normalized_distance('test', 'text')
# 0.25
textdistance.hamming.normalized_similarity('test', 'text')
# 0.75
textdistance.Hamming(qval=2).distance('test', 'text')
# 2
```
All other algorithms have the same interface.
## Articles
A few articles with examples of how to use textdistance in the real world:
- [Guide to Fuzzy Matching with Python](http://theautomatic.net/2019/11/13/guide-to-fuzzy-matching-with-python/)
- [String similarity — the basic know your algorithms guide!](https://itnext.io/string-similarity-the-basic-know-your-algorithms-guide-3de3d7346227)
- [Normalized compression distance](https://articles.life4web.ru/other/ncd/)
## Extra libraries
For the main algorithms, textdistance tries to call known external libraries (fastest first) if they are available (installed in your system) and applicable (the library can compare this type of sequences). [Install](#installation) textdistance with extras to enable this feature.
You can disable this by passing `external=False` argument on init:
```python3
import textdistance
hamming = textdistance.Hamming(external=False)
hamming('text', 'testit')
# 3
```
Supported libraries:
1. [Distance](https://github.com/doukremt/distance)
1. [jellyfish](https://github.com/jamesturk/jellyfish)
1. [py_stringmatching](https://github.com/anhaidgroup/py_stringmatching)
1. [pylev](https://github.com/toastdriven/pylev)
1. [Levenshtein](https://github.com/maxbachmann/Levenshtein)
1. [pyxDamerauLevenshtein](https://github.com/gfairchild/pyxDamerauLevenshtein)
Algorithms:
1. DamerauLevenshtein
1. Hamming
1. Jaro
1. JaroWinkler
1. Levenshtein
## Benchmarks
Without extras installation:
| algorithm | library | time |
|--------------------|-----------------------|---------|
| DamerauLevenshtein | rapidfuzz | 0.00312 |
| DamerauLevenshtein | jellyfish | 0.00591 |
| DamerauLevenshtein | pyxdameraulevenshtein | 0.03335 |
| DamerauLevenshtein | **textdistance** | 0.83524 |
| Hamming | Levenshtein | 0.00038 |
| Hamming | rapidfuzz | 0.00044 |
| Hamming | jellyfish | 0.00091 |
| Hamming | distance | 0.00812 |
| Hamming | **textdistance** | 0.03531 |
| Jaro | rapidfuzz | 0.00092 |
| Jaro | jellyfish | 0.00191 |
| Jaro | **textdistance** | 0.07365 |
| JaroWinkler | rapidfuzz | 0.00094 |
| JaroWinkler | jellyfish | 0.00195 |
| JaroWinkler | **textdistance** | 0.07501 |
| Levenshtein | rapidfuzz | 0.00099 |
| Levenshtein | Levenshtein | 0.00122 |
| Levenshtein | jellyfish | 0.00254 |
| Levenshtein | pylev | 0.15688 |
| Levenshtein | distance | 0.28669 |
| Levenshtein | **textdistance** | 0.53902 |
Total: 24 libs.
Yeah, so slow. Use TextDistance in production only with extras.
TextDistance uses the benchmark results to optimize algorithms and tries to call the fastest external library first (if possible).
You can run benchmark manually on your system:
```bash
pip install textdistance[benchmark]
python3 -m textdistance.benchmark
```
TextDistance shows a benchmark results table for your system and saves the library priorities into the `libraries.json` file in TextDistance's folder. This file is used by textdistance to call the fastest algorithm implementation. A default [libraries.json](textdistance/libraries.json) is already included in the package.
## Running tests
All you need is [task](https://taskfile.dev/). See [Taskfile.yml](./Taskfile.yml) for the list of available commands. For example, to run tests including third-party libraries usage, execute `task pytest-external:run`.
## Contributing
PRs are welcome!
- Found a bug? Fix it!
- Want to add more algorithms? Sure! Just make it with the same interface as other algorithms in the lib and add some tests.
- Can make something faster? Great! Just avoid external dependencies and remember that everything should work not only with strings.
- Something else that you think is good? Do it! Just make sure that CI passes and everything from the README is still applicable (interface, features, and so on).
- Have no time to code? Tell your friends and subscribers about `textdistance`. More users, more contributions, more amazing features.
Thank you :heart:

View File

@ -0,0 +1,23 @@
textdistance-4.6.2.dist-info/INSTALLER,sha256=zuuue4knoyJ-UwPPXg8fezS7VCrXJQrAP7zeNuwvFQg,4
textdistance-4.6.2.dist-info/LICENSE,sha256=oOV_OJnxc9uQzeaMabB-rqV8Gti-Q1kQXusPgFxoEJI,1049
textdistance-4.6.2.dist-info/METADATA,sha256=EMFMdpWu4YfxyjNq4WGavFIWxxFmolqLPnk1KtNoSNU,18233
textdistance-4.6.2.dist-info/RECORD,,
textdistance-4.6.2.dist-info/REQUESTED,sha256=47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU,0
textdistance-4.6.2.dist-info/WHEEL,sha256=GJ7t_kWBFywbagK5eo9IoUwLW6oyOeTKmQ-9iHFVNxQ,92
textdistance-4.6.2.dist-info/top_level.txt,sha256=GBIsLNa3pcbaPSp8KNq93YdHKE3CNIQMQrLrZdectak,13
textdistance/__init__.py,sha256=abtyaG6QgNqbwQTs_8q1rJUSPr78akEOLYgJDXHgtLM,355
textdistance/algorithms/__init__.py,sha256=1raagDGcgHenA-Ncj3oKHTCk0ai8ltLdqQzTA__clkg,217
textdistance/algorithms/base.py,sha256=IJwzIa3G4n6piDS9dOhSAVUoHmD4sCJCvGHH1Z4F_0o,6332
textdistance/algorithms/compression_based.py,sha256=qXg-jUm4ifd1jLLXhvrNkhbi-_JK5AQyjr4026qquVE,8190
textdistance/algorithms/edit_based.py,sha256=OZc-sGjzRx0eFl5jsEA3V3Q5MkBJR5QLbZYiz_EAPL0,27598
textdistance/algorithms/phonetic.py,sha256=E7yCZVV_6XDkq7tbLHmYd2CAIyj7VQcpf7rQhHXMMj8,6133
textdistance/algorithms/sequence_based.py,sha256=0iS9iZkx_eYJQFZKjRpBFp8jCs1c_1Hz0kWq6CBnJVg,6158
textdistance/algorithms/simple.py,sha256=2wryMhYmBRDGjG9AT74AAI9SpmYDLABqpSUbw_Fy8AU,3209
textdistance/algorithms/token_based.py,sha256=D2__lJONSfvU6Eiuq8IkB6TIBWCHPb3JWNH5LuL5liA,9405
textdistance/algorithms/types.py,sha256=PVVh0bcCEK8ziRsmKgHyIJ8i9TERKaGoVA36_5lnAr0,166
textdistance/algorithms/vector_based.py,sha256=jmbeSioJlATSlx097ptcJRl0G6dHzp2x_fyOcKYY6ZE,2821
textdistance/benchmark.py,sha256=NpxvQQgBFVElQrG0wP44AlmcxEntLI11qj1A0KFSrCY,3818
textdistance/libraries.json,sha256=bZw0jXy6oPnKr7VPu0LyOMDA1EAUoF-TDwjazl3lknc,1161
textdistance/libraries.py,sha256=GGQsTRlyMOoak2WQ1w_mESgDzmcYeUiCHmWqP0s8ncI,6716
textdistance/py.typed,sha256=47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU,0
textdistance/utils.py,sha256=SDJclnzpkOpoyJmZ23AO7JZQfdsdpWmOO0xofzi95YQ,783

View File

@ -0,0 +1,5 @@
Wheel-Version: 1.0
Generator: bdist_wheel (0.43.0)
Root-Is-Purelib: true
Tag: py3-none-any

View File

@ -0,0 +1 @@
textdistance

View File

@ -0,0 +1,20 @@
"""
TextDistance.
Compute distance between sequences.
30+ algorithms, pure Python implementation, common interface.
"""
# Package metadata exposed as module-level dunder attributes.
__title__ = 'TextDistance'
__version__ = '4.6.2'
__author__ = 'Gram (@orsinium)'
__license__ = 'MIT'
# Alias of ``__version__`` under a non-dunder name.
VERSION = __version__
# app
from .algorithms import * # noQA
from .utils import * # noQA

View File

@ -0,0 +1,8 @@
# app
from .compression_based import * # noQA
from .edit_based import * # noQA
from .phonetic import * # noQA
from .sequence_based import * # noQA
from .simple import * # noQA
from .token_based import * # noQA

View File

@ -0,0 +1,191 @@
from __future__ import annotations
# built-in
from collections import Counter
from contextlib import suppress
from typing import Sequence, TypeVar
# app
from ..libraries import prototype
from ..utils import find_ngrams
# Module-level registry consulted by ``Base.external_answer`` via
# ``libraries.get_libs(<algorithm class name>)``.
# NOTE(review): presumably ``clone()`` gives this module its own copy of the
# shared prototype and ``optimize()`` applies the saved library priorities --
# confirm against ``textdistance/libraries.py``.
libraries = prototype.clone()
libraries.optimize()
# Generic element type used in the Counter helper annotations below.
T = TypeVar('T')
class Base:
    """Common interface shared by the distance algorithms.

    Subclasses implement ``__call__`` to return the raw distance between the
    given sequences. ``similarity``, ``normalized_distance`` and
    ``normalized_similarity`` are derived from it together with ``maximum``.
    """

    def __init__(self, qval: int = 1, external: bool = True) -> None:
        """Store the options common to all algorithms.

        qval -- how sequences are split before comparing (see ``_get_sequences``).
        external -- allow delegating the computation to external libraries.
        """
        self.qval = qval
        self.external = external

    def __call__(self, *sequences: Sequence[object]) -> float:
        """Compute the raw value of the algorithm; must be overridden."""
        raise NotImplementedError

    @staticmethod
    def maximum(*sequences: Sequence[object]) -> float:
        """Get maximum possible value: the length of the longest sequence.

        Returns 0 when no sequences are given, so the "no sequences" paths of
        ``quick_answer`` and ``normalized_distance`` do not crash on ``max()``
        of an empty iterable.
        """
        return max(map(len, sequences), default=0)

    def distance(self, *sequences: Sequence[object]) -> float:
        """Get distance between sequences
        """
        return self(*sequences)

    def similarity(self, *sequences: Sequence[object]) -> float:
        """Get sequences similarity.

        similarity = maximum - distance
        """
        return self.maximum(*sequences) - self.distance(*sequences)

    def normalized_distance(self, *sequences: Sequence[object]) -> float:
        """Get distance from 0 to 1
        """
        maximum = self.maximum(*sequences)
        # nothing to normalize by: treat the sequences as equal
        if maximum == 0:
            return 0
        return self.distance(*sequences) / maximum

    def normalized_similarity(self, *sequences: Sequence[object]) -> float:
        """Get similarity from 0 to 1

        normalized_similarity = 1 - normalized_distance
        """
        return 1 - self.normalized_distance(*sequences)

    def external_answer(self, *sequences: Sequence[object]) -> float | None:
        """Try to get answer from known external libraries.

        Returns None when the feature is disabled, a custom ``test_func`` is
        set, or no registered library produced a result.
        """
        # if this feature disabled
        if not getattr(self, 'external', False):
            return None
        # all external libs don't support test_func
        test_func = getattr(self, 'test_func', self._ident)
        if test_func is not self._ident:
            return None
        # try to get external libs for algorithm
        libs = libraries.get_libs(self.__class__.__name__)
        for lib in libs:
            # if conditions not satisfied
            if not lib.check_conditions(self, *sequences):
                continue
            # if library is not installed yet
            func = lib.get_function()
            if func is None:
                continue
            prepared_sequences = lib.prepare(*sequences)
            # deliberate best-effort: fail side libraries silently and try
            # next libs
            with suppress(Exception):
                return func(*prepared_sequences)
        return None

    def quick_answer(self, *sequences: Sequence[object]) -> float | None:
        """Try to get answer quick without main implementation calling.

        If no sequences, 1 sequence or all sequences are equal then return 0.
        If any sequence are empty then return maximum.
        And in finish try to get external answer.
        """
        if not sequences:
            return 0
        if len(sequences) == 1:
            return 0
        if self._ident(*sequences):
            return 0
        if not all(sequences):
            return self.maximum(*sequences)
        # try get answer from external libs
        return self.external_answer(*sequences)

    @staticmethod
    def _ident(*elements: object) -> bool:
        """Return True if all sequences are equal.
        """
        try:
            # for hashable elements
            return len(set(elements)) == 1
        except TypeError:
            # for unhashable elements: compare every neighbouring pair
            return not any(e1 != e2 for e1, e2 in zip(elements, elements[1:]))

    def _get_sequences(self, *sequences: Sequence[object]) -> list:
        """Prepare sequences.

        qval=None: split text by words
        qval=1: do not split sequences. For text this is mean comparing by letters.
        qval>1: split sequences by q-grams
        """
        # by words
        if not self.qval:
            return [s.split() for s in sequences]  # type: ignore[attr-defined]
        # by chars
        if self.qval == 1:
            return list(sequences)
        # by n-grams
        return [find_ngrams(s, self.qval) for s in sequences]

    def _get_counters(self, *sequences: Sequence[object]) -> list[Counter]:
        """Prepare sequences and convert it to Counters.
        """
        # already Counters
        if all(isinstance(s, Counter) for s in sequences):
            return list(sequences)  # type: ignore[arg-type]
        return [Counter(s) for s in self._get_sequences(*sequences)]

    def _intersect_counters(self, *sequences: Counter[T]) -> Counter[T]:
        """Return the multiset intersection (``&``) of all counters."""
        intersection = sequences[0].copy()
        for s in sequences[1:]:
            intersection &= s
        return intersection

    def _union_counters(self, *sequences: Counter[T]) -> Counter[T]:
        """Return the multiset union (``|``) of all counters."""
        union = sequences[0].copy()
        for s in sequences[1:]:
            union |= s
        return union

    def _sum_counters(self, *sequences: Counter[T]) -> Counter[T]:
        """Return the element-wise sum (``+``) of all counters."""
        result = sequences[0].copy()
        for s in sequences[1:]:
            result += s
        return result

    def _count_counters(self, counter: Counter) -> int:
        """Return all elements count from Counter

        With a truthy ``as_set`` attribute only distinct elements are counted.
        """
        if getattr(self, 'as_set', False):
            return len(set(counter))
        return sum(counter.values())

    def __repr__(self) -> str:
        return f'{type(self).__name__}({self.__dict__})'
class BaseSimilarity(Base):
    """Base for algorithms whose ``__call__`` returns a similarity.

    The roles of ``distance`` and ``similarity`` are swapped compared to
    ``Base``: similarity is the raw value, distance is derived from it.
    """

    def distance(self, *sequences: Sequence[object]) -> float:
        """Get distance as ``maximum - similarity``."""
        upper_bound = self.maximum(*sequences)
        score = self.similarity(*sequences)
        return upper_bound - score

    def similarity(self, *sequences: Sequence[object]) -> float:
        """Get the raw similarity computed by the algorithm."""
        return self(*sequences)

    def quick_answer(self, *sequences: Sequence[object]) -> float | None:
        """Try to answer without running the main implementation.

        Fewer than two sequences, or all sequences equal: return maximum.
        Any empty sequence: return 0.
        Otherwise delegate to the external libraries.
        """
        if len(sequences) < 2 or self._ident(*sequences):
            return self.maximum(*sequences)
        if not all(sequences):
            return 0
        # try get answer from external libs
        return self.external_answer(*sequences)

View File

@ -0,0 +1,286 @@
from __future__ import annotations
# built-in
import codecs
import math
from collections import Counter
from fractions import Fraction
from itertools import groupby, permutations
from typing import Any, Sequence, TypeVar
# app
from .base import Base as _Base
try:
# built-in
import lzma
except ImportError:
lzma = None # type: ignore[assignment]
# Public API of this module: the NCD algorithm classes followed by the
# ready-made default instances created at the bottom of the module.
__all__ = [
'ArithNCD', 'LZMANCD', 'BZ2NCD', 'RLENCD', 'BWTRLENCD', 'ZLIBNCD',
'SqrtNCD', 'EntropyNCD',
'bz2_ncd', 'lzma_ncd', 'arith_ncd', 'rle_ncd', 'bwtrle_ncd', 'zlib_ncd',
'sqrt_ncd', 'entropy_ncd',
]
# Generic element type used in annotations.
T = TypeVar('T')
class _NCDBase(_Base):
    """Shared implementation of the normalized compression distance (NCD).

    Subclasses supply ``_compress`` (and optionally ``_get_size``).

    https://articles.orsinium.dev/other/ncd/
    https://en.wikipedia.org/wiki/Normalized_compression_distance#Normalized_compression_distance
    """
    qval = 1

    def __init__(self, qval: int = 1) -> None:
        self.qval = qval

    def maximum(self, *sequences) -> int:
        """The distance is normalized, so its upper bound is always 1."""
        return 1

    def _get_size(self, data: str) -> float:
        """Size of ``data`` after compression."""
        compressed = self._compress(data)
        return len(compressed)

    def _compress(self, data: str) -> Any:
        """Compress ``data``; must be provided by subclasses."""
        raise NotImplementedError

    def __call__(self, *sequences) -> float:
        """Compute the NCD of the given sequences.

        The concatenation size is the smallest compressed size over every
        ordering of the prepared sequences.
        """
        if not sequences:
            return 0
        prepared = self._get_sequences(*sequences)
        # empty value of the same type as the prepared sequences, used as
        # the join separator / the start value of ``sum``
        blank = type(prepared[0])()
        joinable = isinstance(blank, (str, bytes))

        smallest_concat = float('Inf')
        for ordering in permutations(prepared):
            if joinable:
                glued = blank.join(ordering)
            else:
                glued = sum(ordering, blank)
            smallest_concat = min(smallest_concat, self._get_size(glued))  # type: ignore[arg-type]

        sizes = [self._get_size(item) for item in prepared]
        largest = max(sizes)
        if largest == 0:
            return 0
        return (smallest_concat - min(sizes) * (len(prepared) - 1)) / largest
class _BinaryNCDBase(_NCDBase):
    """NCD base for compressors that work on bytes."""

    def __init__(self) -> None:
        # no options: ``qval`` stays at the class-level default
        pass

    def __call__(self, *sequences) -> float:
        """Compute the NCD, UTF-8 encoding the input when the first item is ``str``."""
        if not sequences:
            return 0
        head = sequences[0]
        if isinstance(head, str):
            payload = tuple(text.encode('utf-8') for text in sequences)
        else:
            payload = sequences
        return super().__call__(*payload)
class ArithNCD(_NCDBase):
    """NCD based on arithmetic coding.

    https://github.com/gw-c/arith
    http://www.drdobbs.com/cpp/data-compression-with-arithmetic-encodin/240169251
    https://en.wikipedia.org/wiki/Arithmetic_coding
    """

    def __init__(self, base: int = 2, terminator: str | None = None, qval: int = 1) -> None:
        self.base = base
        self.terminator = terminator
        self.qval = qval

    def _make_probs(self, *sequences) -> dict[str, tuple[Fraction, Fraction]]:
        """Map every symbol to ``(cumulative start, width)`` probabilities.

        Symbols are visited in ``Counter.most_common()`` order.

        https://github.com/gw-c/arith/blob/master/arith.py
        """
        counters = self._get_counters(*sequences)
        counts = self._sum_counters(*counters)
        if self.terminator is not None:
            counts[self.terminator] = 1
        total = sum(counts.values())

        table = {}
        seen = 0
        for symbol, occurrences in counts.most_common():
            table[symbol] = (Fraction(seen, total), Fraction(occurrences, total))
            seen += occurrences
        assert seen == total
        return table

    def _get_range(
        self,
        data: str,
        probs: dict[str, tuple[Fraction, Fraction]],
    ) -> tuple[Fraction, Fraction]:
        """Narrow the ``[0, 1)`` interval symbol by symbol; return its bounds."""
        terminator = self.terminator
        if terminator is not None:
            # the terminator must appear exactly once, at the very end
            if terminator in data:
                data = data.replace(terminator, '')
            data += terminator

        low = Fraction(0, 1)
        span = Fraction(1, 1)
        for symbol in data:
            offset, share = probs[symbol]
            low += offset * span
            span *= share
        return low, low + span

    def _compress(self, data: str) -> Fraction:
        """Find a fraction with a power-of-two denominator inside the range."""
        table = self._make_probs(data)
        low, high = self._get_range(data=data, probs=table)
        candidate = Fraction(0, 1)
        denominator = 1
        while True:
            if low <= candidate < high:
                return candidate
            numerator = 1 + ((low.numerator * denominator) // low.denominator)
            candidate = Fraction(numerator, denominator)
            denominator *= 2

    def _get_size(self, data: str) -> int:
        """Size is the rounded-up logarithm (in ``base``) of the numerator."""
        numerator = self._compress(data).numerator
        if numerator == 0:
            return 0
        return math.ceil(math.log(numerator, self.base))
class RLENCD(_NCDBase):
    """NCD based on run-length encoding.

    https://en.wikipedia.org/wiki/Run-length_encoding
    """

    def _compress(self, data: Sequence) -> str:
        """Encode runs: longer than two as ``<count><symbol>``, shorter verbatim."""

        def encode_run(symbol, length):
            if length == 1:
                return symbol
            if length > 2:
                return str(length) + symbol
            # a run of exactly two is written out as-is
            return 2 * symbol

        return ''.join(
            encode_run(symbol, sum(1 for _ in run))
            for symbol, run in groupby(data)
        )
class BWTRLENCD(RLENCD):
    """NCD based on the Burrows-Wheeler transform followed by RLE.

    https://en.wikipedia.org/wiki/Burrows%E2%80%93Wheeler_transform
    https://en.wikipedia.org/wiki/Run-length_encoding
    """

    def __init__(self, terminator: str = '\0') -> None:
        self.terminator: Any = terminator

    def _compress(self, data: str) -> str:
        """Apply the transform to ``data`` and run-length encode the result."""
        # make sure the terminator is present in the data
        if not data:
            data = self.terminator
        elif self.terminator not in data:
            data += self.terminator
        # all rotations of the data, sorted
        rotations = sorted(data[shift:] + data[:shift] for shift in range(len(data)))
        joiner = type(data)()
        # the transform output is the last symbol of every sorted rotation
        transformed = joiner.join(rotation[-1] for rotation in rotations)
        return super()._compress(transformed)
# -- NORMAL COMPRESSORS -- #
class SqrtNCD(_NCDBase):
    """Square-root based NCD.

    The size of the compressed data is the sum of the square roots of the
    counts of every element in the input sequence.
    """

    def __init__(self, qval: int = 1) -> None:
        self.qval = qval

    def _compress(self, data: Sequence[T]) -> dict[T, float]:
        """Map every distinct element to the square root of its count."""
        roots = {}
        for element, count in Counter(data).items():
            roots[element] = math.sqrt(count)
        return roots

    def _get_size(self, data: Sequence) -> float:
        """Sum of the per-element square roots."""
        roots = self._compress(data)
        return sum(roots.values())
class EntropyNCD(_NCDBase):
    """Entropy based NCD.

    Uses the entropy of the input sequence as the size of the compressed data.

    https://en.wikipedia.org/wiki/Entropy_(information_theory)
    https://en.wikipedia.org/wiki/Entropy_encoding
    """

    def __init__(self, qval: int = 1, coef: int = 1, base: int = 2) -> None:
        self.qval = qval
        self.coef = coef
        self.base = base

    def _compress(self, data: Sequence) -> float:
        """Entropy of ``data`` using logarithms in ``self.base``."""
        length = len(data)
        result = 0.0
        for occurrences in Counter(data).values():
            probability = occurrences / length
            result -= probability * math.log(probability, self.base)
        assert result >= 0
        return result
        # Unused alternative kept from the original for reference:
        # # redundancy:
        # unique_count = len(counter)
        # absolute_entropy = math.log(unique_count, 2) / unique_count
        # return absolute_entropy - entropy / unique_count

    def _get_size(self, data: Sequence) -> float:
        """Entropy shifted by the constant ``coef``."""
        entropy = self._compress(data)
        return self.coef + entropy
# -- BINARY COMPRESSORS -- #
class BZ2NCD(_BinaryNCDBase):
    """NCD based on bzip2 compression.

    https://en.wikipedia.org/wiki/Bzip2
    """

    def _compress(self, data: str | bytes) -> bytes:
        """Encode with the ``bz2_codec`` codec and drop the first 15 bytes."""
        packed = codecs.encode(data, 'bz2_codec')
        return packed[15:]
class LZMANCD(_BinaryNCDBase):
    """NCD backed by `lzma.compress`.

    https://en.wikipedia.org/wiki/LZMA
    """

    def _compress(self, data: bytes) -> bytes:
        """Compress `data` with lzma and drop the first 14 bytes of the output.

        Raises:
            ImportError: if the module-level name `lzma` is falsy.
        """
        if lzma:
            packed = lzma.compress(data)
            # presumably the skipped bytes are a fixed header -- TODO confirm
            return packed[14:]
        raise ImportError('Please, install the PylibLZMA module')
class ZLIBNCD(_BinaryNCDBase):
    """NCD backed by the `zlib_codec` codec.

    https://en.wikipedia.org/wiki/Zlib
    """

    def _compress(self, data: str | bytes) -> bytes:
        """Encode `data` with `zlib_codec` and drop the first 2 bytes of the output."""
        packed = codecs.encode(data, 'zlib_codec')
        # presumably the skipped bytes are a fixed header -- TODO confirm
        return packed[2:]
# Ready-to-use module-level instances, each built with the default
# constructor arguments of its class.
arith_ncd = ArithNCD()
bwtrle_ncd = BWTRLENCD()
bz2_ncd = BZ2NCD()
lzma_ncd = LZMANCD()
rle_ncd = RLENCD()
zlib_ncd = ZLIBNCD()
sqrt_ncd = SqrtNCD()
entropy_ncd = EntropyNCD()

View File

@ -0,0 +1,847 @@
from __future__ import annotations
# built-in
from collections import defaultdict
from itertools import zip_longest
from typing import Any, Sequence, TypeVar
# app
from .base import Base as _Base, BaseSimilarity as _BaseSimilarity
from .types import SimFunc, TestFunc
try:
# external
import numpy
except ImportError:
numpy = None # type: ignore[assignment]
# Public names of this module: the algorithm classes followed by the
# module-level instances created at the bottom of the file.
__all__ = [
'Hamming', 'MLIPNS',
'Levenshtein', 'DamerauLevenshtein',
'Jaro', 'JaroWinkler', 'StrCmp95',
'NeedlemanWunsch', 'Gotoh', 'SmithWaterman',
'hamming', 'mlipns',
'levenshtein', 'damerau_levenshtein',
'jaro', 'jaro_winkler', 'strcmp95',
'needleman_wunsch', 'gotoh', 'smith_waterman',
]
# Type variable for the element type of the compared sequences.
T = TypeVar('T')
class Hamming(_Base):
    """
    Compute the Hamming distance between the two or more sequences.
    The Hamming distance is the number of differing items in ordered sequences.

    With `truncate=True` the comparison stops at the end of the shortest
    sequence (`zip`); otherwise shorter sequences are padded (`zip_longest`).

    https://en.wikipedia.org/wiki/Hamming_distance
    """

    def __init__(
        self,
        qval: int = 1,
        test_func: TestFunc | None = None,
        truncate: bool = False,
        external: bool = True,
    ) -> None:
        self.qval = qval
        self.test_func = test_func or self._ident
        self.truncate = truncate
        self.external = external

    def __call__(self, *sequences: Sequence[object]) -> int:
        """Return the number of positions at which `test_func` reports a difference."""
        sequences = self._get_sequences(*sequences)
        quick = self.quick_answer(*sequences)
        if quick is not None:
            assert isinstance(quick, int)
            return quick

        if self.truncate:
            columns = zip(*sequences)
        else:
            columns = zip_longest(*sequences)

        mismatches = 0
        for column in columns:
            if not self.test_func(*column):
                mismatches += 1
        return mismatches
class Levenshtein(_Base):
    """
    Compute the absolute Levenshtein distance between the two sequences.
    The Levenshtein distance is the minimum number of edit operations necessary
    for transforming one sequence into the other. The edit operations allowed are:

        * deletion:     ABC -> BC, AC, AB
        * insertion:    ABC -> ABCD, EABC, AEBC..
        * substitution: ABC -> ABE, ADC, FBC..

    https://en.wikipedia.org/wiki/Levenshtein_distance
    TODO: https://gist.github.com/kylebgorman/1081951/9b38b7743a3cb5167ab2c6608ac8eea7fc629dca
    """

    def __init__(
        self,
        qval: int = 1,
        test_func: TestFunc | None = None,
        external: bool = True,
    ) -> None:
        self.qval = qval
        self.test_func = test_func or self._ident
        self.external = external

    def _recursive(self, s1: Sequence[T], s2: Sequence[T]) -> int:
        """Peel off the last elements and delegate the sub-problems to `self(...)`."""
        # TODO: more than 2 sequences support
        if not (s1 and s2):
            return len(s1) + len(s2)

        if self.test_func(s1[-1], s2[-1]):
            return self(s1[:-1], s2[:-1])

        # deletion / insertion / substitution, evaluated in that order
        without_s1_tail = self(s1[:-1], s2)
        without_s2_tail = self(s1, s2[:-1])
        without_both = self(s1[:-1], s2[:-1])
        return 1 + min(without_s1_tail, without_s2_tail, without_both)

    def _cycled(self, s1: Sequence[T], s2: Sequence[T]) -> int:
        """Row-by-row dynamic programming that keeps only two rows in memory.

        source:
        https://github.com/jamesturk/jellyfish/blob/master/jellyfish/_jellyfish.py#L18
        """
        height = len(s1) + 1
        width = len(s2) + 1

        # row 0: cost of building the first `c` items of s2 from nothing
        current: Any = numpy.arange(width) if numpy else range(width)

        for row in range(1, height):
            previous = current
            current = [row] + [0] * (width - 1)
            for col in range(1, width):
                same = self.test_func(s1[row - 1], s2[col - 1])
                current[col] = min(
                    previous[col - 1] + (not same),  # keep or substitute
                    previous[col] + 1,               # deletion
                    current[col - 1] + 1,            # insertion
                )
        return int(current[-1])

    def __call__(self, s1: Sequence[T], s2: Sequence[T]) -> int:
        """Return the Levenshtein distance between `s1` and `s2`."""
        s1, s2 = self._get_sequences(s1, s2)

        quick = self.quick_answer(s1, s2)
        if quick is None:
            return self._cycled(s1, s2)
        assert isinstance(quick, int)
        return quick
class DamerauLevenshtein(_Base):
    """
    Compute the absolute Damerau-Levenshtein distance between the two sequences.
    The Damerau-Levenshtein distance is the minimum number of edit operations necessary
    for transforming one sequence into the other. The edit operations allowed are:

        * deletion:      ABC -> BC, AC, AB
        * insertion:     ABC -> ABCD, EABC, AEBC..
        * substitution:  ABC -> ABE, ADC, FBC..
        * transposition: ABC -> ACB, BAC

    If `restricted=False`, it will calculate unrestricted distance,
    where the same character can be touched more than once.
    So the distance between BA and ACB is 2: BA -> AB -> ACB.

    https://en.wikipedia.org/wiki/Damerau%E2%80%93Levenshtein_distance
    """

    def __init__(
        self,
        qval: int = 1,
        test_func: TestFunc | None = None,
        external: bool = True,
        restricted: bool = True,
    ) -> None:
        self.qval = qval
        self.test_func = test_func or self._ident
        self.external = external
        self.restricted = restricted

    def _numpy(self, s1: Sequence[T], s2: Sequence[T]) -> int:
        """Restricted distance on a numpy matrix; mirrors `_pure_python_restricted`.

        The algorithm addresses a boundary row and column through index ``-1``.
        In a numpy array a negative index wraps around to the end, so the matrix
        gets one spare row and one spare column: ``-1`` then refers to a cell
        that no data index uses instead of aliasing the last data row/column.
        This method is not called by `__call__`.
        """
        d = numpy.zeros([len(s1) + 2, len(s2) + 2], dtype=int)

        # boundary column / row (index -1 == the spare last column / row)
        for i in range(-1, len(s1) + 1):
            d[i][-1] = i + 1
        for j in range(-1, len(s2) + 1):
            d[-1][j] = j + 1

        for i, cs1 in enumerate(s1):
            for j, cs2 in enumerate(s2):
                cost = int(not self.test_func(cs1, cs2))
                # ^ 0 if equal, 1 otherwise

                d[i][j] = min(
                    d[i - 1][j] + 1,            # deletion
                    d[i][j - 1] + 1,            # insertion
                    d[i - 1][j - 1] + cost,     # substitution
                )

                # transposition needs a previous element in both sequences
                if not i or not j:
                    continue
                if not self.test_func(cs1, s2[j - 1]):
                    continue
                # same second check as in the pure-python restricted variant
                if not self.test_func(s1[i - 1], cs2):
                    continue
                d[i][j] = min(
                    d[i][j],
                    d[i - 2][j - 2] + cost,
                )

        return int(d[len(s1) - 1][len(s2) - 1])

    def _pure_python_unrestricted(self, s1: Sequence[T], s2: Sequence[T]) -> int:
        """Unrestricted distance on a dict-based matrix.

        https://en.wikipedia.org/wiki/Damerau%E2%80%93Levenshtein_distance
        """
        d: dict[tuple[int, int], int] = {}
        # last row (1-based) in which each element of s1 was seen
        da: dict[T, int] = {}

        len1 = len(s1)
        len2 = len(s2)

        maxdist = len1 + len2
        d[-1, -1] = maxdist

        # matrix
        for i in range(len(s1) + 1):
            d[i, -1] = maxdist
            d[i, 0] = i
        for j in range(len(s2) + 1):
            d[-1, j] = maxdist
            d[0, j] = j

        for i, cs1 in enumerate(s1, start=1):
            # last column (1-based) of the current row where the elements matched
            db = 0
            for j, cs2 in enumerate(s2, start=1):
                i1 = da.get(cs2, 0)
                j1 = db
                if self.test_func(cs1, cs2):
                    cost = 0
                    db = j
                else:
                    cost = 1

                d[i, j] = min(
                    d[i - 1, j - 1] + cost,                         # substitution
                    d[i, j - 1] + 1,                                # insertion
                    d[i - 1, j] + 1,                                # deletion
                    d[i1 - 1, j1 - 1] + (i - i1) - 1 + (j - j1),    # transposition
                )
            da[cs1] = i

        return d[len1, len2]

    def _pure_python_restricted(self, s1: Sequence[T], s2: Sequence[T]) -> int:
        """Restricted distance on a dict-based matrix.

        https://www.guyrutenberg.com/2008/12/15/damerau-levenshtein-distance-in-python/
        """
        d: dict[tuple[int, int], int] = {}

        # matrix: key -1 holds the boundary row / column
        for i in range(-1, len(s1) + 1):
            d[i, -1] = i + 1
        for j in range(-1, len(s2) + 1):
            d[-1, j] = j + 1

        for i, cs1 in enumerate(s1):
            for j, cs2 in enumerate(s2):
                cost = int(not self.test_func(cs1, cs2))
                # ^ 0 if equal, 1 otherwise

                d[i, j] = min(
                    d[i - 1, j] + 1,            # deletion
                    d[i, j - 1] + 1,            # insertion
                    d[i - 1, j - 1] + cost,     # substitution
                )

                # transposition
                if not i or not j:
                    continue
                if not self.test_func(cs1, s2[j - 1]):
                    continue
                if not self.test_func(s1[i - 1], cs2):
                    continue
                d[i, j] = min(
                    d[i, j],
                    d[i - 2, j - 2] + cost,
                )

        return d[len(s1) - 1, len(s2) - 1]

    def __call__(self, s1: Sequence[T], s2: Sequence[T]) -> int:
        """Return the restricted or unrestricted distance, depending on `self.restricted`."""
        s1, s2 = self._get_sequences(s1, s2)

        result = self.quick_answer(s1, s2)
        if result is not None:
            return result  # type: ignore[return-value]

        # The numpy variant is intentionally not used here:
        # if numpy:
        #     return self._numpy(s1, s2)
        # else:
        if self.restricted:
            return self._pure_python_restricted(s1, s2)
        return self._pure_python_unrestricted(s1, s2)
class JaroWinkler(_BaseSimilarity):
    """
    Computes the Jaro-Winkler measure between two strings.
    The Jaro-Winkler measure is designed to capture cases where two strings
    have a low Jaro score, but share a prefix and thus are likely to match.

    https://en.wikipedia.org/wiki/Jaro%E2%80%93Winkler_distance
    https://github.com/Yomguithereal/talisman/blob/master/src/metrics/jaro.js
    https://github.com/Yomguithereal/talisman/blob/master/src/metrics/jaro-winkler.js
    """

    def __init__(
        self,
        long_tolerance: bool = False,
        winklerize: bool = True,
        qval: int = 1,
        external: bool = True,
    ) -> None:
        self.qval = qval
        self.long_tolerance = long_tolerance
        self.winklerize = winklerize
        self.external = external

    def maximum(self, *sequences: Sequence[object]) -> int:
        return 1

    def __call__(self, s1: Sequence[T], s2: Sequence[T], prefix_weight: float = 0.1) -> float:
        """Return the similarity of `s1` and `s2`.

        `prefix_weight` scales the bonus given for a shared prefix of up to
        four elements; the bonus is applied only when `self.winklerize` is set
        and the plain Jaro weight is above 0.7.
        """
        s1, s2 = self._get_sequences(s1, s2)

        quick = self.quick_answer(s1, s2)
        if quick is not None:
            return quick

        s1_len, s2_len = len(s1), len(s2)
        if not (s1_len and s2_len):
            return 0.0

        min_len = min(s1_len, s2_len)
        # half of the longer length minus one, never negative
        search_range = max(0, max(s1_len, s2_len) // 2 - 1)

        s1_flags = [False] * s1_len
        s2_flags = [False] * s2_len

        # looking only within search range, count & flag matched pairs
        common_chars = 0
        last_s2 = s2_len - 1
        for i, s1_ch in enumerate(s1):
            low = max(0, i - search_range)
            hi = min(i + search_range, last_s2)
            for j in range(low, hi + 1):
                if not s2_flags[j] and s2[j] == s1_ch:
                    s1_flags[i] = s2_flags[j] = True
                    common_chars += 1
                    break

        # short circuit if no characters match
        if not common_chars:
            return 0.0

        # count transpositions: walk the flagged items of both sequences in step
        k = trans_count = 0
        for i, s1_f in enumerate(s1_flags):
            if not s1_f:
                continue
            for j in range(k, s2_len):
                if s2_flags[j]:
                    k = j + 1
                    break
            if s1[i] != s2[j]:
                trans_count += 1
        trans_count //= 2

        # plain Jaro weight
        weight = common_chars / s1_len + common_chars / s2_len
        weight += (common_chars - trans_count) / common_chars
        weight /= 3

        # stop to boost if strings are not similar
        if not self.winklerize or weight <= 0.7:
            return weight

        # winkler modification
        # adjust for up to first 4 chars in common
        prefix_cap = min(min_len, 4)
        prefix_len = 0
        while prefix_len < prefix_cap and s1[prefix_len] == s2[prefix_len]:
            prefix_len += 1
        if prefix_len:
            weight += prefix_len * prefix_weight * (1.0 - weight)

        # optionally adjust for long strings
        # after agreeing beginning chars, at least two or more must agree and
        # agreed characters must be > half of remaining characters
        if not self.long_tolerance or min_len <= 4:
            return weight
        if common_chars <= prefix_len + 1 or 2 * common_chars < min_len + prefix_len:
            return weight
        tmp = (common_chars - prefix_len - 1) / (s1_len + s2_len - prefix_len * 2 + 2)
        weight += (1.0 - weight) * tmp
        return weight
class Jaro(JaroWinkler):
    """Jaro similarity: `JaroWinkler` constructed with `winklerize=False`."""

    def __init__(
        self,
        long_tolerance: bool = False,
        qval: int = 1,
        external: bool = True,
    ) -> None:
        options = {
            'long_tolerance': long_tolerance,
            'winklerize': False,
            'qval': qval,
            'external': external,
        }
        super().__init__(**options)
class NeedlemanWunsch(_BaseSimilarity):
    """
    Computes the Needleman-Wunsch measure between two strings.
    The Needleman-Wunsch generalizes the Levenshtein distance and considers global
    alignment between two strings. Specifically, it is computed by assigning
    a score to each alignment between two input strings and choosing the
    score of the best alignment, that is, the maximal score.
    An alignment between two strings is a set of correspondences between the
    characters of between them, allowing for gaps.

    https://en.wikipedia.org/wiki/Needleman%E2%80%93Wunsch_algorithm
    """

    def __init__(
        self,
        gap_cost: float = 1.0,
        sim_func: SimFunc = None,
        qval: int = 1,
        external: bool = True,
    ) -> None:
        self.qval = qval
        self.gap_cost = gap_cost
        if sim_func:
            self.sim_func = sim_func
        else:
            self.sim_func = self._ident
        self.external = external

    def minimum(self, *sequences: Sequence[object]) -> float:
        """Lower bound used for normalization: longest length times `-gap_cost`."""
        return -max(map(len, sequences)) * self.gap_cost

    def maximum(self, *sequences: Sequence[object]) -> float:
        """Upper bound used for normalization: the longest sequence length."""
        return max(map(len, sequences))

    def distance(self, *sequences: Sequence[object]) -> float:
        """Get distance between sequences (the negated similarity)
        """
        return -1 * self.similarity(*sequences)

    def normalized_distance(self, *sequences: Sequence[object]) -> float:
        """Get distance from 0 to 1

        Computed as ``(maximum - similarity) / (maximum - minimum)`` so that it
        is the complement of `normalized_similarity` for every `gap_cost`.
        For `gap_cost == 1` (and for bounds with ``minimum == -maximum``) this
        is the same value the previous ``distance - minimum`` numerator gave.
        """
        minimum = self.minimum(*sequences)
        maximum = self.maximum(*sequences)
        if maximum == 0:
            return 0
        # distance == -similarity, hence distance + maximum == maximum - similarity
        return (self.distance(*sequences) + maximum) / (maximum - minimum)

    def normalized_similarity(self, *sequences: Sequence[object]) -> float:
        """Get similarity from 0 to 1

        The score is rescaled over the full ``[minimum, maximum]`` span. Dividing
        by ``maximum * 2`` is only equal to that span when ``minimum == -maximum``
        and produced values above 1 for a `gap_cost` greater than 1.
        """
        minimum = self.minimum(*sequences)
        maximum = self.maximum(*sequences)
        if maximum == 0:
            return 1
        return (self.similarity(*sequences) - minimum) / (maximum - minimum)

    def __call__(self, s1: Sequence[T], s2: Sequence[T]) -> float:
        """Return the best global alignment score of `s1` and `s2`.

        Raises:
            ImportError: if numpy is not available.
        """
        if not numpy:
            raise ImportError('Please, install numpy for Needleman-Wunsch measure')
        s1, s2 = self._get_sequences(s1, s2)

        # result = self.quick_answer(s1, s2)
        # if result is not None:
        #     return result * self.maximum(s1, s2)

        dist_mat = numpy.zeros(
            (len(s1) + 1, len(s2) + 1),
            dtype=float,
        )
        # DP initialization: first column, only gaps
        for i in range(len(s1) + 1):
            dist_mat[i, 0] = -(i * self.gap_cost)
        # DP initialization: first row, only gaps
        for j in range(len(s2) + 1):
            dist_mat[0, j] = -(j * self.gap_cost)
        # Needleman-Wunsch DP calculation
        for i, c1 in enumerate(s1, 1):
            for j, c2 in enumerate(s2, 1):
                match = dist_mat[i - 1, j - 1] + self.sim_func(c1, c2)
                delete = dist_mat[i - 1, j] - self.gap_cost
                insert = dist_mat[i, j - 1] - self.gap_cost
                dist_mat[i, j] = max(match, delete, insert)
        return dist_mat[dist_mat.shape[0] - 1, dist_mat.shape[1] - 1]
class SmithWaterman(_BaseSimilarity):
    """
    Computes the Smith-Waterman measure between two strings.
    The Smith-Waterman algorithm performs local sequence alignment;
    that is, for determining similar regions between two strings.
    Instead of looking at the total sequence, the Smith-Waterman algorithm compares
    segments of all possible lengths and optimizes the similarity measure.

    https://en.wikipedia.org/wiki/Smith%E2%80%93Waterman_algorithm
    https://github.com/Yomguithereal/talisman/blob/master/src/metrics/smith-waterman.js
    """

    def __init__(
        self,
        gap_cost: float = 1.0,
        sim_func: SimFunc = None,
        qval: int = 1,
        external: bool = True,
    ) -> None:
        self.qval = qval
        self.gap_cost = gap_cost
        self.sim_func = sim_func or self._ident
        self.external = external

    def maximum(self, *sequences: Sequence[object]) -> int:
        """Upper bound of the score: the length of the shortest sequence."""
        return min(map(len, sequences))

    def __call__(self, s1: Sequence[T], s2: Sequence[T]) -> float:
        """Fill the score matrix and return its bottom-right cell.

        Raises:
            ImportError: if numpy is not available.
        """
        if not numpy:
            raise ImportError('Please, install numpy for Smith-Waterman measure')
        s1, s2 = self._get_sequences(s1, s2)

        quick = self.quick_answer(s1, s2)
        if quick is not None:
            return quick

        rows, cols = len(s1), len(s2)
        scores = numpy.zeros((rows + 1, cols + 1), dtype=float)

        for i, sc1 in enumerate(s1, start=1):
            for j, sc2 in enumerate(s2, start=1):
                # Score for pairing s1[i - 1] with s2[j - 1]:
                # generally low for mismatch, high for match.
                match = scores[i - 1, j - 1] + self.sim_func(sc1, sc2)
                # Scores for introducing a gap in one of the sequences.
                delete = scores[i - 1, j] - self.gap_cost
                insert = scores[i, j - 1] - self.gap_cost
                # a local alignment never scores below zero
                scores[i, j] = max(0, match, delete, insert)

        # NOTE(review): the last cell is returned, not the largest cell of the
        # matrix -- confirm this is the intended measure.
        return scores[rows, cols]
class Gotoh(NeedlemanWunsch):
    """Gotoh score
    Gotoh's algorithm is essentially Needleman-Wunsch with affine gap
    penalties:
    https://www.cs.umd.edu/class/spring2003/cmsc838t/papers/gotoh1982.pdf
    """

    def __init__(
        self,
        gap_open: int = 1,
        gap_ext: float = 0.4,
        sim_func: SimFunc = None,
        qval: int = 1,
        external: bool = True,
    ) -> None:
        self.qval = qval
        self.gap_open = gap_open
        self.gap_ext = gap_ext
        if sim_func:
            self.sim_func = sim_func
        else:
            self.sim_func = self._ident
        self.external = external

    def minimum(self, *sequences: Sequence[object]) -> int:
        """Lower bound used for normalization: minus the shortest length."""
        return -min(map(len, sequences))

    def maximum(self, *sequences: Sequence[object]) -> int:
        """Upper bound used for normalization: the shortest length."""
        return min(map(len, sequences))

    def __call__(self, s1: Sequence[T], s2: Sequence[T]) -> float:
        """Return the best alignment score with affine gap penalties.

        Three matrices are filled: `d_mat` (alignments ending in a paired
        element), `p_mat` and `q_mat` (alignments ending in a gap in one of
        the two sequences). Opening a gap costs `gap_open`, every further
        gap position costs `gap_ext`.

        Empty sequences are supported: the result is then the cost of one gap
        spanning the other sequence (0 when both are empty).

        Raises:
            ImportError: if numpy is not available.
        """
        if not numpy:
            raise ImportError('Please, install numpy for Gotoh measure')

        s1, s2 = self._get_sequences(s1, s2)

        # result = self.quick_answer(s1, s2)
        # if result is not None:
        #     return result * self.maximum(s1, s2)

        len_s1 = len(s1)
        len_s2 = len(s2)
        d_mat = numpy.zeros((len_s1 + 1, len_s2 + 1), dtype=float)
        p_mat = numpy.zeros((len_s1 + 1, len_s2 + 1), dtype=float)
        q_mat = numpy.zeros((len_s1 + 1, len_s2 + 1), dtype=float)

        d_mat[0, 0] = 0
        p_mat[0, 0] = float('-inf')
        q_mat[0, 0] = float('-inf')
        for i in range(1, len_s1 + 1):
            d_mat[i, 0] = float('-inf')
            p_mat[i, 0] = -self.gap_open - self.gap_ext * (i - 1)
            q_mat[i, 0] = float('-inf')
            # column 1 exists only when s2 is not empty
            if len_s2:
                q_mat[i, 1] = -self.gap_open
        for j in range(1, len_s2 + 1):
            d_mat[0, j] = float('-inf')
            p_mat[0, j] = float('-inf')
            # row 1 exists only when s1 is not empty
            if len_s1:
                p_mat[1, j] = -self.gap_open
            q_mat[0, j] = -self.gap_open - self.gap_ext * (j - 1)

        for i, sc1 in enumerate(s1, start=1):
            for j, sc2 in enumerate(s2, start=1):
                sim_val = self.sim_func(sc1, sc2)
                d_mat[i, j] = max(
                    d_mat[i - 1, j - 1] + sim_val,
                    p_mat[i - 1, j - 1] + sim_val,
                    q_mat[i - 1, j - 1] + sim_val,
                )
                p_mat[i, j] = max(
                    d_mat[i - 1, j] - self.gap_open,
                    p_mat[i - 1, j] - self.gap_ext,
                )
                q_mat[i, j] = max(
                    d_mat[i, j - 1] - self.gap_open,
                    q_mat[i, j - 1] - self.gap_ext,
                )

        # best score among the three states in the bottom-right corner
        i, j = (n - 1 for n in d_mat.shape)
        return max(d_mat[i, j], p_mat[i, j], q_mat[i, j])
class StrCmp95(_BaseSimilarity):
    """strcmp95 similarity

    http://cpansearch.perl.org/src/SCW/Text-JaroWinkler-0.1/strcmp95.c
    """
    # Pairs of characters that earn partial credit when they stand in for
    # one another (the table is applied in both directions).
    sp_mx: tuple[tuple[str, str], ...] = (
        ('A', 'E'), ('A', 'I'), ('A', 'O'), ('A', 'U'), ('B', 'V'), ('E', 'I'),
        ('E', 'O'), ('E', 'U'), ('I', 'O'), ('I', 'U'), ('O', 'U'), ('I', 'Y'),
        ('E', 'Y'), ('C', 'G'), ('E', 'F'), ('W', 'U'), ('W', 'V'), ('X', 'K'),
        ('S', 'Z'), ('X', 'S'), ('Q', 'C'), ('U', 'V'), ('M', 'N'), ('L', 'I'),
        ('Q', 'O'), ('P', 'R'), ('I', 'J'), ('2', 'Z'), ('5', 'S'), ('8', 'B'),
        ('1', 'I'), ('1', 'L'), ('0', 'O'), ('0', 'Q'), ('C', 'K'), ('G', 'J'),
    )

    def __init__(self, long_strings: bool = False, external: bool = True) -> None:
        self.long_strings = long_strings
        self.external = external

    def maximum(self, *sequences: Sequence[object]) -> int:
        return 1

    @staticmethod
    def _in_range(char) -> bool:
        """Tell whether the code point of `char` lies strictly between 0 and 91."""
        return 0 < ord(char) < 91

    def __call__(self, s1: str, s2: str) -> float:
        """Return the strcmp95 similarity of the stripped, upper-cased strings."""
        s1 = s1.strip().upper()
        s2 = s2.strip().upper()

        quick = self.quick_answer(s1, s2)
        if quick is not None:
            return quick

        len_s1, len_s2 = len(s1), len(s2)

        # The adjwt table gives partial credit for characters that may be
        # errors due to known phonetic or character recognition errors.
        # A typical example is to match the letter "O" with the number "0".
        adjwt = defaultdict(int)
        for left, right in self.sp_mx:
            adjwt[left, right] = 3
            adjwt[right, left] = 3

        longest = max(len_s1, len_s2)
        minv = min(len_s1, len_s2)

        # Blank out the flags
        s1_flag = [0] * longest
        s2_flag = [0] * longest
        search_range = max(0, longest // 2 - 1)

        # Looking only within the search range, count and flag the matched pairs.
        num_com = 0
        yl1 = len_s2 - 1
        for i, sc1 in enumerate(s1):
            lowlim = max(i - search_range, 0)
            hilim = min(i + search_range, yl1)
            for j in range(lowlim, hilim + 1):
                if s2_flag[j] == 0 and s2[j] == sc1:
                    s2_flag[j] = 1
                    s1_flag[i] = 1
                    num_com += 1
                    break

        # If no characters in common - return
        if num_com == 0:
            return 0.0

        # Count the number of transpositions
        k = n_trans = 0
        for i, sc1 in enumerate(s1):
            if s1_flag[i]:
                for j in range(k, len_s2):
                    if s2_flag[j] != 0:
                        k = j + 1
                        break
                if sc1 != s2[j]:
                    n_trans += 1
        n_trans = n_trans // 2

        # Adjust for similarities in unmatched characters
        n_simi = 0
        if minv > num_com:
            for i in range(len_s1):
                if s1_flag[i] != 0 or not self._in_range(s1[i]):
                    continue
                for j in range(len_s2):
                    if s2_flag[j] != 0 or not self._in_range(s2[j]):
                        continue
                    pair = (s1[i], s2[j])
                    if pair not in adjwt:
                        continue
                    n_simi += adjwt[pair]
                    # mark as used for partial credit
                    s2_flag[j] = 2
                    break
        num_sim = n_simi / 10.0 + num_com

        # Main weight computation
        weight = num_sim / len_s1 + num_sim / len_s2
        weight += (num_com - n_trans) / num_com
        weight = weight / 3.0

        # Continue to boost the weight if the strings are similar
        if weight <= 0.7:
            return weight

        # Adjust for having up to the first 4 characters in common
        # (a digit ends the shared prefix)
        prefix_cap = min(minv, 4)
        prefix_len = 0
        for sc1, sc2 in zip(s1, s2):
            if prefix_len >= prefix_cap or sc1 != sc2 or sc1.isdigit():
                break
            prefix_len += 1
        if prefix_len:
            weight += prefix_len * 0.1 * (1.0 - weight)

        # Optionally adjust for long strings.
        # After agreeing beginning chars, at least two more must agree and
        # the agreeing characters must be > .5 of remaining characters.
        if not self.long_strings or minv <= 4:
            return weight
        if num_com <= prefix_len + 1 or 2 * num_com < minv + prefix_len:
            return weight
        if s1[0].isdigit():
            return weight
        res = (num_com - prefix_len - 1) / (len_s1 + len_s2 - prefix_len * 2 + 2)
        weight += (1.0 - weight) * res
        return weight
class MLIPNS(_BaseSimilarity):
    """
    Compute the MLIPNS (Modified Language-Independent Product Name Search)
    similarity between the two or more sequences.

    The result is binary: 1 when the Hamming-based mismatch ratio drops to
    `threshold` or below after discounting at most `maxmismatches` mismatches,
    0 otherwise.

    http://www.sial.iias.spb.su/files/386-386-1-PB.pdf
    https://github.com/Yomguithereal/talisman/blob/master/src/metrics/mlipns.js
    """

    def __init__(
        self, threshold: float = 0.25,
        maxmismatches: int = 2,
        qval: int = 1,
        external: bool = True,
    ) -> None:
        self.qval = qval
        self.threshold = threshold
        self.maxmismatches = maxmismatches
        self.external = external

    def maximum(self, *sequences: Sequence[object]) -> int:
        return 1

    def __call__(self, *sequences: Sequence[object]) -> float:
        """Return 1 if the sequences are considered similar, otherwise 0."""
        sequences = self._get_sequences(*sequences)

        quick = self.quick_answer(*sequences)
        if quick is not None:
            return quick

        ham = Hamming()(*sequences)
        maxlen = max(map(len, sequences))
        # the sequences are not modified below, so this check is done once
        none_empty = all(sequences)

        mismatches = 0
        while none_empty and mismatches <= self.maxmismatches:
            if not maxlen:
                return 1
            if 1 - (maxlen - ham) / maxlen <= self.threshold:
                return 1
            # discount one mismatch and try again
            mismatches += 1
            ham -= 1
            maxlen -= 1

        return 0 if maxlen else 1
# Ready-to-use module-level instances, each built with the default
# constructor arguments of its class.
hamming = Hamming()
levenshtein = Levenshtein()
# both names are bound to the same instance
damerau = damerau_levenshtein = DamerauLevenshtein()
jaro = Jaro()
jaro_winkler = JaroWinkler()
needleman_wunsch = NeedlemanWunsch()
smith_waterman = SmithWaterman()
gotoh = Gotoh()
strcmp95 = StrCmp95()
mlipns = MLIPNS()

View File

@ -0,0 +1,179 @@
from __future__ import annotations
# built-in
from collections import defaultdict
from itertools import groupby, zip_longest
from typing import Any, Iterator, Sequence, TypeVar
# app
from .base import Base as _Base, BaseSimilarity as _BaseSimilarity
try:
# external
import numpy
except ImportError:
numpy = None # type: ignore[assignment]
# Public names of this module: the algorithm classes and their
# module-level instances.
__all__ = [
'MRA', 'Editex',
'mra', 'editex',
]
# Generic type variable declared for annotations in this module.
T = TypeVar('T')
class MRA(_BaseSimilarity):
    """Western Airlines Surname Match Rating Algorithm comparison rating

    https://en.wikipedia.org/wiki/Match_rating_approach
    https://github.com/Yomguithereal/talisman/blob/master/src/metrics/mra.js
    """

    def maximum(self, *sequences: str) -> int:
        """Return the length of the longest encoded form of the given words."""
        return max(len(list(self._calc_mra(word))) for word in sequences)

    def _calc_mra(self, word: str) -> str:
        """Encode `word`: upper-case it, drop vowels after the first character,
        collapse runs of repeated characters and keep at most 3 + 3 characters.
        """
        if not word:
            return word
        upper = word.upper()
        head, tail = upper[0], upper[1:]
        code = head + ''.join(c for c in tail if c not in 'AEIOU')
        # remove repeats like an UNIX uniq
        code = ''.join(key for key, _ in groupby(code))
        if len(code) <= 6:
            return code
        return code[:3] + code[-3:]

    def __call__(self, *sequences: str) -> int:
        """Return the rating: longest code length minus what is left unmatched."""
        if not all(sequences):
            return 0
        sequences = [list(self._calc_mra(word)) for word in sequences]
        lengths = [len(code) for code in sequences]
        count = len(lengths)
        max_length = max(lengths)
        if abs(max_length - min(lengths)) > count:
            return 0

        for _ in range(count):
            minlen = min(lengths)
            # positions at which the codes disagree
            differing = [chars for chars in zip(*sequences) if not self._ident(*chars)]
            leftovers = map(list, zip(*differing))
            # update sequences: unmatched characters plus the tail that lies
            # beyond the shortest code
            pairs: Iterator[tuple[Any, Any]] = zip_longest(leftovers, sequences, fillvalue=list())
            sequences = [unmatched + code[minlen:] for unmatched, code in pairs]
            # update lengths
            lengths = [len(code) for code in sequences]

        if not lengths:
            return max_length
        return max_length - max(lengths)
class Editex(_Base):
    """Editex distance: an edit distance whose costs depend on letter groups.

    https://anhaidgroup.github.io/py_stringmatching/v0.3.x/Editex.html
    http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.14.3856&rep=rep1&type=pdf
    http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.18.2138&rep=rep1&type=pdf
    https://github.com/chrislit/abydos/blob/master/abydos/distance/_editex.py
    https://habr.com/ru/post/331174/ (RUS)
    """
    # letters of one group are cheaper to exchange for one another
    groups: tuple[frozenset[str], ...] = (
        frozenset('AEIOUY'),
        frozenset('BP'),
        frozenset('CKQ'),
        frozenset('DT'),
        frozenset('LR'),
        frozenset('MN'),
        frozenset('GJ'),
        frozenset('FPV'),
        frozenset('SXZ'),
        frozenset('CSZ'),
    )
    ungrouped = frozenset('HW')  # all letters in alphabet that not presented in `grouped`

    def __init__(
        self,
        local: bool = False,
        match_cost: int = 0,
        group_cost: int = 1,
        mismatch_cost: int = 2,
        groups: tuple[frozenset[str], ...] = None,
        ungrouped: frozenset[str] = None,
        external: bool = True,
    ) -> None:
        """Configure costs and, optionally, custom letter groups.

        Raises:
            ValueError: if `groups` is given without `ungrouped`.
        """
        # Ensure that match_cost <= group_cost <= mismatch_cost
        self.match_cost = match_cost
        self.group_cost = max(group_cost, self.match_cost)
        self.mismatch_cost = max(mismatch_cost, self.group_cost)
        self.local = local
        self.external = external

        if groups is not None:
            if ungrouped is None:
                raise ValueError('`ungrouped` argument required with `groups`')
            self.groups = groups
            self.ungrouped = ungrouped
        # every letter that belongs to at least one group
        self.grouped = frozenset.union(*self.groups)

    def maximum(self, *sequences: Sequence) -> int:
        """Upper bound of the distance: longest length times `mismatch_cost`."""
        longest = max(map(len, sequences))
        return longest * self.mismatch_cost

    def r_cost(self, *elements: str) -> int:
        """Cost of replacing one element with another."""
        if self._ident(*elements):
            return self.match_cost
        if any(element not in self.grouped for element in elements):
            return self.mismatch_cost
        for group in self.groups:
            if all(element in group for element in elements):
                return self.group_cost
        return self.mismatch_cost

    def d_cost(self, *elements: str) -> int:
        """Cost of deleting an element, given the element before it."""
        differ = not self._ident(*elements)
        if differ and elements[0] in self.ungrouped:
            return self.group_cost
        return self.r_cost(*elements)

    def __call__(self, s1: str, s2: str) -> float:
        """Return the Editex distance between `s1` and `s2`, capped at `maximum`."""
        quick = self.quick_answer(s1, s2)
        if quick is not None:
            return quick

        # must do `upper` before getting length because some one-char lowercase glyphs
        # are represented as two chars in uppercase.
        # This might result in a distance that is greater than the maximum
        # input sequence length, though, so we save that maximum first.
        max_length = self.maximum(s1, s2)

        # a leading blank gives the first real character a predecessor
        s1 = ' ' + s1.upper()
        s2 = ' ' + s2.upper()
        len_s1 = len(s1) - 1
        len_s2 = len(s2) - 1

        d_mat: Any
        if numpy:
            d_mat = numpy.zeros((len_s1 + 1, len_s2 + 1), dtype=int)
        else:
            d_mat = defaultdict(lambda: defaultdict(int))

        # first column (skipped in local mode) and first row
        if not self.local:
            for i in range(1, len_s1 + 1):
                d_mat[i][0] = d_mat[i - 1][0] + self.d_cost(s1[i - 1], s1[i])
        for j in range(1, len_s2 + 1):
            d_mat[0][j] = d_mat[0][j - 1] + self.d_cost(s2[j - 1], s2[j])

        s1_pairs = zip(s1, s1[1:])
        for i, (cs1_prev, cs1_curr) in enumerate(s1_pairs, start=1):
            s2_pairs = zip(s2, s2[1:])
            for j, (cs2_prev, cs2_curr) in enumerate(s2_pairs, start=1):
                d_mat[i][j] = min(
                    d_mat[i - 1][j] + self.d_cost(cs1_prev, cs1_curr),
                    d_mat[i][j - 1] + self.d_cost(cs2_prev, cs2_curr),
                    d_mat[i - 1][j - 1] + self.r_cost(cs1_curr, cs2_curr),
                )

        distance = d_mat[len_s1][len_s2]
        return min(distance, max_length)
# Ready-to-use module-level instances built with default constructor arguments.
mra = MRA()
editex = Editex()

View File

@ -0,0 +1,186 @@
from __future__ import annotations
# built-in
from difflib import SequenceMatcher as _SequenceMatcher
from typing import Any
# app
from ..utils import find_ngrams
from .base import BaseSimilarity as _BaseSimilarity
from .types import TestFunc
try:
# external
import numpy
except ImportError:
# built-in
from array import array
numpy = None # type: ignore[assignment]
# Public names of this module: the module-level instances followed by
# the algorithm classes.
__all__ = [
'lcsseq', 'lcsstr', 'ratcliff_obershelp',
'LCSSeq', 'LCSStr', 'RatcliffObershelp',
]
class LCSSeq(_BaseSimilarity):
    """longest common subsequence similarity

    https://en.wikipedia.org/wiki/Longest_common_subsequence_problem
    """

    def __init__(
        self,
        qval: int = 1,
        test_func: TestFunc = None,
        external: bool = True,
    ) -> None:
        self.qval = qval
        self.test_func = test_func or self._ident
        self.external = external

    def _dynamic(self, seq1: str, seq2: str) -> str:
        """Longest common subsequence of two sequences via a length table.

        Elements are compared with `==` here; `self.test_func` is not used.

        https://github.com/chrislit/abydos/blob/master/abydos/distance/_lcsseq.py
        http://www.dis.uniroma1.it/~bonifaci/algo/LCSSEQ.py
        http://rosettacode.org/wiki/Longest_common_subsequence#Dynamic_Programming_8
        """
        rows, cols = len(seq1), len(seq2)

        lengths: Any
        if numpy:
            lengths = numpy.zeros((rows + 1, cols + 1), dtype=int)
        else:
            lengths = [array('L', [0] * (cols + 1)) for _ in range(rows + 1)]

        # row 0 and column 0 are initialized to 0 already
        for i, char1 in enumerate(seq1, start=1):
            for j, char2 in enumerate(seq2, start=1):
                if char1 == char2:
                    lengths[i][j] = lengths[i - 1][j - 1] + 1
                else:
                    lengths[i][j] = max(lengths[i][j - 1], lengths[i - 1][j])

        # read the subsequence out from the table, walking back from the corner
        result = ''
        i, j = rows, cols
        while i and j:
            here = lengths[i][j]
            if here == lengths[i - 1][j]:
                i -= 1
            elif here == lengths[i][j - 1]:
                j -= 1
            else:
                assert seq1[i - 1] == seq2[j - 1]
                result = seq1[i - 1] + result
                i -= 1
                j -= 1
        return result

    def _recursive(self, *sequences: str) -> str:
        """Longest common subsequence for any number of sequences."""
        first = sequences[0]
        if not all(sequences):
            return type(first)()  # empty sequence

        if self.test_func(*[s[-1] for s in sequences]):
            # shared last element: solve for the prefixes and append it
            c = first[-1]
            sequences = tuple(s[:-1] for s in sequences)
            return self(*sequences) + c

        # otherwise shorten one sequence at a time and keep the longest answer
        best = type(first)()  # empty sequence
        for i, s in enumerate(sequences):
            shortened = sequences[:i] + (s[:-1], ) + sequences[i + 1:]
            candidate = self(*shortened)
            if len(candidate) >= len(best):
                best = candidate
        return best

    def __call__(self, *sequences: str) -> str:
        """Return the longest common subsequence of the given sequences."""
        if not sequences:
            return ''
        sequences = self._get_sequences(*sequences)
        if len(sequences) != 2:
            return self._recursive(*sequences)
        return self._dynamic(*sequences)

    def similarity(self, *sequences) -> int:
        """Return the length of the longest common subsequence."""
        return len(self(*sequences))
class LCSStr(_BaseSimilarity):
    """longest common substring similarity
    """

    def _standart(self, s1: str, s2: str) -> str:
        """Longest common substring of two sequences via difflib's matcher."""
        found = _SequenceMatcher(a=s1, b=s2).find_longest_match(0, len(s1), 0, len(s2))
        start = found.a
        stop = found.a + found.size
        return s1[start: stop]

    def _custom(self, *sequences: str) -> str:
        """Try every n-gram of the shortest sequence, longest first."""
        short = min(sequences, key=len)
        for n in range(len(short), 0, -1):
            for subseq in find_ngrams(short, n):
                joined = ''.join(subseq)
                if all(joined in seq for seq in sequences):
                    return joined
        return type(short)()  # empty sequence

    def __call__(self, *sequences: str) -> str:
        """Return the longest substring shared by all given sequences."""
        if not all(sequences):
            return ''
        length = len(sequences)
        if length == 0:
            return ''
        if length == 1:
            return sequences[0]

        sequences = self._get_sequences(*sequences)
        # the difflib-based path is used only for two sequences shorter than 200
        use_difflib = length == 2 and max(map(len, sequences)) < 200
        if use_difflib:
            return self._standart(*sequences)
        return self._custom(*sequences)

    def similarity(self, *sequences: str) -> int:
        """Return the length of the longest common substring."""
        return len(self(*sequences))
class RatcliffObershelp(_BaseSimilarity):
    """Ratcliff-Obershelp similarity
    This follows the Ratcliff-Obershelp algorithm to derive a similarity
    measure:

        1. Find the length of the longest common substring in sequences.
        2. Recurse on the strings to the left & right of each this substring
           in sequences. The base case is a 0 length common substring, in which
           case, return 0. Otherwise, return the sum of the current longest
           common substring and the left & right recursed sums.
        3. Multiply this length by 2 and divide by the sum of the lengths of
           sequences.

    https://en.wikipedia.org/wiki/Gestalt_Pattern_Matching
    https://github.com/Yomguithereal/talisman/blob/master/src/metrics/ratcliff-obershelp.js
    https://xlinux.nist.gov/dads/HTML/ratcliffObershelp.html
    """

    def maximum(self, *sequences: str) -> int:
        return 1

    def _find(self, *sequences: str) -> int:
        """Return the total length of the recursively matched common substrings."""
        subseq = LCSStr()(*sequences)
        length = len(subseq)
        if not length:
            return 0

        # split every sequence around its first occurrence of the match
        before = []
        after = []
        for s in sequences:
            start = s.find(subseq)
            before.append(s[:start])
            after.append(s[start + length:])

        left = self._find(*before)
        right = self._find(*after)
        return left + length + right

    def __call__(self, *sequences: str) -> float:
        """Return matched length times the number of sequences, divided by
        the total number of elements.
        """
        quick = self.quick_answer(*sequences)
        if quick is not None:
            return quick
        scount = len(sequences)  # sequences count
        ecount = sum(map(len, sequences))  # elements count
        sequences = self._get_sequences(*sequences)
        matched = self._find(*sequences)
        return scount * matched / ecount
# Ready-to-use module-level instances built with default constructor arguments.
lcsseq = LCSSeq()
lcsstr = LCSStr()
ratcliff_obershelp = RatcliffObershelp()

View File

@ -0,0 +1,127 @@
from __future__ import annotations
# built-in
from itertools import takewhile
from typing import Sequence
# app
from .base import Base as _Base, BaseSimilarity as _BaseSimilarity
from .types import SimFunc
# Public names of this module: the algorithm classes followed by
# their module-level instances.
__all__ = [
'Prefix', 'Postfix', 'Length', 'Identity', 'Matrix',
'prefix', 'postfix', 'length', 'identity', 'matrix',
]
class Prefix(_BaseSimilarity):
    """prefix similarity
    """

    def __init__(self, qval: int = 1, sim_test: SimFunc = None) -> None:
        self.qval = qval
        self.sim_test = sim_test or self._ident

    def __call__(self, *sequences: Sequence) -> Sequence:
        """Return the longest common prefix of the given sequences.

        The result is a `str` when the first sequence is a `str`, `bytes` when
        it is `bytes`, and a list of elements otherwise. Without arguments an
        empty string is returned.
        """
        if not sequences:
            return ''
        sequences = self._get_sequences(*sequences)

        def test(seq):
            return self.sim_test(*seq)

        # leading positions at which all sequences agree
        result = [c[0] for c in takewhile(test, zip(*sequences))]

        s = sequences[0]
        if isinstance(s, str):
            return ''.join(result)
        if isinstance(s, bytes):
            # iterating bytes yields ints, which `b''.join` would reject
            return bytes(result)
        return result

    def similarity(self, *sequences: Sequence) -> int:
        """Return the length of the common prefix."""
        return len(self(*sequences))
class Postfix(Prefix):
    """Postfix similarity: the common trailing part of the sequences.
    """

    def __call__(self, *sequences: Sequence) -> Sequence:
        """Return the common postfix.

        Every sequence is reversed, the common prefix of the reversed
        sequences is taken and reversed back. The result is `str` or `bytes`
        when the first given sequence is, and a list otherwise.
        An empty string is returned when no sequences are given,
        the same as `Prefix` does.
        """
        if not sequences:
            # sequences[0] below would raise IndexError
            return ''
        first = sequences[0]
        flipped = [list(reversed(seq)) for seq in sequences]
        result = reversed(super().__call__(*flipped))
        if isinstance(first, str):
            return ''.join(result)
        if isinstance(first, bytes):
            # elements of bytes are ints, so b''.join() would raise TypeError
            return bytes(result)
        return list(result)
class Length(_Base):
    """Length distance: how much the longest sequence exceeds the shortest.
    """

    def __call__(self, *sequences: Sequence) -> int:
        """Return the difference between the largest and smallest length."""
        sizes = [len(seq) for seq in sequences]
        return max(sizes) - min(sizes)
class Identity(_BaseSimilarity):
    """Identity similarity: the result of `_ident` converted to int.
    """

    def maximum(self, *sequences: Sequence) -> int:
        """Return the largest possible value, which is always 1."""
        return 1

    def __call__(self, *sequences: Sequence) -> int:
        """Return `self._ident(*sequences)` as an int."""
        same = self._ident(*sequences)
        return int(same)
class Matrix(_BaseSimilarity):
    """Matrix similarity: look the compared items up in a table of scores.
    """

    def __init__(
        self,
        mat=None,
        mismatch_cost: int = 0,
        match_cost: int = 1,
        symmetric: bool = True,
        external: bool = True,
    ) -> None:
        """
        mat: container supporting `in` and indexing by the tuple of compared
            items; when falsy only `match_cost`/`mismatch_cost` are used.
        mismatch_cost: value returned when nothing else applies.
        match_cost: value returned for items that `_ident` reports as equal.
        symmetric: also try the reversed tuple when the direct one is missing.
        external: stored on the instance, like the other algorithms do.
        """
        self.mat = mat
        self.mismatch_cost = mismatch_cost
        self.match_cost = match_cost
        self.symmetric = symmetric
        # previously accepted but silently dropped
        self.external = external

    def maximum(self, *sequences: Sequence) -> int:
        """Return `match_cost`."""
        return self.match_cost

    def __call__(self, *sequences: Sequence) -> int:
        """Return the score for the given items.

        Lookup order: the matrix entry for the items, then (if `symmetric`)
        the entry for the reversed items, then `match_cost` for identical
        items, and finally `mismatch_cost`.
        """
        if not self.mat:
            if self._ident(*sequences):
                return self.match_cost
            return self.mismatch_cost

        # search in matrix
        if sequences in self.mat:
            return self.mat[sequences]
        # search in symmetric matrix
        if self.symmetric:
            sequences = tuple(reversed(sequences))
            if sequences in self.mat:
                return self.mat[sequences]
        # if identity then return match_cost
        if self._ident(*sequences):
            return self.match_cost
        # not found
        return self.mismatch_cost
# Module-level instances of the algorithms, created with default arguments.
prefix = Prefix()
postfix = Postfix()
length = Length()
identity = Identity()
matrix = Matrix()

View File

@ -0,0 +1,297 @@
from __future__ import annotations
# built-in
from functools import reduce
from itertools import islice, permutations, repeat
from math import log
from typing import Sequence
# app
from .base import Base as _Base, BaseSimilarity as _BaseSimilarity
from .edit_based import DamerauLevenshtein
__all__ = [
'Jaccard', 'Sorensen', 'Tversky',
'Overlap', 'Cosine', 'Tanimoto', 'MongeElkan', 'Bag',
'jaccard', 'sorensen', 'tversky', 'sorensen_dice',
'overlap', 'cosine', 'tanimoto', 'monge_elkan', 'bag',
]
class Jaccard(_BaseSimilarity):
    """
    Jaccard similarity between sequences of hashable items.

    The value is the size of the intersection divided by the size
    of the union: 1 means equal, 0 means nothing in common.

    https://en.wikipedia.org/wiki/Jaccard_index
    https://github.com/Yomguithereal/talisman/blob/master/src/metrics/jaccard.js
    """

    def __init__(
        self,
        qval: int = 1,
        as_set: bool = False,
        external: bool = True,
    ) -> None:
        """Store the settings on the instance."""
        self.external = external
        self.as_set = as_set
        self.qval = qval

    def maximum(self, *sequences: Sequence) -> int:
        """Return the largest possible value, which is always 1."""
        return 1

    def __call__(self, *sequences: Sequence) -> float:
        """Return intersection size divided by union size.

        A non-None result of `quick_answer` is returned as is.
        """
        quick = self.quick_answer(*sequences)
        if quick is not None:
            return quick
        counters = self._get_counters(*sequences)
        common = self._count_counters(self._intersect_counters(*counters))
        total = self._count_counters(self._union_counters(*counters))
        return common / total
class Sorensen(_BaseSimilarity):
    """
    Compute the Sorensen-Dice similarity between the two sequences.
    They should contain hashable items.
    The value is twice the size of the intersection divided by the summed
    sizes of the sequences, so 1 means equal and 0 means nothing in common.

    https://en.wikipedia.org/wiki/S%C3%B8rensen%E2%80%93Dice_coefficient
    https://github.com/Yomguithereal/talisman/blob/master/src/metrics/dice.js
    """

    def __init__(self, qval: int = 1, as_set: bool = False, external: bool = True) -> None:
        """Store the settings on the instance."""
        self.qval = qval
        self.as_set = as_set
        self.external = external

    def maximum(self, *sequences: Sequence) -> int:
        """Return the largest possible value, which is always 1."""
        return 1

    def __call__(self, *sequences: Sequence) -> float:
        """Return `2 * |intersection| / (sum of sizes)`.

        A non-None result of `quick_answer` is returned as is.
        """
        result = self.quick_answer(*sequences)
        if result is not None:
            return result

        sequences = self._get_counters(*sequences)               # sets
        count = sum(self._count_counters(s) for s in sequences)
        intersection = self._intersect_counters(*sequences)      # set
        intersection = self._count_counters(intersection)        # int
        return 2.0 * intersection / count
class Tversky(_BaseSimilarity):
    """Tversky index

    https://en.wikipedia.org/wiki/Tversky_index
    https://github.com/Yomguithereal/talisman/blob/master/src/metrics/tversky.js
    """

    def __init__(
        self,
        qval: int = 1,
        ks: Sequence[float] = None,
        bias: float | None = None,
        as_set: bool = False,
        external: bool = True,
    ) -> None:
        """
        ks: one weight per sequence; an endless stream of 1 when omitted.
        bias: when set and exactly two sequences are compared, the biased
            form of the index is used.
        The remaining arguments are stored on the instance unchanged.
        """
        self.qval = qval
        self.ks = ks or repeat(1)
        self.bias = bias
        self.as_set = as_set
        self.external = external

    def maximum(self, *sequences: Sequence) -> int:
        """Return the largest possible value, which is always 1."""
        return 1

    def __call__(self, *sequences: Sequence) -> float:
        """Return the Tversky index of the sequences.

        A non-None result of `quick_answer` is returned as is.
        """
        quick = self.quick_answer(*sequences)
        if quick is not None:
            return quick

        counters = self._get_counters(*sequences)
        common = self._count_counters(self._intersect_counters(*counters))
        sizes = [self._count_counters(counter) for counter in counters]
        weights = list(islice(self.ks, len(sizes)))

        if len(sizes) == 2 and self.bias is not None:
            # biased form, defined for exactly two sequences
            first, second = sizes
            alpha, beta = weights
            smaller = min(first, second)
            larger = max(first, second)
            biased = common + self.bias
            denominator = alpha * beta * (smaller - larger) + larger * beta
            return biased / (denominator + biased)

        # general form: common / (common + sum(k * (size - common)))
        denominator = common
        for weight, size in zip(weights, sizes):
            denominator += weight * (size - common)
        return common / denominator
class Overlap(_BaseSimilarity):
    """Overlap coefficient: intersection size over the smallest sequence size.

    https://en.wikipedia.org/wiki/Overlap_coefficient
    https://github.com/Yomguithereal/talisman/blob/master/src/metrics/overlap.js
    """

    def __init__(
        self,
        qval: int = 1,
        as_set: bool = False,
        external: bool = True,
    ) -> None:
        """Store the settings on the instance."""
        self.external = external
        self.as_set = as_set
        self.qval = qval

    def maximum(self, *sequences: Sequence) -> int:
        """Return the largest possible value, which is always 1."""
        return 1

    def __call__(self, *sequences: Sequence) -> float:
        """Return `|intersection| / min(sizes)`.

        A non-None result of `quick_answer` is returned as is.
        """
        quick = self.quick_answer(*sequences)
        if quick is not None:
            return quick
        counters = self._get_counters(*sequences)
        common = self._count_counters(self._intersect_counters(*counters))
        sizes = [self._count_counters(counter) for counter in counters]
        return common / min(sizes)
class Cosine(_BaseSimilarity):
    """Cosine similarity (Ochiai coefficient)

    The intersection size divided by the geometric mean of the sizes.

    https://en.wikipedia.org/wiki/Cosine_similarity
    https://github.com/Yomguithereal/talisman/blob/master/src/metrics/cosine.js
    """

    def __init__(
        self,
        qval: int = 1,
        as_set: bool = False,
        external: bool = True,
    ) -> None:
        """Store the settings on the instance."""
        self.external = external
        self.as_set = as_set
        self.qval = qval

    def maximum(self, *sequences: Sequence) -> int:
        """Return the largest possible value, which is always 1."""
        return 1

    def __call__(self, *sequences: Sequence) -> float:
        """Return `|intersection| / (product of sizes) ** (1 / count)`.

        A non-None result of `quick_answer` is returned as is.
        """
        quick = self.quick_answer(*sequences)
        if quick is not None:
            return quick
        counters = self._get_counters(*sequences)
        common = self._count_counters(self._intersect_counters(*counters))
        sizes = [self._count_counters(counter) for counter in counters]
        product_of_sizes = reduce(lambda left, right: left * right, sizes)
        return common / pow(product_of_sizes, 1.0 / len(sizes))
class Tanimoto(Jaccard):
    """Tanimoto distance

    The base-2 logarithm of the value computed by `Jaccard`;
    negative infinity when that value is 0.
    """

    def __call__(self, *sequences: Sequence) -> float:
        """Return `log2` of the Jaccard result, or `-inf` when it is 0."""
        ratio = super().__call__(*sequences)
        if ratio != 0:
            return log(ratio, 2)
        return float('-inf')
class MongeElkan(_BaseSimilarity):
    """
    Monge-Elkan similarity built on top of another similarity algorithm.

    https://www.academia.edu/200314/Generalized_Monge-Elkan_Method_for_Approximate_Text_String_Comparison
    http://www.cs.cmu.edu/~wcohen/postscript/kdd-2003-match-ws.pdf
    https://github.com/Yomguithereal/talisman/blob/master/src/metrics/monge-elkan.js
    """
    _damerau_levenshtein = DamerauLevenshtein()

    def __init__(
        self,
        algorithm=_damerau_levenshtein,
        symmetric: bool = False,
        qval: int = 1,
        external: bool = True,
    ) -> None:
        """
        algorithm: object providing `similarity(a, b)` and `maximum(...)`
            used to compare elements; a shared `DamerauLevenshtein` instance
            by default.
        symmetric: average the result over every ordering of the sequences.
        qval, external: stored on the instance unchanged.
        """
        self.external = external
        self.qval = qval
        self.symmetric = symmetric
        self.algorithm = algorithm

    def maximum(self, *sequences: Sequence) -> float:
        """Return the largest `algorithm.maximum` value seen for the input."""
        # NOTE(review): the tuple of sequences is passed as ONE argument here
        # (not unpacked) - confirm this is intended.
        best = self.algorithm.maximum(sequences)
        for seq in sequences:
            if not seq:
                continue
            best = max(best, self.algorithm.maximum(*seq))
        return best

    def _calc(self, seq, *sequences: Sequence) -> float:
        """Score `seq` against the other sequences.

        For every element of `seq` and every other sequence, the best
        similarity to any element of that sequence is collected. The sum of
        the collected values is divided by `len(seq)` and by their count.
        An empty `seq` gives 0.
        """
        if not seq:
            return 0
        best_scores = []
        for item in seq:
            for other in sequences:
                best = float('-inf')
                for candidate in other:
                    best = max(best, self.algorithm.similarity(item, candidate))
                best_scores.append(best)
        return sum(best_scores) / len(seq) / len(best_scores)

    def __call__(self, *sequences: Sequence) -> float:
        """Return the Monge-Elkan similarity of the sequences.

        A non-None result of `quick_answer` is returned as is.
        """
        quick = self.quick_answer(*sequences)
        if quick is not None:
            return quick
        prepared = self._get_sequences(*sequences)

        if not self.symmetric:
            return self._calc(*prepared)
        # average over every ordering of the sequences
        scores = [self._calc(*ordering) for ordering in permutations(prepared)]
        return sum(scores) / len(scores)
class Bag(_Base):
    """Bag distance

    The largest number of items any sequence has left after the items
    common to all sequences are removed.

    https://github.com/Yomguithereal/talisman/blob/master/src/metrics/bag.js
    """

    def __call__(self, *sequences: Sequence) -> float:
        """Return the biggest leftover count among the sequences."""
        counters = self._get_counters(*sequences)
        shared = self._intersect_counters(*counters)
        leftovers = [self._count_counters(counter - shared) for counter in counters]
        return max(leftovers)
# Module-level instances of the algorithms, created with default arguments.
bag = Bag()
cosine = Cosine()
# `dice`, `sorensen` and `sorensen_dice` are three separate Sorensen instances.
dice = Sorensen()
jaccard = Jaccard()
monge_elkan = MongeElkan()
overlap = Overlap()
sorensen = Sorensen()
sorensen_dice = Sorensen()
# sorensen_dice = Tversky(ks=[.5, .5])
tanimoto = Tanimoto()
tversky = Tversky()

View File

@ -0,0 +1,8 @@
# built-in
from typing import Callable, Optional, TypeVar
# Type of the two items handed to a comparison callback.
T = TypeVar('T')
# Optional callback taking two items of the same type and returning a float.
SimFunc = Optional[Callable[[T, T], float]]
# Optional callback taking two items of the same type and returning a bool.
TestFunc = Optional[Callable[[T, T], bool]]

View File

@ -0,0 +1,112 @@
"""
IMPORTANT: this module is just a draft.
"""
# built-in
from functools import reduce
from typing import Any
# app
from .base import Base as _Base, BaseSimilarity as _BaseSimilarity
try:
# external
import numpy
except ImportError:
numpy = None # type: ignore[assignment]
class Chebyshev(_Base):
    """Chebyshev distance: the largest absolute element-wise difference."""

    def _numpy(self, s1, s2):
        """Compute the distance on numpy arrays built from the inputs."""
        left, right = numpy.asarray(s1), numpy.asarray(s2)
        return max(abs(left - right))

    def _pure(self, s1, s2):
        """Compute the distance without numpy, pairing elements with zip."""
        return max(abs(a - b) for a, b in zip(s1, s2))

    def __call__(self, s1, s2) -> Any:
        """Use the numpy implementation when numpy was imported."""
        if not numpy:
            return self._pure(s1, s2)
        return self._numpy(s1, s2)
class Minkowski(_Base):
    """Minkowski distance of order `p` with a scalar `weight`."""

    def __init__(self, p: int = 1, weight: int = 1) -> None:
        """Store `p` and `weight`; raise ValueError when `p` is below 1."""
        if p < 1:
            raise ValueError('p must be at least 1')
        self.weight = weight
        self.p = p

    def _numpy(self, s1, s2):
        """Compute the distance on numpy arrays built from the inputs."""
        left, right = numpy.asarray(s1), numpy.asarray(s2)
        powered = (self.weight * abs(left - right)) ** self.p
        return powered.sum() ** (1.0 / self.p)

    def _pure(self, s1, s2):
        """Compute the distance without numpy, pairing elements with zip."""
        weighted = (self.weight * abs(a - b) for a, b in zip(s1, s2))
        total = sum(value ** self.p for value in weighted)
        return total ** (1.0 / self.p)

    def __call__(self, s1, s2) -> Any:
        """Use the numpy implementation when numpy was imported."""
        if not numpy:
            return self._pure(s1, s2)
        return self._numpy(s1, s2)
class Manhattan(_Base):
    """Placeholder for the Manhattan distance; not implemented yet."""

    def __call__(self, s1, s2) -> Any:
        """Always raise NotImplementedError."""
        raise NotImplementedError
class Euclidean(_Base):
    """Euclidean distance; only the numpy implementation exists."""

    def __init__(self, squared: bool = False) -> None:
        """Store `squared`: when true the square root is not taken."""
        self.squared = squared

    def _numpy(self, s1, s2):
        """Compute the distance on numpy arrays built from the inputs."""
        left, right = numpy.asarray(s1), numpy.asarray(s2)
        diff = numpy.matrix(left - right)
        total = (diff * diff.T).sum()
        return total if self.squared else numpy.sqrt(total)

    def _pure(self, s1, s2) -> None:
        """Always raise NotImplementedError."""
        raise NotImplementedError

    def __call__(self, s1, s2) -> Any:
        """Use the numpy implementation when numpy was imported."""
        if not numpy:
            return self._pure(s1, s2)
        return self._numpy(s1, s2)
class Mahalanobis(_Base):
    """Placeholder for the Mahalanobis distance; not implemented yet."""

    def __call__(self, s1, s2) -> Any:
        """Always raise NotImplementedError."""
        raise NotImplementedError
class Correlation(_BaseSimilarity):
    """Correlation of numeric sequences; only the numpy implementation exists."""

    def _numpy(self, *sequences):
        """Compute the correlation on numpy arrays built from the inputs.

        Every sequence is centered on its mean; the dot product of the
        centered vectors is divided by the norm of each centered vector.
        """
        sequences = [numpy.asarray(s) for s in sequences]
        ssm = [s - s.mean() for s in sequences]
        # The numerator has to use the centered vectors too: with the raw
        # vectors, identical sequences did not give 1.
        result = reduce(numpy.dot, ssm)
        for sm in ssm:
            # not in-place: `/=` cannot store a float result into an int array
            result = result / numpy.sqrt(numpy.dot(sm, sm))
        return result

    def _pure(self, *sequences):
        """Always raise NotImplementedError."""
        raise NotImplementedError

    def __call__(self, *sequences):
        """Use the numpy implementation when numpy was imported."""
        if numpy:
            return self._numpy(*sequences)
        else:
            return self._pure(*sequences)
class Kulsinski(_BaseSimilarity):
    """Placeholder for the Kulsinski similarity; not implemented yet."""

    def __call__(self, s1, s2) -> Any:
        """Always raise NotImplementedError."""
        raise NotImplementedError

View File

@ -0,0 +1,139 @@
from __future__ import annotations
# built-in
import json
import math
from collections import defaultdict
from timeit import timeit
from typing import Iterable, Iterator, NamedTuple
# external
from tabulate import tabulate
# app
from .libraries import LIBRARIES_PATH, prototype
# Run with: python3 -m textdistance.benchmark
# Work on a copy of the registry so the shared prototype is left untouched.
libraries = prototype.clone()
class Lib(NamedTuple):
    """One benchmarked implementation of an algorithm."""
    algorithm: str
    library: str
    function: str
    time: float
    setup: str

    @property
    def row(self) -> tuple[str, ...]:
        """Table row: algorithm, top-level package name and formatted time.

        An infinite time is shown as an empty string.
        """
        if math.isinf(self.time):
            shown_time = ''
        else:
            shown_time = f'{self.time:0.05f}'
        package = self.library.split('.')[0]
        return (self.algorithm, package, shown_time)
# Setup template with a `{}` placeholder for the class name.
# NOTE(review): not referenced by the code visible in this module.
INTERNAL_SETUP = """
from textdistance import {} as cls
func = cls(external=False)
"""

# Statement timed for every implementation; `func` is defined by the setup code.
STMT = """
func('text', 'test')
func('qwer', 'asdf')
func('a' * 15, 'b' * 15)
"""

# Number of executions of STMT per timing.
RUNS = 4000
class Benchmark:
    """Time the registered external libraries against the internal code."""

    @staticmethod
    def get_installed() -> Iterator[Lib]:
        """Yield a `Lib` (with infinite time) for every loadable library.

        Libraries whose function cannot be loaded are reported and skipped.
        """
        for alg in libraries.get_algorithms():
            for lib in libraries.get_libs(alg):
                if lib.get_function():
                    yield Lib(
                        algorithm=alg,
                        library=lib.module_name,
                        function=lib.func_name,
                        time=float('Inf'),
                        setup=lib.setup,
                    )
                else:
                    print(f'WARNING: cannot get func for {lib}')

    @staticmethod
    def get_external_benchmark(installed: Iterable[Lib]) -> Iterator[Lib]:
        """Yield every given `Lib` with its measured time filled in."""
        for lib in installed:
            elapsed = timeit(stmt=STMT, setup=lib.setup, number=RUNS)
            yield lib._replace(time=elapsed)

    @staticmethod
    def get_internal_benchmark() -> Iterator[Lib]:
        """Yield a timed `Lib` for the internal version of every algorithm."""
        for alg in libraries.get_algorithms():
            setup = f'func = __import__("textdistance").{alg}(external=False)'
            elapsed = timeit(stmt=STMT, setup=setup, number=RUNS)
            yield Lib(
                algorithm=alg,
                library='**textdistance**',
                function=alg,
                time=elapsed,
                setup=setup,
            )

    @staticmethod
    def filter_benchmark(
        external: Iterable[Lib],
        internal: Iterable[Lib],
    ) -> Iterator[Lib]:
        """Keep the entries that are strictly faster than the internal code."""
        limits = {}
        for own in internal:
            limits[own.algorithm] = own.time
        return (lib for lib in external if lib.time < limits[lib.algorithm])

    @staticmethod
    def get_table(libs: list[Lib]) -> str:
        """Render the libs as a github-style table followed by a total line."""
        rows = [lib.row for lib in libs]
        rendered = tabulate(
            rows,
            headers=['algorithm', 'library', 'time'],
            tablefmt='github',
        )
        return rendered + f'\nTotal: {len(libs)} libs.\n\n'

    @staticmethod
    def save(libs: Iterable[Lib]) -> None:
        """Write `[library, function]` pairs grouped by algorithm to LIBRARIES_PATH."""
        grouped = {}
        for lib in libs:
            grouped.setdefault(lib.algorithm, []).append([lib.library, lib.function])
        with LIBRARIES_PATH.open('w', encoding='utf8') as stream:
            json.dump(obj=grouped, fp=stream, indent=2, sort_keys=True)

    @classmethod
    def run(cls) -> None:
        """Print both tables and save the libraries that beat the internal code."""
        print('# Installed libraries:\n')
        installed = sorted(cls.get_installed())
        print(cls.get_table(installed))

        print('# Benchmarks (with textdistance):\n')
        timed = list(cls.get_external_benchmark(installed))
        timed_internal = list(cls.get_internal_benchmark())
        timed.extend(timed_internal)
        timed.sort(key=lambda lib: (lib.algorithm, lib.time))
        print(cls.get_table(timed))

        faster = list(cls.filter_benchmark(timed, timed_internal))
        cls.save(faster)
# Run the benchmark only when the module is executed as a script.
if __name__ == '__main__':
    Benchmark.run()

View File

@ -0,0 +1,80 @@
{
"DamerauLevenshtein": [
[
"rapidfuzz.distance.OSA",
"distance"
],
[
"rapidfuzz.distance.DamerauLevenshtein",
"distance"
],
[
"jellyfish",
"damerau_levenshtein_distance"
],
[
"pyxdameraulevenshtein",
"damerau_levenshtein_distance"
]
],
"Hamming": [
[
"Levenshtein",
"hamming"
],
[
"rapidfuzz.distance.Hamming",
"distance"
],
[
"jellyfish",
"hamming_distance"
],
[
"distance",
"hamming"
]
],
"Jaro": [
[
"rapidfuzz.distance.Jaro",
"similarity"
],
[
"jellyfish",
"jaro_similarity"
]
],
"JaroWinkler": [
[
"rapidfuzz.distance.JaroWinkler",
"similarity"
],
[
"jellyfish",
"jaro_winkler_similarity"
]
],
"Levenshtein": [
[
"rapidfuzz.distance.Levenshtein",
"distance"
],
[
"Levenshtein",
"distance"
],
[
"jellyfish",
"levenshtein_distance"
],
[
"pylev",
"levenshtein"
],
[
"distance",
"levenshtein"
]
]
}

View File

@ -0,0 +1,200 @@
from __future__ import annotations
# built-in
import json
from collections import defaultdict
from copy import deepcopy
from importlib import import_module
from pathlib import Path
from typing import Any, Callable, Sequence
# The `libraries.json` file located next to this module.
LIBRARIES_PATH = Path(__file__).parent / 'libraries.json'
class LibrariesManager:
    """Registry of external libraries, grouped by algorithm name."""
    libs: defaultdict[str, list[LibraryBase]]

    def __init__(self) -> None:
        """Start with an empty registry."""
        self.libs = defaultdict(list)

    def register(self, alg: str, lib: LibraryBase) -> None:
        """Append `lib` to the implementations of `alg`.
        """
        self.libs[alg].append(lib)

    def optimize(self) -> None:
        """Keep only the libs listed in LIBRARIES_PATH, in the listed order.
        """
        # load benchmarks results
        with LIBRARIES_PATH.open('r', encoding='utf8') as stream:
            ranking: dict = json.load(stream)

        for alg, listed in ranking.items():
            registered = self.get_libs(alg)
            if not registered:
                continue

            def position(lib):
                return listed.index([lib.module_name, lib.func_name])

            # drop the libs missing from the file, then order by file position
            kept = [
                lib for lib in registered
                if [lib.module_name, lib.func_name] in listed
            ]
            self.libs[alg] = sorted(kept, key=position)

    def get_algorithms(self) -> list[str]:
        """Return the names of the algorithms that have registered libs.
        """
        return list(self.libs)

    def get_libs(self, alg: str) -> list[LibraryBase]:
        """Return the libs of `alg`, or an empty list for an unknown name.

        An unknown name is not added to the registry.
        """
        if alg in self.libs:
            return self.libs[alg]
        return []

    def clone(self) -> LibrariesManager:
        """Return a new manager holding a deep copy of the registry.
        """
        duplicate = self.__class__()
        duplicate.libs = deepcopy(self.libs)
        return duplicate
class LibraryBase:
    """Description of one external implementation of an algorithm.

    The function is imported lazily by `get_function` and cached in `func`:
    `NotImplemented` means "not looked up yet", `None` means "unavailable".
    """
    func: Callable | None | Any = NotImplemented

    def __init__(
        self,
        module_name: str,
        func_name: str,
        *,
        presets: dict[str, Any] | None = None,
        attr: str | None = None,
        conditions: dict[str, bool] | None = None,
    ) -> None:
        """
        module_name: dotted name of the module to import.
        func_name: name of the object to take from the module.
        presets: when given, the object is called with these keyword
            arguments and the result is used instead of the object itself.
        attr: when given, this attribute of the object is used.
        conditions: attribute values the algorithm instance must have
            for this library to be applicable.
        """
        self.module_name = module_name
        self.func_name = func_name
        self.presets = presets
        self.conditions = conditions
        self.attr = attr

    def check_conditions(self, obj: object, *sequences: Sequence) -> bool:
        """Tell whether this library can be used for `obj` and `sequences`."""
        # external libs can compare only 2 strings
        if len(sequences) != 2:
            return False
        if not self.conditions:
            return True
        return all(
            getattr(obj, name) == value
            for name, value in self.conditions.items()
        )

    def prepare(self, *sequences: Sequence) -> tuple:
        """Return the sequences unchanged."""
        return sequences

    @property
    def setup(self) -> str:
        """Source code that binds the library function to the name `func`."""
        result = f'from {self.module_name} import {self.func_name} as func'
        result += '\nfunc = func'
        if self.presets is not None:
            result += f'(**{repr(self.presets)})'
        if self.attr is not None:
            result += f'.{self.attr}'
        return result

    def get_function(self) -> Callable | None:
        """Return the library function, or None when it is unavailable.

        The function is unavailable when the module cannot be imported or
        when the module (or the prepared object) lacks the requested name,
        e.g. because an installed release does not provide it.
        The outcome is cached on the instance.
        """
        if self.func is NotImplemented:
            # import module
            try:
                module = import_module(self.module_name)
            except ImportError:
                self.func = None
                return None

            # get object from module; a missing name means "unavailable",
            # the same as a missing module
            try:
                obj = getattr(module, self.func_name)
            except AttributeError:
                self.func = None
                return None
            # init class
            if self.presets is not None:
                obj = obj(**self.presets)
            # get needed attribute
            if self.attr is not None:
                obj = getattr(obj, self.attr, None)
            self.func = obj

        return self.func

    def __str__(self) -> str:
        """Return `module_name.func_name`."""
        return f'{self.module_name}.{self.func_name}'
class TextLibrary(LibraryBase):
    """Library usable only for `str` input compared with `qval` equal to 1."""

    def check_conditions(self, obj: object, *sequences: Sequence) -> bool:
        """Apply the base checks, then require qval == 1 and exact `str` input."""
        if not super().check_conditions(obj, *sequences):
            return False

        # compare only by letters
        if getattr(obj, 'qval', 0) != 1:
            return False

        # every sequence must be string
        return all(type(seq) is str for seq in sequences)

    def prepare(self, *sequences: Sequence) -> tuple:
        """Join the sequences into strings when the first one is a tuple or list."""
        # convert list of letters to string
        if isinstance(sequences[0], (tuple, list)):
            sequences = tuple(''.join(seq) for seq in sequences)
        return sequences
class SameLengthLibrary(LibraryBase):
    """Library usable only for sequences of equal length."""

    def check_conditions(self, obj: object, *sequences: Sequence) -> bool:
        """Apply the base checks, then require all lengths to be equal."""
        if not super().check_conditions(obj, *sequences):
            return False
        # compare only same length iterators
        sizes = [len(seq) for seq in sequences]
        return min(sizes) == max(sizes)
class SameLengthTextLibrary(SameLengthLibrary, TextLibrary):
    """Combination of the `SameLengthLibrary` and `TextLibrary` checks."""
# Shared registry of the known external libraries.
prototype = LibrariesManager()
reg = prototype.register

# Every call below registers one external implementation for the algorithm
# named by `alg`; `conditions` lists the attribute values the algorithm
# instance must have for that implementation to be applicable.
alg = 'DamerauLevenshtein'
reg(alg, LibraryBase('pyxdameraulevenshtein', 'damerau_levenshtein_distance', conditions=dict(restricted=True)))
reg(alg, TextLibrary('jellyfish', 'damerau_levenshtein_distance', conditions=dict(restricted=False)))
reg(alg, LibraryBase('rapidfuzz.distance.DamerauLevenshtein', 'distance', conditions=dict(restricted=False)))
reg(alg, LibraryBase('rapidfuzz.distance.OSA', 'distance', conditions=dict(restricted=True)))

alg = 'Hamming'
reg(alg, SameLengthLibrary('distance', 'hamming'))
reg(alg, SameLengthTextLibrary('Levenshtein', 'hamming'))
reg(alg, TextLibrary('jellyfish', 'hamming_distance'))
reg(alg, SameLengthLibrary('rapidfuzz.distance.Hamming', 'distance'))

alg = 'Jaro'
reg(alg, TextLibrary('jellyfish', 'jaro_similarity'))
reg(alg, LibraryBase('rapidfuzz.distance.Jaro', 'similarity'))
# reg(alg, TextLibrary('Levenshtein', 'jaro'))
# reg(alg, TextLibrary('py_stringmatching.similarity_measure.jaro', 'jaro'))

alg = 'JaroWinkler'
# reg(alg, LibraryBase('py_stringmatching.similarity_measure.jaro_winkler', 'jaro_winkler'))
reg(alg, TextLibrary('jellyfish', 'jaro_winkler_similarity', conditions=dict(winklerize=True)))
reg(alg, LibraryBase('rapidfuzz.distance.JaroWinkler', 'similarity', conditions=dict(winklerize=True)))
# https://github.com/life4/textdistance/issues/39
# reg(alg, TextLibrary('Levenshtein', 'jaro_winkler', conditions=dict(winklerize=True)))

alg = 'Levenshtein'
reg(alg, LibraryBase('distance', 'levenshtein'))
reg(alg, LibraryBase('pylev', 'levenshtein'))
reg(alg, TextLibrary('jellyfish', 'levenshtein_distance'))
reg(alg, TextLibrary('Levenshtein', 'distance'))
reg(alg, LibraryBase('rapidfuzz.distance.Levenshtein', 'distance'))
# reg(alg, TextLibrary('py_stringmatching.similarity_measure.levenshtein', 'levenshtein'))

View File

View File

@ -0,0 +1,28 @@
from __future__ import annotations
# built-in
from itertools import permutations, product
from typing import Sequence
__all__ = ['words_combinations', 'find_ngrams']
def words_combinations(f, *texts) -> float:
    """Return the smallest value of `f` over all word orderings of the texts.

    Every text is split into words and every combination of word
    permutations is tried. When `f.equality` is true, each combination is
    first cut to the word count of its shortest member. The words are joined
    with single spaces before `f` is called.
    Infinity is returned when there is nothing to compare.
    """
    best = float('Inf')
    # one lazy stream of word orderings per text
    orderings = [permutations(text.split()) for text in texts]
    for combo in product(*orderings):
        if f.equality:
            shortest = len(min(combo, key=len))
            combo = [words[:shortest] for words in combo]
        joined = [' '.join(words) for words in combo]
        best = min(best, f(*joined))
    return best
def find_ngrams(input_list: Sequence, n: int) -> list[tuple]:
    """Return every run of `n` consecutive items of `input_list` as a tuple."""
    # the k-th slice starts k items later; zip stops at the shortest one
    shifted = (input_list[offset:] for offset in range(n))
    return list(zip(*shifted))

View File

@ -40,6 +40,7 @@ semver==3.0.2
signalrcore==0.9.5
simple-websocket==1.0.0
sqlalchemy==2.0.27
textdistance==4.6.2
unidecode==1.3.8
waitress==3.0.0
whichcraft==0.6.1