Source code for mongoengine_goodjson.queryset

#!/usr/bin/env python
# coding=utf-8

"""Queryset encoder."""

import json

try:
    from functools import singledispatch
except ImportError:
    from singledispatch import singledispatch

import bson
import mongoengine as db

from .encoder import GoodJSONEncoder
from .decoder import generate_object_hook


[docs]class QuerySet(db.QuerySet): """QuerySet that supports human-readable json.""" def __start_good_json(self): """Start good json flag.""" setattr(self, "$$good_json$$", True) def __end_good_json(self): """End good json flag.""" setattr(self, "$$good_json$$", None) delattr(self, "$$good_json$$") def __get_doc(self, fld, item): """Get document as dict or a list of documents.""" from mongoengine_goodjson.fields import FollowReferenceField @singledispatch def doc(fld, item): return item @doc.register(db.ListField) def doc_list(fld, item): return [self.__get_doc(fld.field, el) for el in item] @doc.register(FollowReferenceField) def doc_frl(fld, item): from .document import Document doc = fld.document_type.objects(id=item).get() # doc.begin_goodjson() result = \ doc.to_mongo(current_depth=0) \ if isinstance(fld.document_type, Document) \ else doc.to_mongo() # doc.end_goodjson() return result result = doc(fld, item) if isinstance(result, dict) and "id" not in result and "_id" in result: result["id"] = result.pop("_id") return result
[docs] def as_pymongo(self, *args, **kwargs): """Return pymongo encoded dict.""" lst = super(QuerySet, self).as_pymongo()() if getattr(self, "$$good_json$$", None): for item in lst: for (name, fld) in self._document._fields.items(): if name in item: item[name] = self.__get_doc(fld, item[name]) item["id"] = item.pop("_id") return lst
def __cut_excluded_field(self, doc_list, exclude_attrs): lst = [item for item in doc_list] # Using for loop twice is not good in the case that there's # a lot of data, and to reduce for loop, picking out the field of # which exclude fields are truhty is the idea (If you know more # suitable idea, make a PR.). exclude = [ name for (name, fld) in self._document._fields.items() if any([ getattr(fld, "exclude_json", None) ] + [getattr(fld, item, None) for item in exclude_attrs]) ] for dct in lst: for exc in exclude: dct.pop(exc, None) return lst
[docs] def to_json(self, *args, **kwargs): """Convert to JSON.""" from .document import Document if issubclass(self._document, Document): kwargs.setdefault("cls", GoodJSONEncoder) self.__start_good_json() lst = self.as_pymongo() self.__end_good_json() lst = self.__cut_excluded_field(lst, ["exclude_to_json"]) return json.dumps(lst, *args, **kwargs) return super(QuerySet, self).to_json()
[docs] def from_json(self, json_data): """Convert from JSON.""" mongo_data = self.__cut_excluded_field(json.loads( json_data, object_hook=generate_object_hook(self._document) ), ["exclude_from_json"]) return [ self._document._from_son(bson.SON(data)) for data in mongo_data ]