from __future__ import absolute_import

from apscheduler.jobstores.base import BaseJobStore, JobLookupError, ConflictingIdError
from apscheduler.util import maybe_ref, datetime_to_utc_timestamp, utc_timestamp_to_datetime
from apscheduler.job import Job

try:
    import cPickle as pickle
except ImportError:  # pragma: nocover
    import pickle

try:
    from rethinkdb import RethinkDB
except ImportError:  # pragma: nocover
    raise ImportError('RethinkDBJobStore requires rethinkdb installed')


class RethinkDBJobStore(BaseJobStore):
    """
    Stores jobs in a RethinkDB database. Any leftover keyword arguments are directly passed to
    rethinkdb's ``r.connect()``.

    Plugin alias: ``rethinkdb``

    :param str database: database to store jobs in
    :param str table: table to store jobs in
    :param client: a :class:`rethinkdb.net.Connection` instance to use instead of providing
        connection arguments
    :param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the
        highest available
    """

    def __init__(self, database='apscheduler', table='jobs', client=None,
                 pickle_protocol=pickle.HIGHEST_PROTOCOL, **connect_args):
        super(RethinkDBJobStore, self).__init__()

        if not database:
            raise ValueError('The "database" parameter must not be empty')
        if not table:
            raise ValueError('The "table" parameter must not be empty')

        self.database = database
        self.table_name = table
        self.table = None
        self.client = client
        self.pickle_protocol = pickle_protocol
        self.connect_args = connect_args
        self.r = RethinkDB()
        self.conn = None

    def start(self, scheduler, alias):
        super(RethinkDBJobStore, self).start(scheduler, alias)

        if self.client:
            self.conn = maybe_ref(self.client)
        else:
            self.conn = self.r.connect(db=self.database, **self.connect_args)

        # Create the database, the table and the index used for ordering by next run
        # time, if they don't exist yet
        if self.database not in self.r.db_list().run(self.conn):
            self.r.db_create(self.database).run(self.conn)

        if self.table_name not in self.r.table_list().run(self.conn):
            self.r.table_create(self.table_name).run(self.conn)

        if 'next_run_time' not in self.r.table(self.table_name).index_list().run(self.conn):
            self.r.table(self.table_name).index_create('next_run_time').run(self.conn)

        self.table = self.r.db(self.database).table(self.table_name)

    def lookup_job(self, job_id):
        results = list(self.table.get_all(job_id).pluck('job_state').run(self.conn))
        return self._reconstitute_job(results[0]['job_state']) if results else None

    def get_due_jobs(self, now):
        return self._get_jobs(self.r.row['next_run_time'] <= datetime_to_utc_timestamp(now))

    def get_next_run_time(self):
        results = list(
            self.table
            .filter(self.r.row['next_run_time'] != None)  # noqa
            .order_by(self.r.asc('next_run_time'))
            .map(lambda x: x['next_run_time'])
            .limit(1)
            .run(self.conn)
        )
        return utc_timestamp_to_datetime(results[0]) if results else None

    def get_all_jobs(self):
        jobs = self._get_jobs()
        self._fix_paused_jobs_sorting(jobs)
        return jobs

    def add_job(self, job):
        job_dict = {
            'id': job.id,
            'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
            'job_state': self.r.binary(pickle.dumps(job.__getstate__(), self.pickle_protocol))
        }
        results = self.table.insert(job_dict).run(self.conn)
        if results['errors'] > 0:
            raise ConflictingIdError(job.id)

    def update_job(self, job):
        changes = {
            'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
            'job_state': self.r.binary(pickle.dumps(job.__getstate__(), self.pickle_protocol))
        }
        results = self.table.get_all(job.id).update(changes).run(self.conn)
        # If every counter in the result is zero, the job id matched no document
        nothing_matched = all(value == 0 for value in results.values())
        if results['skipped'] > 0 or results['errors'] > 0 or nothing_matched:
            raise JobLookupError(job.id)
    def remove_job(self, job_id):
        results = self.table.get_all(job_id).delete().run(self.conn)
        if results['deleted'] + results['skipped'] != 1:
            raise JobLookupError(job_id)

    def remove_all_jobs(self):
        self.table.delete().run(self.conn)

    def shutdown(self):
        self.conn.close()

    def _reconstitute_job(self, job_state):
        job_state = pickle.loads(job_state)
        job = Job.__new__(Job)
        job.__setstate__(job_state)
        job._scheduler = self._scheduler
        job._jobstore_alias = self._alias
        return job

    def _get_jobs(self, predicate=None):
        jobs = []
        failed_job_ids = []
        query = (self.table.filter(self.r.row['next_run_time'] != None).filter(predicate)  # noqa
                 if predicate else self.table)
        query = query.order_by('next_run_time', 'id').pluck('id', 'job_state')

        for document in query.run(self.conn):
            try:
                jobs.append(self._reconstitute_job(document['job_state']))
            except Exception:
                self._logger.exception('Unable to restore job "%s" -- removing it',
                                       document['id'])
                failed_job_ids.append(document['id'])

        # Remove all the jobs we failed to restore
        if failed_job_ids:
            self.r.expr(failed_job_ids).for_each(
                lambda job_id: self.table.get_all(job_id).delete()).run(self.conn)

        return jobs

    def __repr__(self):
        return '<%s (connection=%s)>' % (self.__class__.__name__, self.conn)
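
# Minimal usage sketch (illustrative, not part of the job store itself): wires this
# store into a BackgroundScheduler through its ``rethinkdb`` plugin alias. The
# host/port values and the tick() function below are assumptions about your
# environment; the extra keyword arguments (host, port) end up in ``connect_args``
# and are forwarded to ``r.connect()``.
if __name__ == '__main__':  # pragma: nocover
    from datetime import datetime
    from time import sleep

    from apscheduler.schedulers.background import BackgroundScheduler

    def tick():
        print('Tick! The time is %s' % datetime.now())

    scheduler = BackgroundScheduler()
    scheduler.add_jobstore('rethinkdb', database='apscheduler', table='jobs',
                           host='localhost', port=28015)
    # replace_existing=True avoids a ConflictingIdError when the job id already
    # exists in the persistent store from a previous run
    scheduler.add_job(tick, 'interval', seconds=5, id='tick', replace_existing=True)
    scheduler.start()
    try:
        sleep(30)
    finally:
        scheduler.shutdown()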