#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
#
# Copyright 2012 ShopWiki
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from datetime import timedelta, datetime
from functools import partial
from threading import Lock
from weakref import WeakValueDictionary
import numbers
import uuid
from tornado import gen
from tornado.gen import Task, Callback, Wait
from tornado.ioloop import IOLoop
from tornado import stack_context
import pika
import simplejson as json
from chu.connection import AsyncRabbitConnectionBase
import logging
logger = logging.getLogger(__name__)
[docs]class RPCTimeoutError(Exception):
'''
Raised when a call to :meth:`.RPCResponseFuture.wait` times out.
'''
pass
class RPCErrorResponse(object):
pass
[docs]class RPCRequest(object):
'''
A wrapper object around the exchange, routing_key, and a dictionary
of parameters that will be json encoded into the body of the rabbit
message.
:class:`RPCRequest` is hashable and can therefore be used as a key
in a dictionary.
'''
def __init__(self, exchange, routing_key, params, timeout=None):
self.exchange = exchange
self.routing_key = routing_key
self.params = params
self.json_params = json.dumps(params, sort_keys=True)
# if timeout is a number, treat it as seconds
self.timeout = timeout
def __hash__(self):
if not hasattr(self, '_hash'):
routing_key_hash = hash(self.routing_key)
params_hash = hash(self.json_params)
self._hash = hash((routing_key_hash, params_hash))
return self._hash
[docs]class RPCResponseFuture(object):
'''
A convenience object for using :class:`AsyncTornadoRPCClient` in a
non-blocking manner within a Tornado handler. Calls to
:meth:`~AsyncTornadoRPCClient.rpc` will return an instance of
:class:`RPCResponseFuture`. The user may then call
:meth:`RPCResponseFuture.get` in order to retrieve the result of the
RPC. Using :class:`tornado.gen.Task` is recommended for calling
:meth:`~RPCResponseFuture.get`.
'''
def __init__(self, cid, timeout=None, io_loop=None):
self.cid = cid
self.response_received = False
self.response = None
self.timeout = timeout
self.timed_out = False
self.wait_callback = None
self.io_loop = io_loop
if not self.io_loop:
self.io_loop = IOLoop.instance()
self.init_time = datetime.now()
self.get_time = None
self.timeout_time = None
@gen.engine
[docs] def get(self, callback, timeout=None):
'''
Wait for the RPC associated with this :class:`RPCResponseFuture`
to return a result. When the result is received, resolve the
task by calling the passed in ``callback``.
:param callback: The callback that will be called with the RPC
response upon completion of the RPC. It is recommended that
this not be passed in directly, but rather that
:meth:`~.get` be called as a function passed to
:class:`tornado.gen.Task`.
:param timeout: The amount of time to wait before raising an
:exc:`RPCTimeoutError` to indicate that the RPC has timed
out. This can be a number or a :class:`timedelta`
object. If it is a number, it will be treated as
seconds.
'''
self.get_time = datetime.now()
if self.response_received:
logger.info('Response has already been received, return '
'the value immediately.')
callback(self.response)
else:
callback = stack_context.wrap(callback)
if self.timeout and not timeout:
timeout = self.timeout
elif not self.timeout and not timeout:
timeout = timedelta(seconds=6)
key = uuid.uuid4()
self.wait_callback = yield gen.Callback(key)
self.wait_callback = stack_context.wrap(self.wait_callback)
logger.info('Response has not been received yet. Adding '
'timeout to the io_loop in case the response '
'times out.')
if isinstance(timeout, numbers.Real):
timeout = timedelta(seconds=timeout)
self.io_loop.add_timeout(timeout,
stack_context.wrap(self.timeout_callback))
logger.info('Waiting for the response.')
yield gen.Wait(key)
if self.timed_out:
raise RPCTimeoutError('Future waiting for message with cid: '
'"%s" timed out' % str(self.cid))
elif self.response_received:
logger.info('Response received successfully.')
callback(self.response)
else:
raise Exception("Neither timed out nor response received")
def response_callback(self, response):
logger.info('Response callback called.')
self.response_received = True
self.response = response
logger.info('Time between init and response: %s'
% (datetime.now() - self.init_time))
if self.wait_callback:
logger.info('The wait callback has been constructed, '
'the user is blocking on the response.')
self.wait_callback()
else:
logger.info('The wait callback has not been constructed, ',
'the user has not called get() yet.')
def timeout_callback(self):
self.timeout_time = datetime.now()
if not self.response_received:
logger.error('The response timeout was called before the'
' response was received. cid: %s. Difference'
' between init and timeout time was: %s'
% (self.cid, self.timeout_time - self.init_time))
self.timed_out = True
self.wait_callback()
else:
logger.info('The response timeout was called after the '
'response was received.')
class RPCResponse(object):
def __init__(self, channel, method, header, body):
self.channel = channel
self.method = method
self.header = header
self.body_json = body
_body = None
@property
def body(self):
if not self._body:
self._body = json.loads(self.body_json, use_decimal=True)
return self._body
class AsyncSimpleConsumer(AsyncRabbitConnectionBase):
@gen.engine
def consume_queue(self, queue):
yield gen.Task(self.basic_consume,
consumer_callback=self.consume_message,
queue=queue)
def consume_message(self, channel, method, properties, body):
raise NotImplemented('consume_message should be implemented '
'by subclasses of AsyncSimpleConsumer.')
[docs]class AsyncTornadoRPCClient(AsyncRabbitConnectionBase):
"""
Wrap `pika.adapters.tornado_connection.TornadoConnection`
to provide a simple RPC client powered by ``tornado.gen.engine``
semantics.
"""
def __init__(self, *args, **kwargs):
self.declare_rpc_queue_lock = Lock()
self.connection_open_callbacks = []
self.rpc_queue_callbacks = []
self.rpc_queue = None
self.futures = WeakValueDictionary()
super(AsyncTornadoRPCClient, self).__init__(*args, **kwargs)
def on_closed(self, connection):
logger.warning('AsyncRabbitClient.on_close: closed!')
self.rpc_queue = None
@gen.engine
def rpc_queue_declare(self, callback, **kwargs):
if not self.declare_rpc_queue_lock.acquire(False):
logger.info('RPC Queue is already in the process of '
'being declared (declare_rpc_queue_lock '
'could not be acquired).')
callback()
return
try:
self.rpc_queue = yield gen.Task(self.queue_declare,
exclusive=True,
auto_delete=True)
yield gen.Task(self.basic_consume, queue=self.rpc_queue)
logger.info('Adding callbacks that are waiting for an RPC '
'queue to the tornado queue.')
while self.rpc_queue_callbacks:
cb = self.rpc_queue_callbacks.pop()
self.io_loop.add_callback(cb)
logger.info('Done adding callbacks.')
finally:
self.declare_rpc_queue_lock.release()
callback()
@gen.engine
def ensure_rpc_queue(self, callback):
logger.info('Ensuring that an RPC queue has been declared.')
yield Task(self.ensure_connection)
if self.rpc_queue:
logger.info('The RPC queue is already open.')
callback()
else:
logger.info('Adding callback to list of callbacks '
'waiting for the RPC queue to be open.')
callback = stack_context.wrap(callback)
self.rpc_queue_callbacks.append(callback)
logger.info('Calling rpc_queue_declare().')
rpc_queue = yield gen.Task(self.rpc_queue_declare)
logger.info('rpc_queue_declare has been called.')
@gen.engine
[docs] def rpc(self, rpc_request, properties=None, callback=None):
'''
Publish an RPC request. Returns a :class:`RPCResponseFuture`.
:param rpc_request: An instance of :class:`RPCRequest`.
'''
callback = stack_context.wrap(callback)
yield Task(self.ensure_connection)
yield Task(self.ensure_rpc_queue)
if not properties:
correlation_id = str(uuid.uuid4())
properties = pika.BasicProperties(reply_to=self.rpc_queue,
correlation_id=correlation_id)
logger.info('Publishing RPC request with key: %s' %
rpc_request.routing_key)
self.channel.basic_publish(exchange=rpc_request.exchange,
routing_key=rpc_request.routing_key,
body=rpc_request.json_params,
properties=properties)
logger.info('Constructing RPC response future with cid: %s' %
correlation_id)
future = RPCResponseFuture(correlation_id,
timeout=rpc_request.timeout,
io_loop=self.io_loop)
self.futures[correlation_id] = future
callback(future)
@gen.engine
def basic_publish(self, rpc_request, properties=None, callback=None):
yield Task(self.ensure_connection)
yield Task(self.ensure_rpc_queue)
if not properties:
properties = pika.BasicProperties()
logger.info('Publishing message request with key: %s' %
rpc_request.routing_key)
self.channel.basic_publish(exchange=rpc_request.exchange,
routing_key=rpc_request.routing_key,
body=rpc_request.json_params,
properties=properties)
logger.info('channel.basic_publish finished.')
callback()
def consume_message(self, channel, method, header, body):
logger.info('RPC response consumed')
cid = header.correlation_id
try:
future = self.futures.pop(cid)
response = RPCResponse(channel, method, header, body)
cb = partial(future.response_callback, response)
self.io_loop.add_callback(cb)
except KeyError:
logger.warning('AsyncRabbitClient.consume_message received an'
' unrecognized correlation_id: %s. Maybe the'
' RPC took too long and was timed out, or maybe'
' the response was sent more than once.' % cid)