diff options
author | Arthur de Jong <arthur@arthurdejong.org> | 2016-09-16 20:04:45 +0200 |
---|---|---|
committer | Arthur de Jong <arthur@arthurdejong.org> | 2016-09-17 15:39:31 +0200 |
commit | 84bfb8a6ebd71be8119b03a9e88dde9554c5900d (patch) | |
tree | 438df9a54d6928f66f4176cf36c2922c89554b31 | |
parent | 426e821b1366095990304b04bbc41b3af384fe21 (diff) |
Move XML generation to own module
Similar to the change for parsing, move the XML serialisation of PSKC
data to a single class in a separate module.
-rw-r--r-- | pskc/__init__.py | 13 | ||||
-rw-r--r-- | pskc/encryption.py | 34 | ||||
-rw-r--r-- | pskc/key.py | 92 | ||||
-rw-r--r-- | pskc/mac.py | 22 | ||||
-rw-r--r-- | pskc/policy.py | 24 | ||||
-rw-r--r-- | pskc/serialiser.py | 214 |
6 files changed, 216 insertions, 183 deletions
diff --git a/pskc/__init__.py b/pskc/__init__.py index 71401cd..b9126a3 100644 --- a/pskc/__init__.py +++ b/pskc/__init__.py @@ -81,16 +81,6 @@ class PSKC(object): else: self.version = '1.0' - def make_xml(self): - from pskc.xml import mk_elem - container = mk_elem('pskc:KeyContainer', Version=self.version, - Id=self.id) - self.encryption.make_xml(container) - self.mac.make_xml(container) - for key in self.keys: - key.make_xml(container) - return container - def add_key(self, **kwargs): """Create a new key instance for the PSKC file. @@ -109,8 +99,9 @@ class PSKC(object): def write(self, filename): """Write the PSKC file to the provided file.""" from pskc.xml import tostring + from pskc.serialiser import PSKCSerialiser if hasattr(filename, 'write'): - xml = tostring(self.make_xml()) + xml = tostring(PSKCSerialiser.serialise_document(self)) try: filename.write(xml) except TypeError: # pragma: no cover (Python 3 specific) diff --git a/pskc/encryption.py b/pskc/encryption.py index 80eb1b4..a493e79 100644 --- a/pskc/encryption.py +++ b/pskc/encryption.py @@ -26,8 +26,6 @@ algorithms and decryption. The encryption key can be derived using the KeyDerivation class. """ -import base64 - def algorithm_key_lengths(algorithm): """Return the possible key lengths for the configured algorithm.""" @@ -144,24 +142,6 @@ class KeyDerivation(object): self.pbkdf2_key_length = None self.pbkdf2_prf = None - def make_xml(self, encryption_key, key_names): - from pskc.xml import mk_elem - derived_key = mk_elem(encryption_key, 'xenc11:DerivedKey', empty=True) - key_derivation = mk_elem(derived_key, 'xenc11:KeyDerivationMethod', - Algorithm=self.algorithm) - if self.algorithm.endswith('#pbkdf2'): - pbkdf2 = mk_elem(key_derivation, 'xenc11:PBKDF2-params', - empty=True) - if self.pbkdf2_salt: - salt = mk_elem(pbkdf2, 'Salt', empty=True) - mk_elem(salt, 'Specified', base64.b64encode(self.pbkdf2_salt)) - mk_elem(pbkdf2, 'IterationCount', self.pbkdf2_iterations) - mk_elem(pbkdf2, 'KeyLength', self.pbkdf2_key_length) - mk_elem(pbkdf2, 'PRF', self.pbkdf2_prf) - # TODO: serialise ReferenceList/DataReference - for name in key_names: - mk_elem(derived_key, 'xenc11:MasterKeyName', name) - def derive_pbkdf2(self, password): from Crypto.Protocol.KDF import PBKDF2 from pskc.mac import get_hmac @@ -239,20 +219,6 @@ class Encryption(object): self.derivation = KeyDerivation() self.fields = [] - def make_xml(self, container): - from pskc.xml import mk_elem - if all(x is None - for x in (self.id, self.key_name, self.key, - self.derivation.algorithm)): - return - encryption_key = mk_elem(container, 'pskc:EncryptionKey', - Id=self.id, empty=True) - if self.derivation.algorithm: - self.derivation.make_xml(encryption_key, self.key_names) - else: - for name in self.key_names: - mk_elem(encryption_key, 'ds:KeyName', name) - @property def key_name(self): """Provide the name of the (first) key.""" diff --git a/pskc/key.py b/pskc/key.py index 4678f32..fe84db2 100644 --- a/pskc/key.py +++ b/pskc/key.py @@ -62,44 +62,6 @@ class DataType(object): """Convert the value to an unencrypted string representation.""" raise NotImplementedError # pragma: no cover - def make_xml(self, key, tag, field): - from pskc.xml import find, mk_elem - # skip empty values - if self.value in (None, '') and not self.cipher_value: - return - # find the data tag and create our tag under it - data = find(key, 'pskc:Data') - if data is None: - data = mk_elem(key, 'pskc:Data', empty=True) - element = mk_elem(data, tag, empty=True) - # see if we should encrypt - if field in self.pskc.encryption.fields and not self.cipher_value: - self.cipher_value = self.pskc.encryption.encrypt_value( - self._to_bin(self.value)) - self.algorithm = self.pskc.encryption.algorithm - self.value = None - # write out value - if self.cipher_value: - encrypted_value = mk_elem( - element, 'pskc:EncryptedValue', empty=True) - mk_elem( - encrypted_value, 'xenc:EncryptionMethod', - Algorithm=self.algorithm) - cipher_data = mk_elem( - encrypted_value, 'xenc:CipherData', empty=True) - mk_elem( - cipher_data, 'xenc:CipherValue', - base64.b64encode(self.cipher_value).decode()) - if self.value_mac: - mk_elem(element, 'pskc:ValueMAC', base64.b64encode( - self.value_mac).decode()) - elif self.pskc.mac.algorithm: - mk_elem(element, 'pskc:ValueMAC', base64.b64encode( - self.pskc.mac.generate_mac(self.cipher_value) - ).decode()) - else: - mk_elem(element, 'pskc:PlainValue', self._to_text(self.value)) - def get_value(self): """Provide the attribute value, decrypting as needed.""" if self.value is not None: @@ -274,60 +236,6 @@ class Key(object): self.policy = Policy(self) - def make_xml(self, container): - from pskc.xml import mk_elem - - key_package = mk_elem(container, 'pskc:KeyPackage', empty=True) - - if any(x is not None - for x in (self.manufacturer, self.serial, self.model, - self.issue_no, self.device_binding, self.start_date, - self.expiry_date, self.device_userid)): - device_info = mk_elem(key_package, 'pskc:DeviceInfo', empty=True) - mk_elem(device_info, 'pskc:Manufacturer', self.manufacturer) - mk_elem(device_info, 'pskc:SerialNo', self.serial) - mk_elem(device_info, 'pskc:Model', self.model) - mk_elem(device_info, 'pskc:IssueNo', self.issue_no) - mk_elem(device_info, 'pskc:DeviceBinding', self.device_binding) - mk_elem(device_info, 'pskc:StartDate', self.start_date) - mk_elem(device_info, 'pskc:ExpiryDate', self.expiry_date) - mk_elem(device_info, 'pskc:UserId', self.device_userid) - - if self.crypto_module is not None: - crypto_module = mk_elem(key_package, 'pskc:CryptoModuleInfo', - empty=True) - mk_elem(crypto_module, 'pskc:Id', self.crypto_module) - - key = mk_elem(key_package, 'pskc:Key', empty=True, Id=self.id, - Algorithm=self.algorithm, ) - mk_elem(key, 'pskc:Issuer', self.issuer) - - if any((self.algorithm_suite, self.challenge_encoding, - self.response_encoding, self.response_length)): - parameters = mk_elem(key, 'pskc:AlgorithmParameters', empty=True) - mk_elem(parameters, 'pskc:Suite', self.algorithm_suite) - mk_elem(parameters, 'pskc:ChallengeFormat', - Encoding=self.challenge_encoding, - Min=self.challenge_min_length, - Max=self.challenge_max_length, - CheckDigits=self.challenge_check) - mk_elem(parameters, 'pskc:ResponseFormat', - Encoding=self.response_encoding, - Length=self.response_length, - CheckDigits=self.response_check) - - mk_elem(key, 'pskc:KeyProfileId', self.key_profile) - mk_elem(key, 'pskc:KeyReference', self.key_reference) - mk_elem(key, 'pskc:FriendlyName', self.friendly_name) - self._secret.make_xml(key, 'pskc:Secret', 'secret') - self._counter.make_xml(key, 'pskc:Counter', 'counter') - self._time_offset.make_xml(key, 'pskc:Time', 'time_offset') - self._time_interval.make_xml(key, 'pskc:TimeInterval', 'time_interval') - self._time_drift.make_xml(key, 'pskc:TimeDrift', 'time_drif') - mk_elem(key, 'pskc:UserId', self.key_userid) - - self.policy.make_xml(key) - secret = property( fget=lambda self: self._secret.get_value(), fset=lambda self, x: self._secret.set_value(x), diff --git a/pskc/mac.py b/pskc/mac.py index 4ad8aa6..c5ac1ec 100644 --- a/pskc/mac.py +++ b/pskc/mac.py @@ -29,7 +29,6 @@ with the PSKC encryption key. """ -import base64 import re @@ -83,27 +82,6 @@ class MAC(object): self.key_cipher_value = None self.key_algorithm = None - def make_xml(self, container): - from pskc.xml import mk_elem - if not self.algorithm and not self.key: - return - mac_method = mk_elem( - container, 'pskc:MACMethod', Algorithm=self.algorithm, empty=True) - mac_key = mk_elem(mac_method, 'pskc:MACKey', empty=True) - mk_elem( - mac_key, 'xenc:EncryptionMethod', - Algorithm=self.pskc.encryption.algorithm) - cipher_data = mk_elem(mac_key, 'xenc:CipherData', empty=True) - if self.key_cipher_value: - mk_elem( - cipher_data, 'xenc:CipherValue', - base64.b64encode(self.key_cipher_value).decode()) - elif self.key_plain_value: - mk_elem( - cipher_data, 'xenc:CipherValue', base64.b64encode( - self.pskc.encryption.encrypt_value(self.key_plain_value) - ).decode()) - @property def key(self): """Provides access to the MAC key binary value if available.""" diff --git a/pskc/policy.py b/pskc/policy.py index a955a16..3cfdad1 100644 --- a/pskc/policy.py +++ b/pskc/policy.py @@ -114,30 +114,6 @@ class Policy(object): self.pin_encoding = None self.unknown_policy_elements = False - def make_xml(self, key): - from pskc.xml import mk_elem - # check if any policy attribute is set - if not self.key_usage and all(x is None for x in ( - self.start_date, self.expiry_date, - self.number_of_transactions, self.pin_key_id, self.pin_usage, - self.pin_max_failed_attemtps, self.pin_min_length, - self.pin_max_length, self.pin_encoding)): - return - policy = mk_elem(key, 'pskc:Policy', empty=True) - mk_elem(policy, 'pskc:StartDate', self.start_date) - mk_elem(policy, 'pskc:ExpiryDate', self.expiry_date) - mk_elem(policy, 'pskc:PINPolicy', - PINKeyId=self.pin_key_id, - PINUsageMode=self.pin_usage, - MaxFailedAttempts=self.pin_max_failed_attemtps, - MinLength=self.pin_min_length, - MaxLength=self.pin_max_length, - PINEncoding=self.pin_encoding) - for usage in self.key_usage: - mk_elem(policy, 'pskc:KeyUsage', usage) - mk_elem(policy, 'pskc:NumberOfTransactions', - self.number_of_transactions) - def may_use(self, usage=None, now=None): """Check whether the key may be used for the provided purpose.""" import datetime diff --git a/pskc/serialiser.py b/pskc/serialiser.py new file mode 100644 index 0000000..f1bcdf9 --- /dev/null +++ b/pskc/serialiser.py @@ -0,0 +1,214 @@ +# serialiser.py - PSKC file parsing functions +# coding: utf-8 +# +# Copyright (C) 2016 Arthur de Jong +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA +# 02110-1301 USA + +"""Module for serialising PSKC files to XML.""" + + +import base64 + +from pskc.xml import find, mk_elem + + +class PSKCSerialiser(object): + + @classmethod + def serialise_document(cls, pskc): + container = mk_elem('pskc:KeyContainer', Version=pskc.version, + Id=pskc.id) + cls.serialise_encryption(pskc.encryption, container) + cls.serialise_mac(pskc.mac, container) + for key in pskc.keys: + cls.serialise_key(key, container) + return container + + @classmethod + def serialise_encryption(cls, encryption, container): + if all(x is None + for x in (encryption.id, encryption.key_name, encryption.key, + encryption.derivation.algorithm)): + return + encryption_key = mk_elem(container, 'pskc:EncryptionKey', + Id=encryption.id, empty=True) + if encryption.derivation.algorithm: + cls.serialise_key_derivation( + encryption.derivation, encryption_key, encryption.key_names) + else: + for name in encryption.key_names: + mk_elem(encryption_key, 'ds:KeyName', name) + + @classmethod + def serialise_key_derivation(cls, derivation, encryption_key, key_names): + derived_key = mk_elem(encryption_key, 'xenc11:DerivedKey', empty=True) + key_derivation = mk_elem(derived_key, 'xenc11:KeyDerivationMethod', + Algorithm=derivation.algorithm) + if derivation.algorithm.endswith('#pbkdf2'): + pbkdf2 = mk_elem(key_derivation, 'xenc11:PBKDF2-params', + empty=True) + if derivation.pbkdf2_salt: + salt = mk_elem(pbkdf2, 'Salt', empty=True) + mk_elem(salt, 'Specified', + base64.b64encode(derivation.pbkdf2_salt)) + mk_elem(pbkdf2, 'IterationCount', derivation.pbkdf2_iterations) + mk_elem(pbkdf2, 'KeyLength', derivation.pbkdf2_key_length) + mk_elem(pbkdf2, 'PRF', derivation.pbkdf2_prf) + # TODO: serialise ReferenceList/DataReference + for name in key_names: + mk_elem(derived_key, 'xenc11:MasterKeyName', name) + + @classmethod + def serialise_mac(cls, mac, container): + if not mac.algorithm and not mac.key: + return + mac_method = mk_elem( + container, 'pskc:MACMethod', Algorithm=mac.algorithm, empty=True) + mac_key = mk_elem(mac_method, 'pskc:MACKey', empty=True) + mk_elem( + mac_key, 'xenc:EncryptionMethod', + Algorithm=mac.pskc.encryption.algorithm) + cipher_data = mk_elem(mac_key, 'xenc:CipherData', empty=True) + if mac.key_cipher_value: + mk_elem(cipher_data, 'xenc:CipherValue', + base64.b64encode(mac.key_cipher_value).decode()) + elif mac.key_plain_value: + mk_elem(cipher_data, 'xenc:CipherValue', + base64.b64encode(mac.pskc.encryption.encrypt_value( + mac.key_plain_value)).decode()) + + @classmethod + def serialise_key(cls, key, container): + + key_package = mk_elem(container, 'pskc:KeyPackage', empty=True) + + if any(x is not None + for x in (key.manufacturer, key.serial, key.model, + key.issue_no, key.device_binding, key.start_date, + key.expiry_date, key.device_userid)): + device_info = mk_elem(key_package, 'pskc:DeviceInfo', empty=True) + mk_elem(device_info, 'pskc:Manufacturer', key.manufacturer) + mk_elem(device_info, 'pskc:SerialNo', key.serial) + mk_elem(device_info, 'pskc:Model', key.model) + mk_elem(device_info, 'pskc:IssueNo', key.issue_no) + mk_elem(device_info, 'pskc:DeviceBinding', key.device_binding) + mk_elem(device_info, 'pskc:StartDate', key.start_date) + mk_elem(device_info, 'pskc:ExpiryDate', key.expiry_date) + mk_elem(device_info, 'pskc:UserId', key.device_userid) + + if key.crypto_module is not None: + crypto_module = mk_elem(key_package, 'pskc:CryptoModuleInfo', + empty=True) + mk_elem(crypto_module, 'pskc:Id', key.crypto_module) + + key_elm = mk_elem(key_package, 'pskc:Key', empty=True, Id=key.id, + Algorithm=key.algorithm, ) + mk_elem(key_elm, 'pskc:Issuer', key.issuer) + + if any((key.algorithm_suite, key.challenge_encoding, + key.response_encoding, key.response_length)): + parameters = mk_elem(key_elm, 'pskc:AlgorithmParameters', + empty=True) + mk_elem(parameters, 'pskc:Suite', key.algorithm_suite) + mk_elem(parameters, 'pskc:ChallengeFormat', + Encoding=key.challenge_encoding, + Min=key.challenge_min_length, + Max=key.challenge_max_length, + CheckDigits=key.challenge_check) + mk_elem(parameters, 'pskc:ResponseFormat', + Encoding=key.response_encoding, + Length=key.response_length, + CheckDigits=key.response_check) + + mk_elem(key_elm, 'pskc:KeyProfileId', key.key_profile) + mk_elem(key_elm, 'pskc:KeyReference', key.key_reference) + mk_elem(key_elm, 'pskc:FriendlyName', key.friendly_name) + cls.serialise_datatype( + key._secret, key_elm, 'pskc:Secret', 'secret') + cls.serialise_datatype( + key._counter, key_elm, 'pskc:Counter', 'counter') + cls.serialise_datatype( + key._time_offset, key_elm, 'pskc:Time', 'time_offset') + cls.serialise_datatype( + key._time_interval, key_elm, 'pskc:TimeInterval', 'time_interval') + cls.serialise_datatype( + key._time_drift, key_elm, 'pskc:TimeDrift', 'time_drif') + mk_elem(key_elm, 'pskc:UserId', key.key_userid) + + cls.serialise_policy(key.policy, key_elm) + + @classmethod + def serialise_datatype(cls, dt, key_elm, tag, field): + # skip empty values + if dt.value in (None, '') and not dt.cipher_value: + return + # find the data tag and create our tag under it + data = find(key_elm, 'pskc:Data') + if data is None: + data = mk_elem(key_elm, 'pskc:Data', empty=True) + element = mk_elem(data, tag, empty=True) + # see if we should encrypt + if field in dt.pskc.encryption.fields and not dt.cipher_value: + dt.cipher_value = dt.pskc.encryption.encrypt_value( + dt._to_bin(dt.value)) + dt.algorithm = dt.pskc.encryption.algorithm + dt.value = None + # write out value + if dt.cipher_value: + encrypted_value = mk_elem( + element, 'pskc:EncryptedValue', empty=True) + mk_elem( + encrypted_value, 'xenc:EncryptionMethod', + Algorithm=dt.algorithm) + cipher_data = mk_elem( + encrypted_value, 'xenc:CipherData', empty=True) + mk_elem( + cipher_data, 'xenc:CipherValue', + base64.b64encode(dt.cipher_value).decode()) + if dt.value_mac: + mk_elem(element, 'pskc:ValueMAC', base64.b64encode( + dt.value_mac).decode()) + elif dt.pskc.mac.algorithm: + mk_elem(element, 'pskc:ValueMAC', base64.b64encode( + dt.pskc.mac.generate_mac(dt.cipher_value) + ).decode()) + else: + mk_elem(element, 'pskc:PlainValue', dt._to_text(dt.value)) + + @classmethod + def serialise_policy(cls, policy, key_elm): + # check if any policy attribute is set + if not policy.key_usage and all(x is None for x in ( + policy.start_date, policy.expiry_date, + policy.number_of_transactions, policy.pin_key_id, policy.pin_usage, + policy.pin_max_failed_attemtps, policy.pin_min_length, + policy.pin_max_length, policy.pin_encoding)): + return + policy_elm = mk_elem(key_elm, 'pskc:Policy', empty=True) + mk_elem(policy_elm, 'pskc:StartDate', policy.start_date) + mk_elem(policy_elm, 'pskc:ExpiryDate', policy.expiry_date) + mk_elem(policy_elm, 'pskc:PINPolicy', + PINKeyId=policy.pin_key_id, + PINUsageMode=policy.pin_usage, + MaxFailedAttempts=policy.pin_max_failed_attemtps, + MinLength=policy.pin_min_length, + MaxLength=policy.pin_max_length, + PINEncoding=policy.pin_encoding) + for usage in policy.key_usage: + mk_elem(policy_elm, 'pskc:KeyUsage', usage) + mk_elem(policy_elm, 'pskc:NumberOfTransactions', + policy.number_of_transactions) |