# -*- coding: utf-8 -*- from __future__ import unicode_literals from datetime import datetime from django.core import serializers from django.core.serializers import SerializerDoesNotExist from django.core.serializers.base import ProgressBar from django.db import connection, transaction from django.http import HttpResponse from django.test import ( SimpleTestCase, mock, override_settings, skipUnlessDBFeature, ) from django.test.utils import Approximate from django.utils.functional import curry from django.utils.six import StringIO from .models import ( Actor, Article, Author, AuthorProfile, BaseModel, Category, ComplexModel, Movie, Player, ProxyBaseModel, ProxyProxyBaseModel, Score, Team, ) @override_settings( SERIALIZATION_MODULES={ "json2": "django.core.serializers.json", } ) class SerializerRegistrationTests(SimpleTestCase): def setUp(self): self.old_serializers = serializers._serializers serializers._serializers = {} def tearDown(self): serializers._serializers = self.old_serializers def test_register(self): "Registering a new serializer populates the full registry. Refs #14823" serializers.register_serializer('json3', 'django.core.serializers.json') public_formats = serializers.get_public_serializer_formats() self.assertIn('json3', public_formats) self.assertIn('json2', public_formats) self.assertIn('xml', public_formats) def test_unregister(self): "Unregistering a serializer doesn't cause the registry to be repopulated. Refs #14823" serializers.unregister_serializer('xml') serializers.register_serializer('json3', 'django.core.serializers.json') public_formats = serializers.get_public_serializer_formats() self.assertNotIn('xml', public_formats) self.assertIn('json3', public_formats) def test_unregister_unknown_serializer(self): with self.assertRaises(SerializerDoesNotExist): serializers.unregister_serializer("nonsense") def test_builtin_serializers(self): "Requesting a list of serializer formats popuates the registry" all_formats = set(serializers.get_serializer_formats()) public_formats = set(serializers.get_public_serializer_formats()) self.assertIn('xml', all_formats), self.assertIn('xml', public_formats) self.assertIn('json2', all_formats) self.assertIn('json2', public_formats) self.assertIn('python', all_formats) self.assertNotIn('python', public_formats) def test_get_unknown_serializer(self): """ #15889: get_serializer('nonsense') raises a SerializerDoesNotExist """ with self.assertRaises(SerializerDoesNotExist): serializers.get_serializer("nonsense") with self.assertRaises(KeyError): serializers.get_serializer("nonsense") # SerializerDoesNotExist is instantiated with the nonexistent format with self.assertRaises(SerializerDoesNotExist) as cm: serializers.get_serializer("nonsense") self.assertEqual(cm.exception.args, ("nonsense",)) def test_get_unknown_deserializer(self): with self.assertRaises(SerializerDoesNotExist): serializers.get_deserializer("nonsense") class SerializersTestBase(object): serializer_name = None # Set by subclasses to the serialization format name @staticmethod def _comparison_value(value): return value def setUp(self): sports = Category.objects.create(name="Sports") music = Category.objects.create(name="Music") op_ed = Category.objects.create(name="Op-Ed") self.joe = Author.objects.create(name="Joe") self.jane = Author.objects.create(name="Jane") self.a1 = Article( author=self.jane, headline="Poker has no place on ESPN", pub_date=datetime(2006, 6, 16, 11, 00) ) self.a1.save() self.a1.categories.set([sports, op_ed]) self.a2 = Article( author=self.joe, headline="Time to reform copyright", pub_date=datetime(2006, 6, 16, 13, 00, 11, 345) ) self.a2.save() self.a2.categories.set([music, op_ed]) def test_serialize(self): """Tests that basic serialization works.""" serial_str = serializers.serialize(self.serializer_name, Article.objects.all()) self.assertTrue(self._validate_output(serial_str)) def test_serializer_roundtrip(self): """Tests that serialized content can be deserialized.""" serial_str = serializers.serialize(self.serializer_name, Article.objects.all()) models = list(serializers.deserialize(self.serializer_name, serial_str)) self.assertEqual(len(models), 2) def test_serialize_to_stream(self): obj = ComplexModel(field1='first', field2='second', field3='third') obj.save_base(raw=True) # Serialize the test database to a stream for stream in (StringIO(), HttpResponse()): serializers.serialize(self.serializer_name, [obj], indent=2, stream=stream) # Serialize normally for a comparison string_data = serializers.serialize(self.serializer_name, [obj], indent=2) # Check that the two are the same if isinstance(stream, StringIO): self.assertEqual(string_data, stream.getvalue()) else: self.assertEqual(string_data, stream.content.decode('utf-8')) def test_serialize_specific_fields(self): obj = ComplexModel(field1='first', field2='second', field3='third') obj.save_base(raw=True) # Serialize then deserialize the test database serialized_data = serializers.serialize( self.serializer_name, [obj], indent=2, fields=('field1', 'field3') ) result = next(serializers.deserialize(self.serializer_name, serialized_data)) # Check that the deserialized object contains data in only the serialized fields. self.assertEqual(result.object.field1, 'first') self.assertEqual(result.object.field2, '') self.assertEqual(result.object.field3, 'third') def test_altering_serialized_output(self): """ Tests the ability to create new objects by modifying serialized content. """ old_headline = "Poker has no place on ESPN" new_headline = "Poker has no place on television" serial_str = serializers.serialize(self.serializer_name, Article.objects.all()) serial_str = serial_str.replace(old_headline, new_headline) models = list(serializers.deserialize(self.serializer_name, serial_str)) # Prior to saving, old headline is in place self.assertTrue(Article.objects.filter(headline=old_headline)) self.assertFalse(Article.objects.filter(headline=new_headline)) for model in models: model.save() # After saving, new headline is in place self.assertTrue(Article.objects.filter(headline=new_headline)) self.assertFalse(Article.objects.filter(headline=old_headline)) def test_one_to_one_as_pk(self): """ Tests that if you use your own primary key field (such as a OneToOneField), it doesn't appear in the serialized field list - it replaces the pk identifier. """ profile = AuthorProfile(author=self.joe, date_of_birth=datetime(1970, 1, 1)) profile.save() serial_str = serializers.serialize(self.serializer_name, AuthorProfile.objects.all()) self.assertFalse(self._get_field_values(serial_str, 'author')) for obj in serializers.deserialize(self.serializer_name, serial_str): self.assertEqual(obj.object.pk, self._comparison_value(self.joe.pk)) def test_serialize_field_subset(self): """Tests that output can be restricted to a subset of fields""" valid_fields = ('headline', 'pub_date') invalid_fields = ("author", "categories") serial_str = serializers.serialize(self.serializer_name, Article.objects.all(), fields=valid_fields) for field_name in invalid_fields: self.assertFalse(self._get_field_values(serial_str, field_name)) for field_name in valid_fields: self.assertTrue(self._get_field_values(serial_str, field_name)) def test_serialize_unicode(self): """Tests that unicode makes the roundtrip intact""" actor_name = "Za\u017c\u00f3\u0142\u0107" movie_title = 'G\u0119\u015bl\u0105 ja\u017a\u0144' ac = Actor(name=actor_name) mv = Movie(title=movie_title, actor=ac) ac.save() mv.save() serial_str = serializers.serialize(self.serializer_name, [mv]) self.assertEqual(self._get_field_values(serial_str, "title")[0], movie_title) self.assertEqual(self._get_field_values(serial_str, "actor")[0], actor_name) obj_list = list(serializers.deserialize(self.serializer_name, serial_str)) mv_obj = obj_list[0].object self.assertEqual(mv_obj.title, movie_title) def test_serialize_progressbar(self): fake_stdout = StringIO() serializers.serialize( self.serializer_name, Article.objects.all(), progress_output=fake_stdout, object_count=Article.objects.count() ) self.assertTrue( fake_stdout.getvalue().endswith('[' + '.' * ProgressBar.progress_width + ']\n') ) def test_serialize_superfluous_queries(self): """Ensure no superfluous queries are made when serializing ForeignKeys #17602 """ ac = Actor(name='Actor name') ac.save() mv = Movie(title='Movie title', actor_id=ac.pk) mv.save() with self.assertNumQueries(0): serializers.serialize(self.serializer_name, [mv]) def test_serialize_with_null_pk(self): """ Tests that serialized data with no primary key results in a model instance with no id """ category = Category(name="Reference") serial_str = serializers.serialize(self.serializer_name, [category]) pk_value = self._get_pk_values(serial_str)[0] self.assertFalse(pk_value) cat_obj = list(serializers.deserialize(self.serializer_name, serial_str))[0].object self.assertEqual(cat_obj.id, None) def test_float_serialization(self): """Tests that float values serialize and deserialize intact""" sc = Score(score=3.4) sc.save() serial_str = serializers.serialize(self.serializer_name, [sc]) deserial_objs = list(serializers.deserialize(self.serializer_name, serial_str)) self.assertEqual(deserial_objs[0].object.score, Approximate(3.4, places=1)) def test_deferred_field_serialization(self): author = Author.objects.create(name='Victor Hugo') author = Author.objects.defer('name').get(pk=author.pk) serial_str = serializers.serialize(self.serializer_name, [author]) deserial_objs = list(serializers.deserialize(self.serializer_name, serial_str)) # Check the class instead of using isinstance() because model instances # with deferred fields (e.g. Author_Deferred_name) will pass isinstance. self.assertEqual(deserial_objs[0].object.__class__, Author) def test_custom_field_serialization(self): """Tests that custom fields serialize and deserialize intact""" team_str = "Spartak Moskva" player = Player() player.name = "Soslan Djanaev" player.rank = 1 player.team = Team(team_str) player.save() serial_str = serializers.serialize(self.serializer_name, Player.objects.all()) team = self._get_field_values(serial_str, "team") self.assertTrue(team) self.assertEqual(team[0], team_str) deserial_objs = list(serializers.deserialize(self.serializer_name, serial_str)) self.assertEqual(deserial_objs[0].object.team.to_string(), player.team.to_string()) def test_pre_1000ad_date(self): """Tests that year values before 1000AD are properly formatted""" # Regression for #12524 -- dates before 1000AD get prefixed # 0's on the year a = Article.objects.create( author=self.jane, headline="Nobody remembers the early years", pub_date=datetime(1, 2, 3, 4, 5, 6)) serial_str = serializers.serialize(self.serializer_name, [a]) date_values = self._get_field_values(serial_str, "pub_date") self.assertEqual(date_values[0].replace('T', ' '), "0001-02-03 04:05:06") def test_pkless_serialized_strings(self): """ Tests that serialized strings without PKs can be turned into models """ deserial_objs = list(serializers.deserialize(self.serializer_name, self.pkless_str)) for obj in deserial_objs: self.assertFalse(obj.object.id) obj.save() self.assertEqual(Category.objects.all().count(), 5) def test_deterministic_mapping_ordering(self): """Mapping such as fields should be deterministically ordered. (#24558)""" output = serializers.serialize(self.serializer_name, [self.a1], indent=2) categories = self.a1.categories.values_list('pk', flat=True) self.assertEqual(output, self.mapping_ordering_str % { 'article_pk': self.a1.pk, 'author_pk': self.a1.author_id, 'first_category_pk': categories[0], 'second_category_pk': categories[1], }) def test_deserialize_force_insert(self): """Tests that deserialized content can be saved with force_insert as a parameter.""" serial_str = serializers.serialize(self.serializer_name, [self.a1]) deserial_obj = list(serializers.deserialize(self.serializer_name, serial_str))[0] with mock.patch('django.db.models.Model') as mock_model: deserial_obj.save(force_insert=False) mock_model.save_base.assert_called_with(deserial_obj.object, raw=True, using=None, force_insert=False) @skipUnlessDBFeature('can_defer_constraint_checks') def test_serialize_proxy_model(self): BaseModel.objects.create(parent_data=1) base_objects = BaseModel.objects.all() proxy_objects = ProxyBaseModel.objects.all() proxy_proxy_objects = ProxyProxyBaseModel.objects.all() base_data = serializers.serialize("json", base_objects) proxy_data = serializers.serialize("json", proxy_objects) proxy_proxy_data = serializers.serialize("json", proxy_proxy_objects) self.assertEqual(base_data, proxy_data.replace('proxy', '')) self.assertEqual(base_data, proxy_proxy_data.replace('proxy', '')) class SerializersTransactionTestBase(object): available_apps = ['serializers'] @skipUnlessDBFeature('supports_forward_references') def test_forward_refs(self): """ Tests that objects ids can be referenced before they are defined in the serialization data. """ # The deserialization process needs to run in a transaction in order # to test forward reference handling. with transaction.atomic(): objs = serializers.deserialize(self.serializer_name, self.fwd_ref_str) with connection.constraint_checks_disabled(): for obj in objs: obj.save() for model_cls in (Category, Author, Article): self.assertEqual(model_cls.objects.all().count(), 1) art_obj = Article.objects.all()[0] self.assertEqual(art_obj.categories.all().count(), 1) self.assertEqual(art_obj.author.name, "Agnes") def register_tests(test_class, method_name, test_func, exclude=None): """ Dynamically create serializer tests to ensure that all registered serializers are automatically tested. """ formats = [ f for f in serializers.get_serializer_formats() if (not isinstance(serializers.get_serializer(f), serializers.BadSerializer) and not f == 'geojson' and (exclude is None or f not in exclude)) ] for format_ in formats: setattr(test_class, method_name % format_, curry(test_func, format_))