99 lines
3.1 KiB
Python
99 lines
3.1 KiB
Python
"""Scrape mastodon tag searches and feed into your search."""
|
|
import sys
|
|
import urllib.robotparser
|
|
import urllib.parse
|
|
|
|
import yaml
|
|
import requests
|
|
|
|
|
|
def fetch_toots(conf):
    """Collect post entries from the tag feeds of all configured instances.

    For every combination of ``conf["instances"]`` and ``conf["tags"]`` the
    JSON document at ``https://<instance>/tags/<tag>.json`` is requested,
    provided ``_check_path_allowed`` permits it, and every entry of the
    document's ``orderedItems`` list is added to the result.

    Fetching is best-effort: a failure for one instance/tag pair is printed
    and that pair is skipped, so the remaining pairs are still processed.

    Args:
        conf: Mapping with optional ``"instances"`` and ``"tags"`` lists.

    Returns:
        A set containing the collected ``orderedItems`` entries.
    """
    toots = set()
    for instance in conf.get("instances", []):
        for tag in conf.get("tags", []):
            uri = f"https://{instance}/tags/{tag}.json"
            try:
                if not _check_path_allowed(uri):
                    # Disallowed by robots.txt: there is nothing to add for
                    # this pair (and no previous response must be reused).
                    continue
                # Same timeout as the search requests, so an unresponsive
                # instance cannot hang the whole run.
                curr_posts = requests.get(uri, timeout=30).json()
                # Processing happens inside the try so that a malformed
                # document only skips this pair instead of aborting the run.
                toots.update(curr_posts.get("orderedItems", []))
            except Exception as e:
                # Deliberately broad: any network, parsing or data error for
                # one instance/tag pair is reported and skipped.
                print("Got some error fetching toots, continuing...")
                print(e)
    return toots
|
|
|
|
|
|
def search_for(toots):
    """Send one search request per toot to the configured own instance.

    Each toot is submitted as the ``q`` parameter of
    ``/api/v2/search?type=statuses&resolve=true`` on the host named by
    ``config["own_instance"]``, authenticated with
    ``config["own_bearer_token"]``. ``config`` is the module-level mapping
    loaded by the script entry point. The response is discarded; the request
    is presumably made only so the instance resolves the status.

    Failures are printed and do not stop the remaining searches.

    Args:
        toots: Iterable of values to search for (expected to be status URLs).

    Returns:
        None.
    """
    if not toots:
        # Nothing to search for, so the configuration is not needed at all.
        return

    # Loop-invariant request parts are built once.
    search_url = f"https://{config.get('own_instance', '')}/api/v2/search"
    headers = {
        "Authorization": f"Bearer {config.get('own_bearer_token', '')}"
    }

    for toot in toots:
        print(f"searching for {toot}...")
        # Passing the query via ``params`` percent-encodes the toot, so
        # characters such as ``?``, ``&`` or ``#`` inside it cannot break
        # the query string.
        params = {"type": "statuses", "resolve": "true", "q": toot}
        try:
            requests.get(search_url, params=params, headers=headers,
                         timeout=30)
        except Exception as e:
            # Best-effort: report the failure and carry on with the rest.
            print(f"Searching for {toot} failed *shrug*")
            print(e)
|
|
|
|
|
|
def _check_path_allowed(uri):
    """Return whether the host's robots.txt allows fetching *uri*.

    The robots.txt is downloaded from the root of the URI's scheme and host
    on every call and evaluated for the wildcard user agent ``*``.

    Args:
        uri: Absolute URL that is about to be requested.

    Returns:
        The result of ``RobotFileParser.can_fetch('*', uri)``.
    """
    parts = urllib.parse.urlparse(uri)
    robots_url = f"{parts.scheme}://{parts.netloc}/robots.txt"

    # Passing the URL to the constructor is equivalent to calling set_url().
    parser = urllib.robotparser.RobotFileParser(robots_url)
    parser.read()
    return parser.can_fetch('*', uri)
|
|
|
|
|
|
def __check_lst_of_str(a):
    """Return True only if *a* is a non-empty list containing only strings."""
    # Empty or falsy values are rejected first, then anything that is not
    # a list.
    if not a:
        return False
    if not isinstance(a, list):
        return False
    return all(isinstance(item, str) for item in a)
|
|
|
|
|
|
def __check_str(a):
    """Return True only if *a* is a non-empty string."""
    if not a:
        # Covers the empty string as well as None, 0, empty containers, ...
        return False
    return isinstance(a, str)
|
|
|
|
|
|
def __check_nonplain_host(a):
    """Return True if *a* contains a ``:`` or ``/`` character.

    The script uses this to reject configured hosts that carry a protocol
    or path instead of being a plain hostname.

    Args:
        a: The configured host string to inspect.

    Returns:
        True when either character occurs in *a*, otherwise False.
    """
    # any() states the intent directly instead of relying on ``True == 1``
    # inside a temporary list.
    return any(separator in a for separator in (":", "/"))
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: load and validate config.yaml, then fetch the
    # tagged toots and submit each of them to the own instance's search.
    #
    # ``config`` must stay a module-level name because search_for() reads
    # it as a global.
    try:
        # The context manager closes the file even if parsing fails.
        with open('config.yaml', encoding="utf-8") as config_file:
            # NOTE(review): yaml.full_load can construct more than plain
            # data; consider yaml.safe_load if the file may be untrusted.
            config = yaml.full_load(config_file.read())
    except Exception as e:
        print("Couldn't open config.yaml. Please check that "
              "it exists and is readable")
        print(e)
        # Without a configuration nothing below can work, so stop here
        # instead of failing later with a NameError on ``config``.
        sys.exit(-1)

    # An empty file parses to None and a bare list/scalar has no .get().
    if not isinstance(config, dict):
        print("config.yaml must contain a mapping of configuration keys")
        sys.exit(-1)

    if not __check_lst_of_str(config.get('instances', [])):
        print("*instances* configuration must be a list of strings")
        sys.exit(-1)
    if any(__check_nonplain_host(instance)
           for instance in config.get('instances', [])):
        print("all *instances* must be plain hostnames, no "
              "paths or protocol")
        sys.exit(-1)

    if not __check_lst_of_str(config.get('tags', [])):
        print("*tags* configuration must be a list of strings")
        sys.exit(-1)
    if any('#' in tag for tag in config.get('tags', [])):
        print("*tags* must not contain # character")
        sys.exit(-1)

    if not __check_str(config.get('own_instance', "")):
        print("*own_instance* configuration must be a string")
        sys.exit(-1)
    if __check_nonplain_host(config.get('own_instance', '')):
        print("*own_instance* must only contain your instance's hostname, no "
              "paths or protocol")
        sys.exit(-1)

    if not __check_str(config.get('own_bearer_token', "")):
        print("*own_bearer_token* configuration must be a string")
        sys.exit(-1)

    toots = fetch_toots(config)
    search_for(toots)
|