Arthur de Jong

Open Source / Free Software developer

summary refs log tree commit diff stats
path: root/tests/serializers/tests.py
blob: af275a856861c14cd089a0b3121307054978a7b0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
# -*- coding: utf-8 -*-
from __future__ import unicode_literals

from datetime import datetime

from django.core import serializers
from django.core.serializers import SerializerDoesNotExist
from django.core.serializers.base import ProgressBar
from django.db import connection, transaction
from django.http import HttpResponse
from django.test import (
    SimpleTestCase, mock, override_settings, skipUnlessDBFeature,
)
from django.test.utils import Approximate
from django.utils.functional import curry
from django.utils.six import StringIO

from .models import (
    Actor, Article, Author, AuthorProfile, BaseModel, Category, ComplexModel,
    Movie, Player, ProxyBaseModel, ProxyProxyBaseModel, Score, Team,
)


@override_settings(
    SERIALIZATION_MODULES={
        "json2": "django.core.serializers.json",
    }
)
class SerializerRegistrationTests(SimpleTestCase):
    """
    Tests for registering, unregistering and looking up serializers.

    The whole class runs with an extra "json2" format configured through the
    SERIALIZATION_MODULES setting.
    """

    def setUp(self):
        # Swap in an empty registry so every test starts from a clean slate;
        # the previous registry is put back in tearDown().
        self.old_serializers = serializers._serializers
        serializers._serializers = {}

    def tearDown(self):
        serializers._serializers = self.old_serializers

    def test_register(self):
        """Registering a new serializer populates the full registry. Refs #14823"""
        serializers.register_serializer('json3', 'django.core.serializers.json')

        public_formats = serializers.get_public_serializer_formats()
        self.assertIn('json3', public_formats)
        self.assertIn('json2', public_formats)
        self.assertIn('xml', public_formats)

    def test_unregister(self):
        """Unregistering a serializer doesn't cause the registry to be repopulated. Refs #14823"""
        serializers.unregister_serializer('xml')
        serializers.register_serializer('json3', 'django.core.serializers.json')

        public_formats = serializers.get_public_serializer_formats()

        self.assertNotIn('xml', public_formats)
        self.assertIn('json3', public_formats)

    def test_unregister_unknown_serializer(self):
        """Unregistering a format that was never registered raises SerializerDoesNotExist."""
        with self.assertRaises(SerializerDoesNotExist):
            serializers.unregister_serializer("nonsense")

    def test_builtin_serializers(self):
        """Requesting a list of serializer formats populates the registry"""
        all_formats = set(serializers.get_serializer_formats())
        public_formats = set(serializers.get_public_serializer_formats())

        self.assertIn('xml', all_formats)
        self.assertIn('xml', public_formats)

        self.assertIn('json2', all_formats)
        self.assertIn('json2', public_formats)

        # 'python' is expected to be registered but not exposed as public.
        self.assertIn('python', all_formats)
        self.assertNotIn('python', public_formats)

    def test_get_unknown_serializer(self):
        """
        #15889: get_serializer('nonsense') raises a SerializerDoesNotExist
        """
        with self.assertRaises(SerializerDoesNotExist):
            serializers.get_serializer("nonsense")

        # The very same call must also be catchable as a KeyError.
        with self.assertRaises(KeyError):
            serializers.get_serializer("nonsense")

        # SerializerDoesNotExist is instantiated with the nonexistent format
        with self.assertRaises(SerializerDoesNotExist) as cm:
            serializers.get_serializer("nonsense")
        self.assertEqual(cm.exception.args, ("nonsense",))

    def test_get_unknown_deserializer(self):
        """Looking up a deserializer for an unknown format raises SerializerDoesNotExist."""
        with self.assertRaises(SerializerDoesNotExist):
            serializers.get_deserializer("nonsense")


class SerializersTestBase(object):
    """
    Format-independent serializer tests, written to be mixed into a concrete
    test case that targets one serialization format.

    Besides ``serializer_name``, the tests below use attributes that this
    class does not define and that the concrete class must therefore supply:
    ``_validate_output()``, ``_get_field_values()``, ``_get_pk_values()``,
    ``pkless_str`` and ``mapping_ordering_str``.
    """
    serializer_name = None  # Set by subclasses to the serialization format name

    @staticmethod
    def _comparison_value(value):
        """
        Return the value a deserialized pk is compared against.

        The default returns ``value`` unchanged; presumably a hook for
        formats whose round trip changes the value's type (verify against
        the subclasses).
        """
        return value

    def setUp(self):
        """Create three categories, two authors and two articles for the tests."""
        sports = Category.objects.create(name="Sports")
        music = Category.objects.create(name="Music")
        op_ed = Category.objects.create(name="Op-Ed")

        self.joe = Author.objects.create(name="Joe")
        self.jane = Author.objects.create(name="Jane")

        self.a1 = Article(
            author=self.jane,
            headline="Poker has no place on ESPN",
            pub_date=datetime(2006, 6, 16, 11, 00)
        )
        self.a1.save()
        self.a1.categories.set([sports, op_ed])

        self.a2 = Article(
            author=self.joe,
            headline="Time to reform copyright",
            pub_date=datetime(2006, 6, 16, 13, 00, 11, 345)
        )
        self.a2.save()
        self.a2.categories.set([music, op_ed])

    def test_serialize(self):
        """Tests that basic serialization works."""
        serial_str = serializers.serialize(self.serializer_name,
                                           Article.objects.all())
        self.assertTrue(self._validate_output(serial_str))

    def test_serializer_roundtrip(self):
        """Tests that serialized content can be deserialized."""
        serial_str = serializers.serialize(self.serializer_name,
                                           Article.objects.all())
        models = list(serializers.deserialize(self.serializer_name, serial_str))
        self.assertEqual(len(models), 2)

    def test_serialize_to_stream(self):
        """Serializing into a stream yields the same text as serializing to a string."""
        obj = ComplexModel(field1='first', field2='second', field3='third')
        obj.save_base(raw=True)

        # Serialize normally for a comparison. The reference output does not
        # depend on the stream, so it is computed once up front.
        string_data = serializers.serialize(self.serializer_name, [obj], indent=2)

        # Serialize the test database to a stream
        for stream in (StringIO(), HttpResponse()):
            serializers.serialize(self.serializer_name, [obj], indent=2, stream=stream)

            # Check that the two are the same
            if isinstance(stream, StringIO):
                self.assertEqual(string_data, stream.getvalue())
            else:
                # HttpResponse exposes its body through .content, decoded
                # here as UTF-8 before comparing.
                self.assertEqual(string_data, stream.content.decode('utf-8'))

    def test_serialize_specific_fields(self):
        """Only the fields named in ``fields`` survive a serialization round trip."""
        obj = ComplexModel(field1='first', field2='second', field3='third')
        obj.save_base(raw=True)

        # Serialize then deserialize the test database
        serialized_data = serializers.serialize(
            self.serializer_name, [obj], indent=2, fields=('field1', 'field3')
        )
        result = next(serializers.deserialize(self.serializer_name, serialized_data))

        # Check that the deserialized object contains data in only the serialized fields.
        self.assertEqual(result.object.field1, 'first')
        self.assertEqual(result.object.field2, '')
        self.assertEqual(result.object.field3, 'third')

    def test_altering_serialized_output(self):
        """
        Tests the ability to create new objects by
        modifying serialized content.
        """
        old_headline = "Poker has no place on ESPN"
        new_headline = "Poker has no place on television"
        serial_str = serializers.serialize(self.serializer_name,
                                           Article.objects.all())
        serial_str = serial_str.replace(old_headline, new_headline)
        models = list(serializers.deserialize(self.serializer_name, serial_str))

        # Prior to saving, old headline is in place
        self.assertTrue(Article.objects.filter(headline=old_headline))
        self.assertFalse(Article.objects.filter(headline=new_headline))

        for model in models:
            model.save()

        # After saving, new headline is in place
        self.assertTrue(Article.objects.filter(headline=new_headline))
        self.assertFalse(Article.objects.filter(headline=old_headline))

    def test_one_to_one_as_pk(self):
        """
        Tests that if you use your own primary key field
        (such as a OneToOneField), it doesn't appear in the
        serialized field list - it replaces the pk identifier.
        """
        profile = AuthorProfile(author=self.joe,
                                date_of_birth=datetime(1970, 1, 1))
        profile.save()
        serial_str = serializers.serialize(self.serializer_name,
                                           AuthorProfile.objects.all())
        self.assertFalse(self._get_field_values(serial_str, 'author'))

        for obj in serializers.deserialize(self.serializer_name, serial_str):
            self.assertEqual(obj.object.pk, self._comparison_value(self.joe.pk))

    def test_serialize_field_subset(self):
        """Tests that output can be restricted to a subset of fields"""
        valid_fields = ('headline', 'pub_date')
        invalid_fields = ("author", "categories")
        serial_str = serializers.serialize(self.serializer_name,
                                           Article.objects.all(),
                                           fields=valid_fields)
        for field_name in invalid_fields:
            self.assertFalse(self._get_field_values(serial_str, field_name))

        for field_name in valid_fields:
            self.assertTrue(self._get_field_values(serial_str, field_name))

    def test_serialize_unicode(self):
        """Tests that unicode makes the roundtrip intact"""
        actor_name = "Za\u017c\u00f3\u0142\u0107"
        movie_title = 'G\u0119\u015bl\u0105 ja\u017a\u0144'
        ac = Actor(name=actor_name)
        mv = Movie(title=movie_title, actor=ac)
        ac.save()
        mv.save()

        serial_str = serializers.serialize(self.serializer_name, [mv])
        self.assertEqual(self._get_field_values(serial_str, "title")[0], movie_title)
        self.assertEqual(self._get_field_values(serial_str, "actor")[0], actor_name)

        obj_list = list(serializers.deserialize(self.serializer_name, serial_str))
        mv_obj = obj_list[0].object
        self.assertEqual(mv_obj.title, movie_title)

    def test_serialize_progressbar(self):
        """With progress_output set, the output ends with a completely filled progress bar."""
        fake_stdout = StringIO()
        serializers.serialize(
            self.serializer_name, Article.objects.all(),
            progress_output=fake_stdout, object_count=Article.objects.count()
        )
        self.assertTrue(
            fake_stdout.getvalue().endswith('[' + '.' * ProgressBar.progress_width + ']\n')
        )

    def test_serialize_superfluous_queries(self):
        """Ensure no superfluous queries are made when serializing ForeignKeys

        #17602
        """
        ac = Actor(name='Actor name')
        ac.save()
        # The movie is built from actor_id rather than the Actor instance.
        mv = Movie(title='Movie title', actor_id=ac.pk)
        mv.save()

        with self.assertNumQueries(0):
            serializers.serialize(self.serializer_name, [mv])

    def test_serialize_with_null_pk(self):
        """
        Tests that serialized data with no primary key results
        in a model instance with no id
        """
        category = Category(name="Reference")
        serial_str = serializers.serialize(self.serializer_name, [category])
        pk_value = self._get_pk_values(serial_str)[0]
        self.assertFalse(pk_value)

        cat_obj = list(serializers.deserialize(self.serializer_name,
                                               serial_str))[0].object
        self.assertIsNone(cat_obj.id)

    def test_float_serialization(self):
        """Tests that float values serialize and deserialize intact"""
        sc = Score(score=3.4)
        sc.save()
        serial_str = serializers.serialize(self.serializer_name, [sc])
        deserial_objs = list(serializers.deserialize(self.serializer_name,
                                                     serial_str))
        self.assertEqual(deserial_objs[0].object.score, Approximate(3.4, places=1))

    def test_deferred_field_serialization(self):
        """An instance loaded with a deferred field deserializes as the plain model class."""
        author = Author.objects.create(name='Victor Hugo')
        author = Author.objects.defer('name').get(pk=author.pk)
        serial_str = serializers.serialize(self.serializer_name, [author])
        deserial_objs = list(serializers.deserialize(self.serializer_name, serial_str))
        # Check the class instead of using isinstance() because model instances
        # with deferred fields (e.g. Author_Deferred_name) will pass isinstance.
        self.assertEqual(deserial_objs[0].object.__class__, Author)

    def test_custom_field_serialization(self):
        """Tests that custom fields serialize and deserialize intact"""
        team_str = "Spartak Moskva"
        player = Player()
        player.name = "Soslan Djanaev"
        player.rank = 1
        player.team = Team(team_str)
        player.save()
        serial_str = serializers.serialize(self.serializer_name,
                                           Player.objects.all())
        team = self._get_field_values(serial_str, "team")
        self.assertTrue(team)
        self.assertEqual(team[0], team_str)

        deserial_objs = list(serializers.deserialize(self.serializer_name, serial_str))
        self.assertEqual(deserial_objs[0].object.team.to_string(),
                         player.team.to_string())

    def test_pre_1000ad_date(self):
        """Tests that year values before 1000AD are properly formatted"""
        # Regression for #12524 -- dates before 1000AD get prefixed
        # 0's on the year
        a = Article.objects.create(
            author=self.jane,
            headline="Nobody remembers the early years",
            pub_date=datetime(1, 2, 3, 4, 5, 6))

        serial_str = serializers.serialize(self.serializer_name, [a])
        date_values = self._get_field_values(serial_str, "pub_date")
        # Accept either 'T' or a space between the date and the time part.
        self.assertEqual(date_values[0].replace('T', ' '), "0001-02-03 04:05:06")

    def test_pkless_serialized_strings(self):
        """
        Tests that serialized strings without PKs
        can be turned into models
        """
        deserial_objs = list(serializers.deserialize(self.serializer_name,
                                                     self.pkless_str))
        for obj in deserial_objs:
            self.assertFalse(obj.object.id)
            obj.save()
        # setUp() created three categories; the expected total after saving is five.
        self.assertEqual(Category.objects.all().count(), 5)

    def test_deterministic_mapping_ordering(self):
        """Mapping such as fields should be deterministically ordered. (#24558)"""
        output = serializers.serialize(self.serializer_name, [self.a1], indent=2)
        categories = self.a1.categories.values_list('pk', flat=True)
        self.assertEqual(output, self.mapping_ordering_str % {
            'article_pk': self.a1.pk,
            'author_pk': self.a1.author_id,
            'first_category_pk': categories[0],
            'second_category_pk': categories[1],
        })

    def test_deserialize_force_insert(self):
        """Tests that deserialized content can be saved with force_insert as a parameter."""
        serial_str = serializers.serialize(self.serializer_name, [self.a1])
        deserial_obj = list(serializers.deserialize(self.serializer_name, serial_str))[0]
        # Model is patched so the arguments forwarded to save_base() can be
        # inspected.
        with mock.patch('django.db.models.Model') as mock_model:
            deserial_obj.save(force_insert=False)
            mock_model.save_base.assert_called_with(deserial_obj.object, raw=True, using=None, force_insert=False)

    @skipUnlessDBFeature('can_defer_constraint_checks')
    def test_serialize_proxy_model(self):
        """Proxy models serialize like their base model, apart from the 'proxy' in the output."""
        BaseModel.objects.create(parent_data=1)
        base_objects = BaseModel.objects.all()
        proxy_objects = ProxyBaseModel.objects.all()
        proxy_proxy_objects = ProxyProxyBaseModel.objects.all()
        # NOTE: this test always uses the "json" format, not self.serializer_name.
        base_data = serializers.serialize("json", base_objects)
        proxy_data = serializers.serialize("json", proxy_objects)
        proxy_proxy_data = serializers.serialize("json", proxy_proxy_objects)
        self.assertEqual(base_data, proxy_data.replace('proxy', ''))
        self.assertEqual(base_data, proxy_proxy_data.replace('proxy', ''))


class SerializersTransactionTestBase(object):
    """
    Mixin with serializer tests that manage transactions themselves.

    ``serializer_name`` and ``fwd_ref_str`` are read from ``self`` but not
    defined here, so the concrete test class has to provide them.
    """

    available_apps = ['serializers']

    @skipUnlessDBFeature('supports_forward_references')
    def test_forward_refs(self):
        """
        Tests that objects ids can be referenced before they are
        defined in the serialization data.
        """
        # Forward reference handling can only be exercised when the
        # deserialized objects are saved inside a transaction.
        with transaction.atomic():
            deserialized = serializers.deserialize(self.serializer_name, self.fwd_ref_str)
            with connection.constraint_checks_disabled():
                for entry in deserialized:
                    entry.save()

        # Exactly one row of each model is expected afterwards.
        for model in (Category, Author, Article):
            self.assertEqual(model.objects.all().count(), 1)
        article = Article.objects.all()[0]
        self.assertEqual(article.categories.all().count(), 1)
        self.assertEqual(article.author.name, "Agnes")


def register_tests(test_class, method_name, test_func, exclude=None):
    """
    Dynamically create serializer tests to ensure that all registered
    serializers are automatically tested.

    For every eligible format, ``curry(test_func, format)`` is set on
    ``test_class`` under the name ``method_name % format``. A format is
    skipped when its serializer is a ``BadSerializer``, when it is
    'geojson', or when it appears in ``exclude``.
    """
    def _is_eligible(fmt):
        # The checks run in a fixed order: serializer lookup first, then the
        # hard-coded 'geojson' exclusion, then the caller's exclude list.
        if isinstance(serializers.get_serializer(fmt), serializers.BadSerializer):
            return False
        if fmt == 'geojson':
            return False
        return exclude is None or fmt not in exclude

    # Collect every eligible format before the test class is touched.
    eligible_formats = list(filter(_is_eligible, serializers.get_serializer_formats()))
    for fmt in eligible_formats:
        setattr(test_class, method_name % fmt, curry(test_func, fmt))