#!/usr/bin/env python
# coding=utf-8
"""Documents implementing human-readable JSON serializer."""
import json
import types
import mongoengine as db
from bson import SON, DBRef, ObjectId
from .encoder import GoodJSONEncoder
from .decoder import generate_object_hook
from .queryset import QuerySet
from .utils import singledispatch, normalize_reference, id_first
class Helper(object):
    """Helper class to serialize / deserialize JSON document.

    This mixin is combined with a mongoengine document class (see the
    ``Document`` / ``EmbeddedDocument`` classes of this module).  It relies
    on attributes supplied by that class, such as ``_fields``, ``_from_son``
    and the parent ``to_mongo`` implementation.
    """

    @staticmethod
    def __dereference(fld, doc):
        """Return the document behind ``doc``.

        Arguments:
            fld: Field definition exposing ``document_type``.
            doc: Either a ``DBRef`` or an already loaded value.

        Returns:
            The document fetched through ``fld.document_type.objects`` when
            ``doc`` is a ``DBRef``; otherwise ``doc`` itself, untouched.
        """
        if isinstance(doc, DBRef):
            return fld.document_type.objects(id=doc.id).get()
        return doc

    def __get_doc(self, fld, fldname, value):
        """Get the target document.

        Arguments:
            fld: The reference / embedded document field definition.
            fldname: Name of the attribute to read from this document.
            value: Raw stored value; used as the id to query when reading
                the attribute raises ``DoesNotExist``.

        Returns:
            The referenced document, or whatever falsy value the attribute
            holds (e.g. None) when nothing is set.
        """
        # Honestly, I don't feel this implementation is good.
        # However, as #892 at MongoEngine
        # (https://github.com/MongoEngine/mongoengine/issues/892)
        # shows, to keep follow reference, this is only the way...
        #
        # (Of course, PR is appreciated.)
        try:
            doc = getattr(self, fldname, None)
        except db.DoesNotExist:
            doc = self._fields[fldname].document_type.objects(id=value).get()
        return self.__dereference(fld, doc)

    def __follow_reference_list(self, fld, fldname, *args, **kwargs):
        """Follow reference with list.

        Returns a list holding, for every item of ``self.<fldname>``, the
        serialized dict of the (dereferenced) document, passed through
        ``id_first``.  Extra arguments are forwarded to the serializer.
        """
        return [
            id_first(self.__serialize_doc_to_dict(
                fld, self.__dereference(fld, doc), *args, **kwargs
            ))
            for doc in getattr(self, fldname, [])
        ]

    def __follow_reference_dict(self, fld, fldname, *args, **kwargs):
        """Follow reference with dict.

        Returns a dict with the same keys as ``self.<fldname>`` whose values
        are the serialized dicts of the (dereferenced) documents, passed
        through ``id_first``.  Extra arguments are forwarded to the
        serializer.
        """
        return {
            key: id_first(self.__serialize_doc_to_dict(
                fld, self.__dereference(fld, doc), *args, **kwargs
            ))
            for key, doc in getattr(self, fldname).items()
        }

    def __serialize_doc_to_dict(self, fld, doc, *args, **kwargs):
        """Serialize the document into dict.

        When ``fld.document_type`` is a ``Helper`` subclass the document is
        serialized with its own ``to_json`` (receiving ``*args`` and
        ``**kwargs``) and parsed back; otherwise ``doc.to_mongo()`` is used
        and the extra arguments are ignored.  A ``_id`` key in the result is
        renamed to ``id``.
        """
        if issubclass(fld.document_type, Helper):
            dct = json.loads(doc.to_json(*args, **kwargs))
        else:
            dct = doc.to_mongo()
        if "_id" in dct:
            dct["id"] = dct.pop("_id")
        return dct

    def _follow_reference(self, max_depth, current_depth,
                          use_db_field, data, *args, **kwargs):
        """Serialize the documents referenced by / embedded in this one.

        Arguments:
            max_depth: Depth limit forwarded to nested ``to_json`` calls.
            current_depth: Depth of this document; nested documents are
                serialized with ``current_depth + 1``.
            use_db_field: Forwarded to nested ``to_json`` calls.
            data: The dict of this document; ``data.get(fldname)`` is the
                fallback id used when a single reference can't be read.
            *args, **kwargs: Forwarded to the nested serializers.

        Returns:
            ``id_first`` applied to a dict mapping field name to the
            serialized value, containing only reference / embedded fields
            (plain, list or dict) that are not ``FollowReferenceField`` and
            that produced a value.
        """
        from .fields import FollowReferenceField
        ret = {}
        for fldname in self:
            fld = self._fields.get(fldname)
            is_list = isinstance(fld, db.ListField)
            is_dict = isinstance(fld, db.DictField)
            target = fld.field if is_list or is_dict else fld
            if not isinstance(
                target, (db.ReferenceField, db.EmbeddedDocumentField)
            ) or isinstance(target, FollowReferenceField):
                continue

            ckwargs = kwargs.copy()
            if issubclass(target.document_type, Helper):
                # Only Helper documents understand the recursion arguments.
                ckwargs.update({
                    "follow_reference": True,
                    "max_depth": max_depth,
                    "current_depth": current_depth + 1,
                    "use_db_field": use_db_field
                })

            value = None
            if is_list:
                value = self.__follow_reference_list(
                    target, fldname, *args, **ckwargs
                )
            elif is_dict:
                value = self.__follow_reference_dict(
                    target, fldname, *args, **ckwargs
                )
            else:
                tdoc = self.__get_doc(target, fldname, data.get(fldname))
                if tdoc:
                    value = self.__serialize_doc_to_dict(
                        target, tdoc, *args, **ckwargs
                    )
            if value is not None:
                ret[fldname] = value
        return id_first(ret)

    def __apply_element(
        self, name, fld, cur_depth, func, flagfunc_attr=None
    ):
        """Apply field flag by calling parameter func.

        Arguments:
            name: Attribute name of the field on this document.
            fld: The field definition to process.
            cur_depth: Depth value handed to ``func``.
            func: Callable invoked as ``func(field, cur_depth)``.
            flagfunc_attr: Name of the method called on embedded ``Helper``
                documents stored in the field (the callers in this class
                pass ``"begin_goodjson"`` / ``"end_goodjson"``).

        Dispatch is done on the field type: list fields recurse into their
        inner field, embedded document fields delegate to the embedded
        document(s), any other field is handed to ``func``.
        """
        from mongoengine_goodjson.fields import FollowReferenceField

        @singledispatch
        def recursive_apply_flag(fld):
            func(fld, cur_depth)

        @recursive_apply_flag.register(db.ListField)
        def set_flag_list(fld):
            if fld.field:
                recursive_apply_flag(fld.field)

        @recursive_apply_flag.register(db.EmbeddedDocumentField)
        def set_flag_emb(fld):
            if issubclass(fld.document_type_obj, Helper):
                obj = getattr(self, name)
                if isinstance(obj, list):
                    # NOTE(review): list items are flagged with the default
                    # depth while a single embedded document receives
                    # cur_depth -- confirm this asymmetry is intended.
                    for item in obj:
                        getattr(item, flagfunc_attr)()
                elif obj:
                    getattr(obj, flagfunc_attr)(cur_depth)

        @recursive_apply_flag.register(FollowReferenceField)
        def set_flag_self(fld):
            func(fld, cur_depth)

        recursive_apply_flag(fld)

    def __set_gj_flag_sub_field(self, name, fld, cur_depth):
        """Tell current depth to subfield."""
        def set_good_json(target, depth_lv):
            # The attribute name is not a valid identifier on purpose, so it
            # can only be reached through setattr / getattr.
            setattr(target, "$$cur_depth$$", depth_lv)

        self.__apply_element(
            name, fld, cur_depth, set_good_json, "begin_goodjson"
        )

    def __unset_gj_flag_sub_field(self, name, fld, cur_depth):
        """Remove current depth to subfield.

        The stored depth is set to ``cur_depth - 1``; the attribute is
        removed altogether once that value is negative (or not an int).
        """
        def unset_flag(fld, depth_lv):
            setattr(fld, "$$cur_depth$$", depth_lv - 1)
            cur_depth_attr = getattr(fld, "$$cur_depth$$")
            if (not isinstance(cur_depth_attr, int)) or cur_depth_attr < 0:
                delattr(fld, "$$cur_depth$$")

        self.__apply_element(
            name, fld, cur_depth, unset_flag, "end_goodjson"
        )

    def begin_goodjson(self, cur_depth=0):
        """Enable GoodJSON Flag on every field of this document."""
        for (name, fld) in self._fields.items():
            self.__set_gj_flag_sub_field(name, fld, cur_depth=cur_depth)

    def end_goodjson(self, cur_depth=0):
        """Stop GoodJSON Flag on every field of this document."""
        for (name, fld) in self._fields.items():
            self.__unset_gj_flag_sub_field(name, fld, cur_depth=cur_depth)

    def to_mongo(self, *args, **kwargs):
        """Convert into mongodb compatible dict.

        Keyword arguments consumed here (not forwarded to the parent):
            cur_depth: Depth handed to begin_goodjson / end_goodjson;
                a missing or falsy value means 0.
            good_json: When truthy, the GoodJSON flags are enabled around
                the parent ``to_mongo`` call.

        Every other argument is forwarded to the parent ``to_mongo``, whose
        result is returned.
        """
        cur_depth = kwargs.pop("cur_depth", None) or 0
        good_json = bool(kwargs.pop("good_json", False))
        if not good_json:
            return super(Helper, self).to_mongo(*args, **kwargs)
        self.begin_goodjson(cur_depth)
        try:
            return super(Helper, self).to_mongo(*args, **kwargs)
        finally:
            # The flags live on the field objects, so they must be cleared
            # even when the conversion fails.
            self.end_goodjson(cur_depth)

    def __to_json_drop_excluded_data(self, data, flds=None):
        """
        Consider exclude_to_json and exclude_json flag.

        Arguments:
            data: The return value of to_mongo from the top-level document.
            flds: The fields of child document. In usual use, this parameter
                should be None, because this is used internally.

        Returns:
            A shallow copy of ``data`` without the excluded keys.  Lists of
            embedded ``Helper`` documents and dict values whose field has a
            ``document_type`` are filtered recursively.
        """
        ret = data.copy()
        for name, fld in (flds or self._fields).items():
            if getattr(fld, "exclude_to_json", None) or \
                    getattr(fld, "exclude_json", None):
                ret.pop(name, None)
                continue
            value = ret.get(name)
            if isinstance(value, list):
                # Not every field holding a list has an inner ``field``.
                inner = getattr(fld, "field", None)
                if inner is not None and \
                        isinstance(inner, db.EmbeddedDocumentField) and \
                        issubclass(inner.document_type, Helper):
                    ret[name] = [
                        self.__to_json_drop_excluded_data(
                            item, inner.document_type._fields
                        ) for item in value
                    ]
            elif isinstance(value, dict) and hasattr(fld, "document_type"):
                ret[name] = self.__to_json_drop_excluded_data(
                    value, fld.document_type._fields
                )
        return ret

    def to_json(self, *args, **kwargs):
        """
        Encode to human-readable json.

        Parameters:
            use_db_field: use_db_field that is passed to to_mongo.
                By default, the value is True.
            follow_reference: set True to follow reference field.
                By default, the value is False.
            max_depth: maximum recursion depth, or a callable invoked as
                ``max_depth(document, current_depth)`` returning it.
                References are not followed once current_depth reaches
                this value.  By default, the value is 3.
                NOTE(review): as implemented, both None and 0 disable the
                limit (references keep being followed); confirm whether 0
                was meant to disable following instead.
            current_depth: This is used internally to identify current
                recursion depth. Therefore, you should leave this value
                as-is. By default, the value is 0.
            *args, **kwargs: Any arguments, and keyword arguments to
                tell json.dumps.  ``cls`` defaults to GoodJSONEncoder.

        Returns:
            The JSON string.
        """
        use_db_field = kwargs.pop('use_db_field', True)
        follow_reference = kwargs.pop("follow_reference", False)
        max_depth = kwargs.pop("max_depth", 3)
        current_depth = kwargs.pop("current_depth", 0)
        kwargs.setdefault("cls", GoodJSONEncoder)
        data = self.to_mongo(
            use_db_field, cur_depth=current_depth, good_json=True
        )
        if "_id" in data:
            data.setdefault("id", data.pop("_id", None))
        if follow_reference:
            # Test for a callable explicitly so that a TypeError raised
            # inside the callable is not mistaken for "not callable".
            if callable(max_depth):
                max_depth_value = max_depth(self, current_depth)
            else:
                max_depth_value = max_depth
            max_depth_value = max_depth_value or 0
            if not (0 < max_depth_value <= current_depth):
                data.update(self._follow_reference(
                    max_depth, current_depth, use_db_field,
                    data, *args, **kwargs
                ))
        data = id_first(self.__to_json_drop_excluded_data(data))
        return json.dumps(data, *args, **kwargs)

    @classmethod
    def from_json(cls, json_str, created=False, *args, **kwargs):
        # Proposition: add a private method like from_json that allows
        # dictionaries to be used as inputs to avoid having to use
        # loads(dumps(data)) all the time
        """
        Decode from human-readable json.

        Parameters:
            json_str: JSON string that should be passed to the serialized
            created: a parameter that is passed to cls._from_son.
            *args, **kwargs: Any additional arguments that is passed to
                json.loads.  ``object_hook`` defaults to
                ``generate_object_hook(cls)``.

        Returns:
            The decoded document.  Fields flagged ``exclude_from_json`` or
            ``exclude_json`` are ignored, every field is marked as changed,
            and when a reference field was processed the instance gets a
            ``save`` that also saves the referenced documents.
        """
        from .fields import FollowReferenceField

        def is_plain_reference(field):
            """Tell whether field is a ReferenceField that isn't followed."""
            return isinstance(field, db.ReferenceField) and \
                not isinstance(field, FollowReferenceField)

        def load_referenced(target, ref, stringify_id=False):
            """Build the referenced document from the data held in ref."""
            payload = ref.as_doc()['$id']
            # If there is no ID in the JSON (aka the JSON was not saved
            # from mongoengine but rather created manually), create an
            # ObjectId on the fly
            if 'id' not in payload:
                payload['id'] = str(ObjectId())
            elif stringify_id:
                payload['id'] = str(payload['id'])
            return target.document_type_obj.from_json(json.dumps(payload))

        kwargs.setdefault("object_hook", generate_object_hook(cls))
        dct = json.loads(json_str, *args, **kwargs)
        for name, fld in cls._fields.items():
            if getattr(fld, "exclude_from_json", None) or \
                    getattr(fld, "exclude_json", None):
                dct.pop(name, None)
        from_son_result = cls._from_son(SON(dct), created=created)

        has_reference = False
        for fldname, fld in cls._fields.items():
            if isinstance(fld, db.ListField):
                target = fld.field
                if not is_plain_reference(target):
                    continue
                has_reference = True
                # A field missing from the JSON is treated as empty.
                setattr(from_son_result, fldname, [
                    load_referenced(target, ref)
                    for ref in dct.get(fldname) or []
                ])
            elif isinstance(fld, db.DictField):
                target = fld.field
                if not is_plain_reference(target):
                    continue
                has_reference = True
                setattr(from_son_result, fldname, {
                    key: load_referenced(target, ref)
                    for key, ref in (dct.get(fldname) or {}).items()
                })
            else:
                target = fld
                if not is_plain_reference(target):
                    continue
                value = dct.get(fldname)
                if value is None:
                    # Nothing was given for this reference; keep whatever
                    # _from_son produced.
                    continue
                has_reference = True
                referenced = None
                if hasattr(value, "as_doc"):
                    try:
                        referenced = load_referenced(
                            target, value, stringify_id=True
                        )
                    except TypeError:
                        # The reference carries a bare id, not the data of
                        # a followed document.
                        referenced = None
                if referenced is None:
                    referenced = normalize_reference(
                        getattr(value, "id", value), target
                    )
                setattr(from_son_result, fldname, referenced)

        # All fields have been changed, because the document was loaded from a
        # JSON file. However, mongoengine does not detect it automatically. In
        # order for all fields to be saved, we set the _changed_fields variable
        # manually
        from_son_result._changed_fields = list(cls._fields.keys())

        if has_reference:
            # If the document contains at least one reference, override the
            # save() method to save the referenced documents at the same time
            # as the master document. Otherwise, the referenced documents would
            # not be saved and the document would not be valid anymore after a
            # save and load from the database.
            reference_types = (db.ReferenceField, FollowReferenceField)

            def save(self, *args, **kwargs):
                def save_referenced(doc):
                    # Unset references and bare ids have nothing to save.
                    if hasattr(doc, "save"):
                        doc.save(*args, **kwargs)

                for fldname, fld in cls._fields.items():
                    if isinstance(fld, reference_types):
                        save_referenced(getattr(self, fldname))
                    elif isinstance(fld, db.fields.ComplexBaseField):
                        if not isinstance(fld.field, reference_types):
                            continue
                        if isinstance(fld, db.DictField):
                            for doc in getattr(self, fldname).values():
                                save_referenced(doc)
                        elif isinstance(fld, db.ListField):
                            for doc in getattr(self, fldname):
                                save_referenced(doc)
                # Look save up on the class to bypass this per-instance
                # override without skipping a save defined by the class.
                return type(self).save(self, *args, **kwargs)

            from_son_result.save = types.MethodType(save, from_son_result)
        return from_son_result
class Document(Helper, db.Document):
    """Document implementing human-readable JSON serializer."""

    # Declared abstract, with the package's QuerySet as queryset class.
    meta = {
        "abstract": True,
        "queryset_class": QuerySet
    }
class EmbeddedDocument(Helper, db.EmbeddedDocument):
    """EmbeddedDocument implementing human-readable JSON serializer."""

    # Declared abstract, with the package's QuerySet as queryset class.
    meta = {
        "abstract": True,
        "queryset_class": QuerySet
    }