Source code for intessa.api

from functools import wraps

from urecord import Record
from urlobject import URLObject

from conneg.accept import make_accept_header
from conneg.default import DEFAULT_REGISTER
from http import urllib3_http_backend
from utils.caseless_dict import CaselessDictionary

__all__ = ['API']


def replaces_url(meth):
    """Wrap a method on :class:`API` to return a new API with a new URL."""

    @wraps(meth)
    def wrapper(self, *args, **kwargs):
        url = meth(self, *args, **kwargs)
        # If the method is already reflexive, return the result directly.
        if isinstance(url, type(self)):
            return url
        return self._with_url(url)
    return wrapper


# We're using Record here for efficiency and immutability.
[docs]class API(Record('url', 'http')): """ Represents a remote HTTP-accessible resource (and operations thereon). Example usage:: >>> api = API(u'https://graph.facebook.com/') >>> api <intessa.API(u'https://graph.facebook.com/')> Build paths using attribute access or subscription syntax:: >>> api[u'19292868552'] <intessa.API(u'https://graph.facebook.com/19292868552')> >>> api.cocacola <intessa.API(u'https://graph.facebook.com/cocacola')> >>> api[u'btaylor'][u'picture'] <intessa.API(u'https://graph.facebook.com/btaylor/picture')> Perform requests and receive :class:`responses <intessa.response.Response>` by calling the object:: >>> api[u'19292868552']() # doctest: +SKIP <intessa.Response[200, text/javascript]> """ def __new__(cls, url, http=urllib3_http_backend): if not isinstance(url, URLObject): url = URLObject(url) return super(API, cls).__new__(cls, url, http) def __repr__(self): return '<intessa.API(%r)>' % (unicode(self),) def __unicode__(self): return unicode(self.url) def __eq__(self, other): return unicode(self) == other def __hash__(self): return hash(unicode(self.url)) @replaces_url
[docs] def __add__(self, other): """ Simple string concatenation is reflexive. >>> API(u'http://example.com/') + u'foo' <intessa.API(u'http://example.com/foo')> """ return unicode(self) + other
@replaces_url
[docs] def add_query_params(self, *args, **kwargs): """ Add parameters to this URL. Accepts dictionaries, lists of pairs, and keyword arguments: >>> API(u'http://example.com/').add_query_params({'a': 'b'}) <intessa.API(u'http://example.com/?a=b')> >>> API(u'http://example.com/').add_query_params(key='value') <intessa.API(u'http://example.com/?key=value')> Multiple additions of the same parameter will keep all of them, in the order they were given: >>> API(u'http://example.com/').add_query_params([ ... ('a', 'b'), ('a', 'c')]) <intessa.API(u'http://example.com/?a=b&a=c')> """ return self.url.add_query_params(*args, **kwargs)
@replaces_url
[docs] def add_query_param(self, key, value=None): """ Add a single parameter to this URL. Accepts a key and a value: >>> API(u'http://example.com/').add_query_param('a', 'b') <intessa.API(u'http://example.com/?a=b')> >>> API(u'http://example.com/').add_query_param('a', '') <intessa.API(u'http://example.com/?a=')> >>> API(u'http://example.com/').add_query_param('a') <intessa.API(u'http://example.com/?a')> """ return self.url.add_query_param(key, value)
@replaces_url
[docs] def set_query_params(self, *args, **kwargs): """ Add parameters to this URL, replacing existing parameters. Accepts dictionaries, lists of pairs, and keyword arguments: >>> API(u'http://example.com/?a=foo').set_query_params({'a': 'b'}) <intessa.API(u'http://example.com/?a=b')> >>> API(u'http://example.com/?key=oldvalue').set_query_params( \\ ... key='newvalue') <intessa.API(u'http://example.com/?key=newvalue')> Multiple additions of the same parameter will replace existing values: >>> API(u'http://example.com/') \\ ... .set_query_params({'a': 'b'}) \\ ... .set_query_params({'a': 'c'}) <intessa.API(u'http://example.com/?a=c')> """ return self.url.set_query_params(*args, **kwargs)
@replaces_url
[docs] def set_query_param(self, key, value=None): """ Add a single parameter to this URL, replacing it if it already exists. Accepts a key and a value: >>> API(u'http://example.com/?a=foo').set_query_param('a', 'b') <intessa.API(u'http://example.com/?a=b')> >>> API(u'http://example.com/?a=foo').set_query_param('a', '') <intessa.API(u'http://example.com/?a=')> >>> API(u'http://example.com/?a=foo').set_query_param('a') <intessa.API(u'http://example.com/?a')> """ return self.url.set_query_param(key, value)
@replaces_url
[docs] def __getattr__(self, attr): """ Dynamic attribute access adds path components to the current URL. >>> API(u'http://example.com/').users <intessa.API(u'http://example.com/users')> """ return self[attr]
@replaces_url
[docs] def __getitem__(self, item): """ Subscription syntax is overridden to extend or modify the current URL. Example usage:: >>> api = API(u'http://example.com/') In the simplest case, add a path component:: >>> api['foo'] <intessa.API(u'http://example.com/foo')> >>> api['foo']['bar'] <intessa.API(u'http://example.com/foo/bar')> Replace the whole path by prefixing with a '/':: >>> api['foo']['bar']['/baz'] <intessa.API(u'http://example.com/baz')> Add or modify a fragment identifier:: >>> api['#foo'] <intessa.API(u'http://example.com/#foo')> >>> api['#foo']['#bar'] <intessa.API(u'http://example.com/#bar')> Add or modify a file extension:: >>> api['foo']['.json'] <intessa.API(u'http://example.com/foo.json')> >>> api['foo']['.json']['.xml'] <intessa.API(u'http://example.com/foo.xml')> Adding a file extension to a non-leaf node will add an 'index' component:: >>> api['.json'] <intessa.API(u'http://example.com/index.json')> Any necessary escaping will be taken care of:: >>> api['foo bar baz'] <intessa.API(u'http://example.com/foo%20bar%20baz')> >>> api['#foo bar#baz'] <intessa.API(u'http://example.com/#foo%20bar%23baz')> >>> api['foo']['.js o%n'] <intessa.API(u'http://example.com/foo.js%20o%25n')> """ if not isinstance(item, basestring): item = unicode(item) # Replace whole path with a new one. if item.startswith('/'): return self.url.with_path(item[1:]) # Add/replace fragment identifier. if item.startswith('#'): return self.url.with_fragment(item[1:]) # Add/replace file extension. Add an 'index' path component if current # path is not a leaf node. if item.startswith('.'): if self.url.endswith('/'): return self['index'][item] return self._with_file_ext(item[1:]) # Standard case of adding a path component. return self.url.add_path(item)
[docs] def _with_url(self, url): """Replace the URL on this API with another.""" return self._replace(url=URLObject(url))
@replaces_url
[docs] def _with_file_ext(self, file_ext): """ Add/replace a file extension on this URL. Example:: >>> API(u'http://example.com/foo')._with_file_ext('.json') <intessa.API(u'http://example.com/foo.json')> The leading period is optional:: >>> API(u'http://example.com/foo')._with_file_ext('json') <intessa.API(u'http://example.com/foo.json')> Existing file extensions will be replaced:: >>> API(u'http://example.com/foo.json')._with_file_ext('xml') <intessa.API(u'http://example.com/foo.xml')> This method only works on leaf nodes:: >>> API(u'http://example.com/foo/')._with_file_ext('json') Traceback (most recent call last): ... ValueError: Cannot add a file extension to directories. """ if not self.url.is_leaf: raise ValueError("Cannot add a file extension to directories.") # 'https://example.com/foo/bar/baz.xyz' => 'baz.xyz' leaf_node = self.url.path.segments[-1] # 'baz.xyz' => 'baz' base = leaf_node.rsplit('.', 1)[0] if not file_ext.startswith('.'): file_ext = '.' + file_ext return self.url.parent.add_path_segment(base + file_ext)
[docs] def __call__(self, *args, **kwargs): """ Perform an HTTP request and return a :class:`~intessa.response.Response`. :param method: The HTTP method to use (defaults to ``GET``). :param data: A Python object representing the body of the request. There are several valid values this parameter can take: * A bytestring, with a ``Content-Type`` header (given in ``headers``). * A :class:`~intessa.conneg.streaming.StreamingBody` (or any file-like object which supports ``__len__()``). * A file-like object, with ``Content-Type`` and ``Content-Length`` headers. * Any other Python object, with the ``type`` parameter provided. In the first two instances, it will be sent as-is. In the second, it will first be encoded to a bytestring, using the codec for the specified type. :param type: The Internet media type with which the ``data`` argument should be encoded. This will be used as a lookup type against the ``codec_register``. :param codec_register: A :class:`~intessa.conneg.codec_base.CodecRegister` to use for encoding the request body and decoding the response. Defaults to :data:`~intessa.conneg.default.DEFAULT_REGISTER`. :param accept: A specification of acceptable response types, which will be turned into an HTTP ``Accept`` header. This argument can be a string representing a media type (see: :class:`~intessa.conneg.content_type.ContentType`), a list of such strings, or a list of ``(media_type, quality)`` pairs, where ``quality`` is a floating-point number. See the documentation for :func:`~intessa.conneg.accept.make_accept_header` for examples of how this argument is turned into an ``Accept`` header. :param headers: A dictionary of raw HTTP headers to send. Note that if a non-successful response is returned, an instance of :class:`intessa.Error` will be *raised*. """ if 'params' in kwargs: self = self.add_query_params(kwargs.pop('params')) method = kwargs.pop('method', 'GET') headers = CaselessDictionary(kwargs.pop('headers', {})) codec_register = kwargs.pop('codec_register', DEFAULT_REGISTER) if 'data' in kwargs: if method == 'GET': method = 'POST' data = kwargs.pop('data') type = kwargs.pop('type', None) headers, kwargs['data'] = encode_request( headers, data, type, codec_register=codec_register) accept = kwargs.pop('accept', None) if accept is not None: headers['Accept'] = make_accept_header(accept) kwargs['headers'] = headers return self.http(self, method, self.url, *args, **kwargs)
[docs] def _post(self, *args, **kwargs): """Shim for issuing POST requests.""" kwargs.setdefault('method', 'POST') return self(*args, **kwargs)
[docs] def _put(self, *args, **kwargs): """Shim for issuing PUT requests.""" kwargs.setdefault('method', 'PUT') return self(*args, **kwargs)
[docs] def _delete(self, *args, **kwargs): """Shim for issuing DELETE requests.""" kwargs.setdefault('method', 'DELETE') return self(*args, **kwargs)
def encode_request(headers, data, type=None, codec_register=DEFAULT_REGISTER): """ Encode an object and a type into a bytestring/stream and HTTP headers. :param headers: The HTTP headers for the request. A new dictionary of headers will be returned which will contain a ``Content-Type`` and ``Content-Length``. :param data: A Python object representing the body of the request. There are several valid values this parameter can take: * A bytestring (with a ``Content-Type`` header given in ``headers``). * A :class:`~intessa.conneg.streaming.StreamingBody` (or any file-like object which supports ``__len__()``). * A file-like object (with ``Content-Type`` and ``Content-Length`` headers). * Any other Python object, with the ``type`` parameter provided. In the first two instances, it will be sent as-is. In the second, it will first be encoded to a bytestring, using the codec for the specified type. :param type: The Internet media type with which ``data`` should be encoded. This will be used as a lookup type against the ``codec_register``. If this argument is omitted, there *must* be a specified ``Content-Type`` in ``headers``, otherwise a ``TypeError`` will be raised. :param codec_register: The codec register to use for encoding the request. Defaults to :data:`~intessa.conneg.default.DEFAULT_REGISTER`. """ headers = CaselessDictionary(headers) if 'content-type' not in headers: if type is not None: headers['content-type'], data = codec_register.encode(type, data) else: raise TypeError( "Request data provided with no content type information") if 'content-length' not in headers: if hasattr(data, 'read'): try: headers['content-length'] = str(len(data)) except Exception, exc: raise TypeError("Streaming request provided with no length") else: headers['content-length'] = str(len(data)) return headers, data