FIX: More general fixes for the DDL option, IMP: Added a DDL queue so items hitting the DDL provider option are queued in sequence for downloading and immediate post-processing thereafter

This commit is contained in:
evilhero 2019-01-17 13:22:36 -05:00
parent 464c8c69b1
commit 319d7f7f54
4 changed files with 124 additions and 60 deletions
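At a high level, this commit turns DDL grabs into a producer/consumer hand-off: the searcher enqueues a snatched item on DDL_QUEUE, a dedicated DDL-QUEUE thread (guarded by DDL_LOCK) drains it one item at a time via getcomics.GC.downloadit(), and successful downloads are pushed onto PP_QUEUE for immediate post-processing. The minimal sketch below illustrates that queue pattern in isolation; the download() helper and the example values are placeholders for illustration, not Mylar's actual code.

# Illustrative sketch of the DDL queue hand-off pattern (not Mylar code).
# A single worker thread drains a download queue in order and forwards
# successful grabs to a post-processing queue; the string 'exit' is the
# shutdown sentinel, mirroring how halt() stops the real worker.
import threading
import Queue  # Python 2 stdlib module; named "queue" on Python 3

DDL_QUEUE = Queue.Queue()
PP_QUEUE = Queue.Queue()

def download(link):
    # Placeholder standing in for GC.downloadit(); returns the same keys.
    return {'success': True, 'filename': 'example.cbz', 'path': '/tmp/ddl/example.cbz'}

def ddl_worker(queue):
    while True:
        item = queue.get(True)            # block until a request arrives
        if item == 'exit':                # sentinel pushed during shutdown
            break
        status = download(item['link'])
        if status['success']:
            PP_QUEUE.put({'nzb_name': status['filename'],
                          'nzb_folder': status['path'],
                          'issueid': item['issueid'],
                          'comicid': item['comicid'],
                          'ddl': True})

# Producer side (what parse_downloadresults now does instead of downloading inline):
worker = threading.Thread(target=ddl_worker, args=(DDL_QUEUE,), name='DDL-QUEUE')
worker.start()
DDL_QUEUE.put({'link': 'https://example.invalid/dl', 'issueid': '1234', 'comicid': '5678'})
DDL_QUEUE.put('exit')                     # worker exits after draining earlier items
worker.join()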

View File

@ -123,10 +123,13 @@ USE_WATCHDIR = False
SNPOOL = None
NZBPOOL = None
SEARCHPOOL = None
PPPOOL = None
DDLPOOL = None
SNATCHED_QUEUE = Queue.Queue()
NZB_QUEUE = Queue.Queue()
PP_QUEUE = Queue.Queue()
SEARCH_QUEUE = Queue.Queue()
DDL_QUEUE = Queue.Queue()
SEARCH_TIER_DATE = None
COMICSORT = None
PULLBYFILE = False
@ -142,6 +145,7 @@ LOCAL_IP = None
DOWNLOAD_APIKEY = None
APILOCK = False
SEARCHLOCK = False
DDL_LOCK = False
CMTAGGER_PATH = None
STATIC_COMICRN_VERSION = "1.01"
STATIC_APC_VERSION = "2.04"
@ -162,11 +166,11 @@ def initialize(config_file):
with INIT_LOCK:
global CONFIG, _INITIALIZED, QUIET, CONFIG_FILE, OS_DETECT, MAINTENANCE, CURRENT_VERSION, LATEST_VERSION, COMMITS_BEHIND, INSTALL_TYPE, IMPORTLOCK, PULLBYFILE, INKDROPS_32P, \
DONATEBUTTON, CURRENT_WEEKNUMBER, CURRENT_YEAR, UMASK, USER_AGENT, SNATCHED_QUEUE, NZB_QUEUE, PP_QUEUE, SEARCH_QUEUE, PULLNEW, COMICSORT, WANTED_TAB_OFF, CV_HEADERS, \
DONATEBUTTON, CURRENT_WEEKNUMBER, CURRENT_YEAR, UMASK, USER_AGENT, SNATCHED_QUEUE, NZB_QUEUE, PP_QUEUE, SEARCH_QUEUE, DDL_QUEUE, PULLNEW, COMICSORT, WANTED_TAB_OFF, CV_HEADERS, \
IMPORTBUTTON, IMPORT_FILES, IMPORT_TOTALFILES, IMPORT_CID_COUNT, IMPORT_PARSED_COUNT, IMPORT_FAILURE_COUNT, CHECKENABLED, CVURL, DEMURL, WWTURL, WWT_CF_COOKIEVALUE, \
USE_SABNZBD, USE_NZBGET, USE_BLACKHOLE, USE_RTORRENT, USE_UTORRENT, USE_QBITTORRENT, USE_DELUGE, USE_TRANSMISSION, USE_WATCHDIR, SAB_PARAMS, \
PROG_DIR, DATA_DIR, CMTAGGER_PATH, DOWNLOAD_APIKEY, LOCAL_IP, STATIC_COMICRN_VERSION, STATIC_APC_VERSION, KEYS_32P, AUTHKEY_32P, FEED_32P, FEEDINFO_32P, \
MONITOR_STATUS, SEARCH_STATUS, RSS_STATUS, WEEKLY_STATUS, VERSION_STATUS, UPDATER_STATUS, DBUPDATE_INTERVAL, LOG_LANG, LOG_CHARSET, APILOCK, SEARCHLOCK, LOG_LEVEL, \
MONITOR_STATUS, SEARCH_STATUS, RSS_STATUS, WEEKLY_STATUS, VERSION_STATUS, UPDATER_STATUS, DBUPDATE_INTERVAL, LOG_LANG, LOG_CHARSET, APILOCK, SEARCHLOCK, DDL_LOCK, LOG_LEVEL, \
SCHED_RSS_LAST, SCHED_WEEKLY_LAST, SCHED_MONITOR_LAST, SCHED_SEARCH_LAST, SCHED_VERSION_LAST, SCHED_DBUPDATE_LAST, COMICINFO, SEARCH_TIER_DATE
cc = mylar.config.Config(config_file)
@ -367,6 +371,9 @@ def start():
search_diff = datetime.datetime.utcfromtimestamp(helpers.utctimestamp() + ((int(CONFIG.SEARCH_INTERVAL) * 60) - (duration_diff*60)))
logger.fdebug('[AUTO-SEARCH] Scheduling next run @ %s every %s minutes' % (search_diff, CONFIG.SEARCH_INTERVAL))
SCHED.add_job(func=ss.run, id='search', name='Auto-Search', next_run_time=search_diff, trigger=IntervalTrigger(hours=0, minutes=CONFIG.SEARCH_INTERVAL, timezone='UTC'))
else:
ss = searchit.CurrentSearcher()
SCHED.add_job(func=ss.run, id='search', name='Auto-Search', next_run_time=None, trigger=IntervalTrigger(hours=0, minutes=CONFIG.SEARCH_INTERVAL, timezone='UTC'))
if all([CONFIG.ENABLE_TORRENTS, CONFIG.AUTO_SNATCH, OS_DETECT != 'Windows']) and any([CONFIG.TORRENT_DOWNLOADER == 2, CONFIG.TORRENT_DOWNLOADER == 4]):
logger.info('[AUTO-SNATCHER] Auto-Snatch of completed torrents enabled & attempting to background load....')
@ -396,6 +403,12 @@ def start():
PPPOOL.start()
logger.info('[POST-PROCESS-QUEUE] Successfully started Post-Processing Queuer....')
if CONFIG.ENABLE_DDL is True:
logger.info('[DDL-QUEUE] DDL Download queue enabled & monitoring for requests....')
DDLPOOL = threading.Thread(target=helpers.ddl_downloader, args=(DDL_QUEUE,), name="DDL-QUEUE")
DDLPOOL.start()
logger.info('[DDL-QUEUE] Successfully started DDL Download Queuer....')
helpers.latestdate_fix()
if CONFIG.ALT_PULL == 2:
@ -1223,6 +1236,29 @@ def halt():
SEARCHPOOL.join(5)
except AssertionError:
os._exit(0)
if PPPOOL is not None:
logger.info('Terminating the post-processing queue thread.')
try:
PPPOOL.join(10)
logger.info('Joined pool for termination - successful')
except KeyboardInterrupt:
PP_QUEUE.put('exit')
PPPOOL.join(5)
except AssertionError:
os._exit(0)
if DDLPOOL is not None:
logger.info('Terminating the DDL download queue thread.')
try:
DDLPOOL.join(10)
logger.info('Joined pool for termination - successful')
except KeyboardInterrupt:
DDL_QUEUE.put('exit')
DDLPOOL.join(5)
except AssertionError:
os._exit(0)
_INITIALIZED = False
def shutdown(restart=False, update=False, maintenance=False):

View File

@ -17,7 +17,6 @@
from StringIO import StringIO
import urllib
from threading import Thread
from Queue import Queue
import os
import sys
import re
@ -28,14 +27,12 @@ import json
from bs4 import BeautifulSoup
import requests
import cfscrape
import logger
import mylar
from mylar import logger
class GC(object):
def __init__(self, query):
self.queue = Queue()
def __init__(self, query=None, issueid=None, comicid=None):
self.valreturn = []
@ -43,6 +40,10 @@ class GC(object):
self.query = query
self.comicid = comicid
self.issueid = issueid
self.local_filename = os.path.join(mylar.CONFIG.CACHE_DIR, "getcomics.html")
self.headers = {'Accept-encoding': 'gzip', 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:40.0) Gecko/20100101 Firefox/40.1', 'Referer': 'https://getcomics.info/'}
@ -94,7 +95,7 @@ class GC(object):
option_find = option_find.findNext(text=True)
if 'Year' in option_find:
year = option_find.findNext(text=True)
year = re.sub('|', '', year).strip()
year = re.sub('\|', '', year).strip()
else:
size = option_find.findNext(text=True)
if 'MB' in size:
@ -118,10 +119,8 @@ class GC(object):
results['entries'] = resultlist
return results
#self.loadsite(title, link)
#self.parse_downloadresults(title)
def parse_downloadresults(self, title):
def parse_downloadresults(self, title, mainlink):
soup = BeautifulSoup(open(title+'.html'), 'html.parser')
orig_find = soup.find("p", {"style": "text-align: center;"})
@ -191,56 +190,61 @@ class GC(object):
if link is None:
logger.warn('Unable to retrieve any valid immediate download links. They might not exist.')
return
return {'success': False}
for x in links:
logger.fdebug('[%s] %s - %s' % (x['site'], x['volume'], x['link']))
thread_ = Thread(target=self.downloadit, args=[link])
thread_.start()
thread_.join()
chk = self.queue.get()
while True:
if chk[0]['mode'] == 'stop':
return {"filename": chk[0]['filename'],
"status": 'fail'}
elif chk[0]['mode'] == 'success':
try:
if os.path.isfile(os.path.join(mylar.CONFIG.DDL_LOCATION, chk[0]['filename'])):
logger.fdebug('Finished downloading %s [%s]' % (path, size))
except:
pass
return {"filename": chk[0]['filename'],
"status": 'success'}
mylar.DDL_QUEUE.put({'link': link,
'mainlink': mainlink,
'series': series,
'year': year,
'size': size,
'comicid': self.comicid,
'issueid': self.issueid})
return {'success': True}
def downloadit(self, link, mainlink):
if mylar.DDL_LOCK is True:
logger.fdebug('[DDL] Another item is currently downloading via DDL. Only one item can be downloaded at a time using DDL. Patience.')
return
else:
mylar.DDL_LOCK = True
def downloadit(self, link):
filename = None
try:
t = requests.get(link, verify=True, cookies=self.cf_cookievalue, headers=self.headers, stream=True)
with cfscrape.create_scraper() as s:
cf_cookievalue, cf_user_agent = s.get_tokens(mainlink, headers=self.headers)
t = s.get(link, verify=True, cookies=cf_cookievalue, headers=self.headers, stream=True)
filename = os.path.basename(urllib.unquote(t.url).decode('utf-8'))
path = os.path.join(mylar.CONFIG.DDL_LOCATION, filename)
if t.headers.get('content-encoding') == 'gzip': #.get('Content-Encoding') == 'gzip':
buf = StringIO(t.content)
f = gzip.GzipFile(fileobj=buf)
with open(path, 'wb') as f:
for chunk in t.iter_content(chunk_size=1024):
if chunk: # filter out keep-alive new chunks
f.write(chunk)
f.flush()
except:
self.valreturn.append({"mode": "stop",
"filename": filename})
return self.queue.put(self.valreturn)
except Exception as e:
logger.error('[ERROR] %s' % e)
mylar.DDL_LOCK = False
return ({"success": False,
"filename": filename,
"path": None})
else:
self.valreturn.append({"mode": "success",
"filename": filename})
return self.queue.put(self.valreturn)
mylar.DDL_LOCK = False
if os.path.isfile(path):
return ({"success": True,
"filename": filename,
"path": path})
else:
return ({"success": False,
"filename": filename,
"path": None})
def issue_list(self, pack):
#packlist = [x.strip() for x in pack.split(',')]

View File

@ -37,7 +37,7 @@ from apscheduler.triggers.interval import IntervalTrigger
import mylar
import logger
from mylar import sabnzbd, nzbget, process
from mylar import sabnzbd, nzbget, process, getcomics
def multikeysort(items, columns):
@ -3027,6 +3027,38 @@ def latestdate_update():
logger.info('updating latest date for : ' + a['ComicID'] + ' to ' + a['LatestDate'] + ' #' + a['LatestIssue'])
myDB.upsert("comics", newVal, ctrlVal)
def ddl_downloader(queue):
while True:
if mylar.DDL_LOCK is True:
time.sleep(5)
elif mylar.DDL_LOCK is False and queue.qsize() >= 1:
item = queue.get(True)
if item == 'exit':
logger.info('Cleaning up workers for shutdown')
break
logger.info('Now loading request from DDL queue: %s (%s)' % (item['series'], item['year']))
ddz = getcomics.GC()
ddzstat = ddz.downloadit(item['link'], item['mainlink'])
if all([ddzstat['success'] is True, mylar.CONFIG.POST_PROCESSING is True]):
logger.info('%s successfully downloaded - now initiating post-processing.' % (ddzstat['filename']))
try:
mylar.PP_QUEUE.put({'nzb_name': ddzstat['filename'],
'nzb_folder': ddzstat['path'],
'failed': False,
'issueid': item['issueid'],
'comicid': item['comicid'],
'apicall': True,
'ddl': True})
except Exception as e:
logger.info('process error: %s [%s]' %(e, ddzstat))
elif ddzstat['success'] is True:
logger.info('File successfully downloaded. Post Processing is not enabled - item retained here: %s' % os.path.join(ddzstat['path'], ddzstat['filename']))
else:
logger.info('[Status: %s] Failed to download: %s ' % (ddzstat['success'], ddzstat))
def postprocess_main(queue):
while True:
if mylar.APILOCK is True:

View File

@ -2298,20 +2298,12 @@ def searcher(nzbprov, nzbname, comicinfo, link, IssueID, ComicID, tmpprov, direc
sent_to = None
t_hash = None
if mylar.CONFIG.ENABLE_DDL is True and nzbprov == 'ddl':
ggc = getcomics.GC('nope')
ggc = getcomics.GC(issueid=IssueID, comicid=ComicID)
sendsite = ggc.loadsite(os.path.join(mylar.CONFIG.CACHE_DIR, 'getcomics-' + nzbid), link)
ddl_it = ggc.parse_downloadresults(os.path.join(mylar.CONFIG.CACHE_DIR, 'getcomics-' + nzbid))
ddl_it = ggc.parse_downloadresults(os.path.join(mylar.CONFIG.CACHE_DIR, 'getcomics-' + nzbid), link)
logger.info("ddl status response: %s" % ddl_it)
if ddl_it['status'] == 'success':
nzbname = ddl_it['filename']
logger.info('Successfully retrieved %s from DDL site. Now submitting for post-processing...' % (nzbname))
mylar.PP_QUEUE.put({'nzb_name': nzbname,
'nzb_folder': mylar.CONFIG.DDL_LOCATION,
'issueid': IssueID,
'failed': False,
'comicid': ComicID,
'apicall': True,
'ddl': True})
if ddl_it['success'] is True:
logger.info('Successfully snatched %s from DDL site. It is currently being queued to download in position %s' % (nzbname, mylar.DDL_QUEUE.qsize()))
else:
logger.info('Failed to retrieve %s from the DDL site.' % (nzbname))
return "ddl-fail"