Source code for support.async

import os
import os.path
import sys
import functools
import time
import imp
import code
import platform
import threading
import collections
import weakref

import gevent
import gevent.pool
import gevent.socket
import gevent.threadpool
import gevent.greenlet
import faststat

from support import context

from support import ll

ml = ll.LLogger()


# TODO: investigate replacing curtime with nanotime
# (mostly used in Threadpool dispatch
if hasattr(time, "perf_counter"):
    curtime = time.perf_counter  # 3.3
elif platform.system() == "Windows":
    curtime = time.clock
else:
    curtime = time.time


class Greenlet(gevent.Greenlet):
[docs] ''' A subclass of gevent.Greenlet which adds additional members: - locals: a dict of variables that are local to the "spawn tree" of greenlets - spawner: a weak-reference back to the spawner of the greenlet - stacks: a record of the stack at which the greenlet was spawned, and ancestors ''' def __init__(self, f, *a, **kw): super(Greenlet, self).__init__(f, *a, **kw) spawner = self.spawn_parent = weakref.proxy(gevent.getcurrent()) if not hasattr(spawner, 'locals'): spawner.locals = {} self.locals = spawner.locals stack = [] cur = sys._getframe() while cur: stack.extend((cur.f_code, cur.f_lineno)) cur = cur.f_back self.stacks = (tuple(stack),) + getattr(spawner, 'stacks', ())[:10] def get_spawntree_local(name):
[docs] """ Essentially provides dynamic scope lookup for programming aspects that cross greenlet borders. Be wary of overusing this functionality, as it effectively constitutes mutating global state which can lead to race conditions and architectural problems. """ locals = getattr(gevent.getcurrent(), 'locals', None) if locals: return locals.get(name) return None def set_spawntree_local(name, val):
[docs] """ Similar to get_spawntree_local except that it allows setting these values. Again, be wary of overuse. """ cur = gevent.getcurrent() if not hasattr(cur, 'locals'): cur.locals = {} cur.locals[name] = val def staggered_retries(run, *a, **kw):
[docs] """ A version of spawn that will block will it is done running the function, and which will call the function repeatedly as time progresses through the timeouts list. Best used for idempotent network calls (e.g. HTTP GETs). e.g. user_data = async.staggered_retries(get_data, max_results, latent_data_ok, public_credential_load, timeouts_secs=[0.1, 0.5, 1, 2]) returns None on timeout. """ ctx = context.get_context() ready = gevent.event.Event() ready.clear() def callback(source): if source.successful(): ready.set() if 'timeouts_secs' in kw: timeouts_secs = kw.pop('timeouts_secs') else: timeouts_secs = [0.05, 0.1, 0.15, 0.2] if timeouts_secs[0] > 0: timeouts_secs.insert(0, 0) gs = gevent.spawn(run, *a, **kw) gs.link_value(callback) running = [gs] for i in range(1, len(timeouts_secs)): this_timeout = timeouts_secs[i] - timeouts_secs[i - 1] if ctx.dev: this_timeout = this_timeout * 5.0 ml.ld2("Using timeout {0}", this_timeout) try: with gevent.Timeout(this_timeout): ready.wait() break except gevent.Timeout: ml.ld2("Timed out!") log_rec = ctx.log.critical('ASYNC.STAGGER', run.__name__) log_rec.failure('timed out after {timeout}', timeout=this_timeout) gs = gevent.spawn(run, *a, **kw) gs.link_value(callback) running.append(gs) vals = [l.value for l in running if l.successful()] for g in running: g.kill() if vals: return vals[0] else: return None def timed(f):
[docs] ''' wrap a function and time all of its execution calls in milliseconds ''' fname = os.path.basename(f.__code__.co_filename) or '_' line_no = repr(f.__code__.co_firstlineno) name = 'timed.{0}[{1}:{2}](ms)'.format(f.__name__, fname, line_no) @functools.wraps(f) def g(*a, **kw): s = faststat.nanotime() r = f(*a, **kw) context.get_context().stats[name].add((faststat.nanotime() - s) / 1e6) return r return g class ThreadPoolDispatcher(object):
[docs] def __init__(self, pool, name): self.pool = pool self.name = name def __call__(self, f): @functools.wraps(f) def g(*a, **kw): enqueued = curtime() ctx = context.get_context() started = [] def in_thread(*a, **kw): ml.ld3("In thread {0}", f.__name__) started.append(curtime()) return f(*a, **kw) # some modules import things lazily; it is too dangerous # to run a function in another thread if the import lock is # held by the current thread (this happens rarely -- only # if the thread dispatched function is being executed at # the import time of a module) if not ctx.cpu_thread_enabled or imp.lock_held(): ret = in_thread(*a, **kw) elif in_threadpool() is self.pool: ret = in_thread(*a, **kw) else: ctx.stats[self.name + '.depth'].add(1 + len(self.pool)) ret = self.pool.apply_e((Exception,), in_thread, a, kw) ml.ld3("Enqueued to thread {0}/depth {1}", f.__name__, len(pool)) start = started[0] duration = curtime() - start queued = start - enqueued if hasattr(ret, '__len__') and callable(ret.__len__): prsize = ret.__len__() # parameter-or-return size elif a and hasattr(a[0], '__len__') and callable(a[0].__len__): prsize = a[0].__len__() else: prsize = None _queue_stats(name, f.__name__, queued, duration, prsize) return ret g.no_defer = f return g def in_threadpool():
[docs] 'function to return the threadpool in which code is currently executing (if any)' frame = sys._getframe() while frame.f_back: frame = frame.f_back self = frame.f_locals.get('self') if (isinstance(self, gevent.threadpool.ThreadPool) and frame.f_code is getattr(getattr(self._worker, "im_func"), "func_code")): return self return None class CPUThread(object):
[docs] ''' Manages a single worker thread to dispatch cpu intensive tasks to. Signficantly less overhead than gevent.threadpool.ThreadPool() since it uses prompt notifications rather than polling. The trade-off is that only one thread can be managed this way. Since there is only one thread, hub.loop.async() objects may be used instead of polling to handle inter-thread communication. ''' def __init__(self): self.in_q = collections.deque() self.out_q = collections.deque() self.in_async = None self.out_async = gevent.get_hub().loop.async() self.out_q_has_data = gevent.event.Event() self.out_async.start(self.out_q_has_data.set) self.worker = threading.Thread(target=self._run) self.worker.daemon = True self.stopping = False self.results = {} # start running thread / greenlet after everything else is set up self.worker.start() self.notifier = gevent.spawn(self._notify) def _run(self): # in_cpubound_thread is sentinel to prevent double thread dispatch context.get_context().thread_locals.in_cpubound_thread = True try: self.in_async = gevent.get_hub().loop.async() self.in_q_has_data = gevent.event.Event() self.in_async.start(self.in_q_has_data.set) while not self.stopping: if not self.in_q: # wait for more work self.in_q_has_data.clear() self.in_q_has_data.wait() continue # arbitrary non-preemptive service discipline can go here # FIFO for now, but we should experiment with others jobid, func, args, kwargs, enqueued = self.in_q.popleft() started = curtime() try: ret = self.results[jobid] = func(*args, **kwargs) except Exception as e: ret = self.results[jobid] = self._Caught(e) self.out_q.append(jobid) self.out_async.send() # keep track of some statistics queued, duration = started - enqueued, curtime() - started size = None # ret s set up above before async send if hasattr(ret, '__len__') and callable(ret.__len__): size = len(ret) _queue_stats('cpu_bound', func.__name__, queued, duration, size) except: self._error() # this may always halt the server process def apply(self, func, args, kwargs):
[docs] done = gevent.event.Event() self.in_q.append((done, func, args, kwargs, curtime())) context.get_context().stats['cpu_bound.depth'].add(1 + len(self.in_q)) while not self.in_async: gevent.sleep(0.01) # poll until worker thread has initialized self.in_async.send() done.wait() res = self.results[done] del self.results[done] if isinstance(res, self._Caught): raise res.err return res def _notify(self):
try: while not self.stopping: if not self.out_q: # wait for jobs to complete self.out_q_has_data.clear() self.out_q_has_data.wait() continue self.out_q.popleft().set() except: self._error() class _Caught(object): def __init__(self, err): self.err = err def __repr__(self): cn = self.__class__.__name__ return ("<%s@%s in_q:%s out_q:%s>" % (cn, id(self), len(self.in_q), len(self.out_q))) def _error(self): # TODO: something better, but this is darn useful for debugging import traceback traceback.print_exc() ctx = context.get_context() tl = ctx.thread_locals if hasattr(tl, 'cpu_bound_thread') and tl.cpu_bound_thread is self: del tl.cpu_bound_thread def _queue_stats(qname, fname, queued_ns, duration_ns, size_B=None):
ctx = context.get_context() fprefix = qname + '.' + fname ctx.stats[fprefix + '.queued(ms)'].add(queued_ns * 1000) ctx.stats[fprefix + '.duration(ms)'].add(duration_ns * 1000) ctx.stats[qname + '.queued(ms)'].add(queued_ns * 1000) ctx.stats[qname + '.duration(ms)'].add(duration_ns * 1000) if size_B is not None: ctx.stats[fprefix + '.len'].add(size_B) if duration_ns: # may be 0 ctx.stats[fprefix + '.rate(B/ms)'].add(size_B / (duration_ns * 1000.0)) # TODO: make size configurable io_bound = ThreadPoolDispatcher(gevent.threadpool.ThreadPool(10), 'io_bound') # N.B. In many cases fcntl could be used as an alternative method of # achieving non-blocking file io on unix systems def cpu_bound(f, p=None):
[docs] ''' Cause the decorated function to have its execution deferred to a separate thread to avoid blocking the IO loop in the main thread. Useful for wrapping encryption or serialization tasks. Example usage: @async.cpu_bound def my_slow_function(): pass ''' @functools.wraps(f) def g(*a, **kw): ctx = context.get_context() # in_cpubound_thread is sentinel to prevent double-thread dispatch if (not ctx.cpu_thread_enabled or imp.lock_held() or getattr(ctx.thread_locals, 'in_cpubound_thread', False)): return f(*a, **kw) if not hasattr(ctx.thread_locals, 'cpu_bound_thread'): ctx.thread_locals.cpu_bound_thread = CPUThread() ml.ld3("Calling in cpu thread {0}", f.__name__) return ctx.thread_locals.cpu_bound_thread.apply(f, a, kw) g.no_defer = f return g def cpu_bound_if(p):
[docs] ''' Similar to cpu_bound, but should be called with a predicate parameter which determines whether or not to dispatch to a cpu_bound thread. The predicate will be passed the same parameters as the function itself. Example usage: # will be deferred to a thread if parameter greater than 16k, # else run inline @async.cpu_bound_if(lambda s: len(s) > 16 * 1024) def my_string_function(data): pass ''' def g(f): f = cpu_bound(f) @functools.wraps(f) def h(*a, **kw): if p(*a, **kw): return f(*a, **kw) return f.no_defer(*a, **kw) return h return g def close_threadpool():
[docs] tlocals = context.get_context().thread_locals if hasattr(tlocals, 'cpu_bound_thread'): ml.ld2("Closing thread pool {0}", id(tlocals.cpu_thread)) cpu_thread = tlocals.cpu_bound_thread cpu_thread.stopping = True del tlocals.cpu_bound_thread def killsock(sock):
[docs] """Attempts to cleanly shutdown a socket. Regardless of cleanliness, ensures that upon return, the socket is fully closed, catching any exceptions along the way. A safe and prompt way to dispose of the socket, freeing system resources. """ if hasattr(sock, '_sock'): ml.ld("Killing socket {0}/FD {1}", id(sock), sock._sock.fileno()) else: ml.ld("Killing socket {0}", id(sock)) try: # TODO: better ideas for how to get SHUT_RDWR constant? sock.shutdown(gevent.socket.SHUT_RDWR) except gevent.socket.error: pass # just being nice to the server, don't care if it fails except Exception as e: log_rec = context.get_context().log.info("SOCKET", "SHUTDOWN") log_rec.failure('error ({exc}) shutting down socket: {socket}', socket=sock, exc=e) try: sock.close() except gevent.socket.error: pass # just being nice to the server, don't care if it fails except Exception as e: log_rec = context.get_context().log.info("SOCKET", "CLOSE") log_rec.failure('error ({exc}) closing socket: {socket}', socket=sock, exc=e) PID = os.getpid()
def check_fork(fn):
[docs] """Hack for Django/gevent interaction to reset after non-gevent fork""" @functools.wraps(fn) def wrapper(request): global PID if PID != os.getpid(): gevent.get_hub().loop.reinit() PID = os.getpid() return fn(request) return wrapper # a little helper for running a greenlet-friendly console # implemented here since it directly references gevent class GreenConsole(code.InteractiveConsole):
[docs] @io_bound def raw_input(self, prompt=""):
[docs] return code.InteractiveConsole.raw_input(self, prompt) def start_repl(local=None, banner="infra REPL (exit with Ctrl+C)"):
[docs] gevent.spawn(GreenConsole().interact, banner) def greenify(banner="REPL is now greenlet friendly (exit with Ctrl+C)"):
[docs] import __main__ GreenConsole(__main__.__dict__).interact(banner) # The following are imported/aliased for user import convenience spawn = Greenlet.spawn
sleep = gevent.sleep Timeout = gevent.Timeout with_timeout = gevent.with_timeout nanotime = faststat.nanotime # end user imports/aliases