Arthur de Jong

Open Source / Free Software developer

summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorArthur de Jong <arthur@arthurdejong.org>2016-09-16 20:04:45 +0200
committerArthur de Jong <arthur@arthurdejong.org>2016-09-17 15:39:31 +0200
commit84bfb8a6ebd71be8119b03a9e88dde9554c5900d (patch)
tree438df9a54d6928f66f4176cf36c2922c89554b31
parent426e821b1366095990304b04bbc41b3af384fe21 (diff)
Move XML generation to own module
Similar to the change for parsing, move the XML serialisation of PSKC data to a single class in a separate module.
-rw-r--r--pskc/__init__.py13
-rw-r--r--pskc/encryption.py34
-rw-r--r--pskc/key.py92
-rw-r--r--pskc/mac.py22
-rw-r--r--pskc/policy.py24
-rw-r--r--pskc/serialiser.py214
6 files changed, 216 insertions, 183 deletions
diff --git a/pskc/__init__.py b/pskc/__init__.py
index 71401cd..b9126a3 100644
--- a/pskc/__init__.py
+++ b/pskc/__init__.py
@@ -81,16 +81,6 @@ class PSKC(object):
else:
self.version = '1.0'
- def make_xml(self):
- from pskc.xml import mk_elem
- container = mk_elem('pskc:KeyContainer', Version=self.version,
- Id=self.id)
- self.encryption.make_xml(container)
- self.mac.make_xml(container)
- for key in self.keys:
- key.make_xml(container)
- return container
-
def add_key(self, **kwargs):
"""Create a new key instance for the PSKC file.
@@ -109,8 +99,9 @@ class PSKC(object):
def write(self, filename):
"""Write the PSKC file to the provided file."""
from pskc.xml import tostring
+ from pskc.serialiser import PSKCSerialiser
if hasattr(filename, 'write'):
- xml = tostring(self.make_xml())
+ xml = tostring(PSKCSerialiser.serialise_document(self))
try:
filename.write(xml)
except TypeError: # pragma: no cover (Python 3 specific)
diff --git a/pskc/encryption.py b/pskc/encryption.py
index 80eb1b4..a493e79 100644
--- a/pskc/encryption.py
+++ b/pskc/encryption.py
@@ -26,8 +26,6 @@ algorithms and decryption.
The encryption key can be derived using the KeyDerivation class.
"""
-import base64
-
def algorithm_key_lengths(algorithm):
"""Return the possible key lengths for the configured algorithm."""
@@ -144,24 +142,6 @@ class KeyDerivation(object):
self.pbkdf2_key_length = None
self.pbkdf2_prf = None
- def make_xml(self, encryption_key, key_names):
- from pskc.xml import mk_elem
- derived_key = mk_elem(encryption_key, 'xenc11:DerivedKey', empty=True)
- key_derivation = mk_elem(derived_key, 'xenc11:KeyDerivationMethod',
- Algorithm=self.algorithm)
- if self.algorithm.endswith('#pbkdf2'):
- pbkdf2 = mk_elem(key_derivation, 'xenc11:PBKDF2-params',
- empty=True)
- if self.pbkdf2_salt:
- salt = mk_elem(pbkdf2, 'Salt', empty=True)
- mk_elem(salt, 'Specified', base64.b64encode(self.pbkdf2_salt))
- mk_elem(pbkdf2, 'IterationCount', self.pbkdf2_iterations)
- mk_elem(pbkdf2, 'KeyLength', self.pbkdf2_key_length)
- mk_elem(pbkdf2, 'PRF', self.pbkdf2_prf)
- # TODO: serialise ReferenceList/DataReference
- for name in key_names:
- mk_elem(derived_key, 'xenc11:MasterKeyName', name)
-
def derive_pbkdf2(self, password):
from Crypto.Protocol.KDF import PBKDF2
from pskc.mac import get_hmac
@@ -239,20 +219,6 @@ class Encryption(object):
self.derivation = KeyDerivation()
self.fields = []
- def make_xml(self, container):
- from pskc.xml import mk_elem
- if all(x is None
- for x in (self.id, self.key_name, self.key,
- self.derivation.algorithm)):
- return
- encryption_key = mk_elem(container, 'pskc:EncryptionKey',
- Id=self.id, empty=True)
- if self.derivation.algorithm:
- self.derivation.make_xml(encryption_key, self.key_names)
- else:
- for name in self.key_names:
- mk_elem(encryption_key, 'ds:KeyName', name)
-
@property
def key_name(self):
"""Provide the name of the (first) key."""
diff --git a/pskc/key.py b/pskc/key.py
index 4678f32..fe84db2 100644
--- a/pskc/key.py
+++ b/pskc/key.py
@@ -62,44 +62,6 @@ class DataType(object):
"""Convert the value to an unencrypted string representation."""
raise NotImplementedError # pragma: no cover
- def make_xml(self, key, tag, field):
- from pskc.xml import find, mk_elem
- # skip empty values
- if self.value in (None, '') and not self.cipher_value:
- return
- # find the data tag and create our tag under it
- data = find(key, 'pskc:Data')
- if data is None:
- data = mk_elem(key, 'pskc:Data', empty=True)
- element = mk_elem(data, tag, empty=True)
- # see if we should encrypt
- if field in self.pskc.encryption.fields and not self.cipher_value:
- self.cipher_value = self.pskc.encryption.encrypt_value(
- self._to_bin(self.value))
- self.algorithm = self.pskc.encryption.algorithm
- self.value = None
- # write out value
- if self.cipher_value:
- encrypted_value = mk_elem(
- element, 'pskc:EncryptedValue', empty=True)
- mk_elem(
- encrypted_value, 'xenc:EncryptionMethod',
- Algorithm=self.algorithm)
- cipher_data = mk_elem(
- encrypted_value, 'xenc:CipherData', empty=True)
- mk_elem(
- cipher_data, 'xenc:CipherValue',
- base64.b64encode(self.cipher_value).decode())
- if self.value_mac:
- mk_elem(element, 'pskc:ValueMAC', base64.b64encode(
- self.value_mac).decode())
- elif self.pskc.mac.algorithm:
- mk_elem(element, 'pskc:ValueMAC', base64.b64encode(
- self.pskc.mac.generate_mac(self.cipher_value)
- ).decode())
- else:
- mk_elem(element, 'pskc:PlainValue', self._to_text(self.value))
-
def get_value(self):
"""Provide the attribute value, decrypting as needed."""
if self.value is not None:
@@ -274,60 +236,6 @@ class Key(object):
self.policy = Policy(self)
- def make_xml(self, container):
- from pskc.xml import mk_elem
-
- key_package = mk_elem(container, 'pskc:KeyPackage', empty=True)
-
- if any(x is not None
- for x in (self.manufacturer, self.serial, self.model,
- self.issue_no, self.device_binding, self.start_date,
- self.expiry_date, self.device_userid)):
- device_info = mk_elem(key_package, 'pskc:DeviceInfo', empty=True)
- mk_elem(device_info, 'pskc:Manufacturer', self.manufacturer)
- mk_elem(device_info, 'pskc:SerialNo', self.serial)
- mk_elem(device_info, 'pskc:Model', self.model)
- mk_elem(device_info, 'pskc:IssueNo', self.issue_no)
- mk_elem(device_info, 'pskc:DeviceBinding', self.device_binding)
- mk_elem(device_info, 'pskc:StartDate', self.start_date)
- mk_elem(device_info, 'pskc:ExpiryDate', self.expiry_date)
- mk_elem(device_info, 'pskc:UserId', self.device_userid)
-
- if self.crypto_module is not None:
- crypto_module = mk_elem(key_package, 'pskc:CryptoModuleInfo',
- empty=True)
- mk_elem(crypto_module, 'pskc:Id', self.crypto_module)
-
- key = mk_elem(key_package, 'pskc:Key', empty=True, Id=self.id,
- Algorithm=self.algorithm, )
- mk_elem(key, 'pskc:Issuer', self.issuer)
-
- if any((self.algorithm_suite, self.challenge_encoding,
- self.response_encoding, self.response_length)):
- parameters = mk_elem(key, 'pskc:AlgorithmParameters', empty=True)
- mk_elem(parameters, 'pskc:Suite', self.algorithm_suite)
- mk_elem(parameters, 'pskc:ChallengeFormat',
- Encoding=self.challenge_encoding,
- Min=self.challenge_min_length,
- Max=self.challenge_max_length,
- CheckDigits=self.challenge_check)
- mk_elem(parameters, 'pskc:ResponseFormat',
- Encoding=self.response_encoding,
- Length=self.response_length,
- CheckDigits=self.response_check)
-
- mk_elem(key, 'pskc:KeyProfileId', self.key_profile)
- mk_elem(key, 'pskc:KeyReference', self.key_reference)
- mk_elem(key, 'pskc:FriendlyName', self.friendly_name)
- self._secret.make_xml(key, 'pskc:Secret', 'secret')
- self._counter.make_xml(key, 'pskc:Counter', 'counter')
- self._time_offset.make_xml(key, 'pskc:Time', 'time_offset')
- self._time_interval.make_xml(key, 'pskc:TimeInterval', 'time_interval')
- self._time_drift.make_xml(key, 'pskc:TimeDrift', 'time_drif')
- mk_elem(key, 'pskc:UserId', self.key_userid)
-
- self.policy.make_xml(key)
-
secret = property(
fget=lambda self: self._secret.get_value(),
fset=lambda self, x: self._secret.set_value(x),
diff --git a/pskc/mac.py b/pskc/mac.py
index 4ad8aa6..c5ac1ec 100644
--- a/pskc/mac.py
+++ b/pskc/mac.py
@@ -29,7 +29,6 @@ with the PSKC encryption key.
"""
-import base64
import re
@@ -83,27 +82,6 @@ class MAC(object):
self.key_cipher_value = None
self.key_algorithm = None
- def make_xml(self, container):
- from pskc.xml import mk_elem
- if not self.algorithm and not self.key:
- return
- mac_method = mk_elem(
- container, 'pskc:MACMethod', Algorithm=self.algorithm, empty=True)
- mac_key = mk_elem(mac_method, 'pskc:MACKey', empty=True)
- mk_elem(
- mac_key, 'xenc:EncryptionMethod',
- Algorithm=self.pskc.encryption.algorithm)
- cipher_data = mk_elem(mac_key, 'xenc:CipherData', empty=True)
- if self.key_cipher_value:
- mk_elem(
- cipher_data, 'xenc:CipherValue',
- base64.b64encode(self.key_cipher_value).decode())
- elif self.key_plain_value:
- mk_elem(
- cipher_data, 'xenc:CipherValue', base64.b64encode(
- self.pskc.encryption.encrypt_value(self.key_plain_value)
- ).decode())
-
@property
def key(self):
"""Provides access to the MAC key binary value if available."""
diff --git a/pskc/policy.py b/pskc/policy.py
index a955a16..3cfdad1 100644
--- a/pskc/policy.py
+++ b/pskc/policy.py
@@ -114,30 +114,6 @@ class Policy(object):
self.pin_encoding = None
self.unknown_policy_elements = False
- def make_xml(self, key):
- from pskc.xml import mk_elem
- # check if any policy attribute is set
- if not self.key_usage and all(x is None for x in (
- self.start_date, self.expiry_date,
- self.number_of_transactions, self.pin_key_id, self.pin_usage,
- self.pin_max_failed_attemtps, self.pin_min_length,
- self.pin_max_length, self.pin_encoding)):
- return
- policy = mk_elem(key, 'pskc:Policy', empty=True)
- mk_elem(policy, 'pskc:StartDate', self.start_date)
- mk_elem(policy, 'pskc:ExpiryDate', self.expiry_date)
- mk_elem(policy, 'pskc:PINPolicy',
- PINKeyId=self.pin_key_id,
- PINUsageMode=self.pin_usage,
- MaxFailedAttempts=self.pin_max_failed_attemtps,
- MinLength=self.pin_min_length,
- MaxLength=self.pin_max_length,
- PINEncoding=self.pin_encoding)
- for usage in self.key_usage:
- mk_elem(policy, 'pskc:KeyUsage', usage)
- mk_elem(policy, 'pskc:NumberOfTransactions',
- self.number_of_transactions)
-
def may_use(self, usage=None, now=None):
"""Check whether the key may be used for the provided purpose."""
import datetime
diff --git a/pskc/serialiser.py b/pskc/serialiser.py
new file mode 100644
index 0000000..f1bcdf9
--- /dev/null
+++ b/pskc/serialiser.py
@@ -0,0 +1,214 @@
+# serialiser.py - PSKC file parsing functions
+# coding: utf-8
+#
+# Copyright (C) 2016 Arthur de Jong
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301 USA
+
+"""Module for serialising PSKC files to XML."""
+
+
+import base64
+
+from pskc.xml import find, mk_elem
+
+
+class PSKCSerialiser(object):
+
+ @classmethod
+ def serialise_document(cls, pskc):
+ container = mk_elem('pskc:KeyContainer', Version=pskc.version,
+ Id=pskc.id)
+ cls.serialise_encryption(pskc.encryption, container)
+ cls.serialise_mac(pskc.mac, container)
+ for key in pskc.keys:
+ cls.serialise_key(key, container)
+ return container
+
+ @classmethod
+ def serialise_encryption(cls, encryption, container):
+ if all(x is None
+ for x in (encryption.id, encryption.key_name, encryption.key,
+ encryption.derivation.algorithm)):
+ return
+ encryption_key = mk_elem(container, 'pskc:EncryptionKey',
+ Id=encryption.id, empty=True)
+ if encryption.derivation.algorithm:
+ cls.serialise_key_derivation(
+ encryption.derivation, encryption_key, encryption.key_names)
+ else:
+ for name in encryption.key_names:
+ mk_elem(encryption_key, 'ds:KeyName', name)
+
+ @classmethod
+ def serialise_key_derivation(cls, derivation, encryption_key, key_names):
+ derived_key = mk_elem(encryption_key, 'xenc11:DerivedKey', empty=True)
+ key_derivation = mk_elem(derived_key, 'xenc11:KeyDerivationMethod',
+ Algorithm=derivation.algorithm)
+ if derivation.algorithm.endswith('#pbkdf2'):
+ pbkdf2 = mk_elem(key_derivation, 'xenc11:PBKDF2-params',
+ empty=True)
+ if derivation.pbkdf2_salt:
+ salt = mk_elem(pbkdf2, 'Salt', empty=True)
+ mk_elem(salt, 'Specified',
+ base64.b64encode(derivation.pbkdf2_salt))
+ mk_elem(pbkdf2, 'IterationCount', derivation.pbkdf2_iterations)
+ mk_elem(pbkdf2, 'KeyLength', derivation.pbkdf2_key_length)
+ mk_elem(pbkdf2, 'PRF', derivation.pbkdf2_prf)
+ # TODO: serialise ReferenceList/DataReference
+ for name in key_names:
+ mk_elem(derived_key, 'xenc11:MasterKeyName', name)
+
+ @classmethod
+ def serialise_mac(cls, mac, container):
+ if not mac.algorithm and not mac.key:
+ return
+ mac_method = mk_elem(
+ container, 'pskc:MACMethod', Algorithm=mac.algorithm, empty=True)
+ mac_key = mk_elem(mac_method, 'pskc:MACKey', empty=True)
+ mk_elem(
+ mac_key, 'xenc:EncryptionMethod',
+ Algorithm=mac.pskc.encryption.algorithm)
+ cipher_data = mk_elem(mac_key, 'xenc:CipherData', empty=True)
+ if mac.key_cipher_value:
+ mk_elem(cipher_data, 'xenc:CipherValue',
+ base64.b64encode(mac.key_cipher_value).decode())
+ elif mac.key_plain_value:
+ mk_elem(cipher_data, 'xenc:CipherValue',
+ base64.b64encode(mac.pskc.encryption.encrypt_value(
+ mac.key_plain_value)).decode())
+
+ @classmethod
+ def serialise_key(cls, key, container):
+
+ key_package = mk_elem(container, 'pskc:KeyPackage', empty=True)
+
+ if any(x is not None
+ for x in (key.manufacturer, key.serial, key.model,
+ key.issue_no, key.device_binding, key.start_date,
+ key.expiry_date, key.device_userid)):
+ device_info = mk_elem(key_package, 'pskc:DeviceInfo', empty=True)
+ mk_elem(device_info, 'pskc:Manufacturer', key.manufacturer)
+ mk_elem(device_info, 'pskc:SerialNo', key.serial)
+ mk_elem(device_info, 'pskc:Model', key.model)
+ mk_elem(device_info, 'pskc:IssueNo', key.issue_no)
+ mk_elem(device_info, 'pskc:DeviceBinding', key.device_binding)
+ mk_elem(device_info, 'pskc:StartDate', key.start_date)
+ mk_elem(device_info, 'pskc:ExpiryDate', key.expiry_date)
+ mk_elem(device_info, 'pskc:UserId', key.device_userid)
+
+ if key.crypto_module is not None:
+ crypto_module = mk_elem(key_package, 'pskc:CryptoModuleInfo',
+ empty=True)
+ mk_elem(crypto_module, 'pskc:Id', key.crypto_module)
+
+ key_elm = mk_elem(key_package, 'pskc:Key', empty=True, Id=key.id,
+ Algorithm=key.algorithm, )
+ mk_elem(key_elm, 'pskc:Issuer', key.issuer)
+
+ if any((key.algorithm_suite, key.challenge_encoding,
+ key.response_encoding, key.response_length)):
+ parameters = mk_elem(key_elm, 'pskc:AlgorithmParameters',
+ empty=True)
+ mk_elem(parameters, 'pskc:Suite', key.algorithm_suite)
+ mk_elem(parameters, 'pskc:ChallengeFormat',
+ Encoding=key.challenge_encoding,
+ Min=key.challenge_min_length,
+ Max=key.challenge_max_length,
+ CheckDigits=key.challenge_check)
+ mk_elem(parameters, 'pskc:ResponseFormat',
+ Encoding=key.response_encoding,
+ Length=key.response_length,
+ CheckDigits=key.response_check)
+
+ mk_elem(key_elm, 'pskc:KeyProfileId', key.key_profile)
+ mk_elem(key_elm, 'pskc:KeyReference', key.key_reference)
+ mk_elem(key_elm, 'pskc:FriendlyName', key.friendly_name)
+ cls.serialise_datatype(
+ key._secret, key_elm, 'pskc:Secret', 'secret')
+ cls.serialise_datatype(
+ key._counter, key_elm, 'pskc:Counter', 'counter')
+ cls.serialise_datatype(
+ key._time_offset, key_elm, 'pskc:Time', 'time_offset')
+ cls.serialise_datatype(
+ key._time_interval, key_elm, 'pskc:TimeInterval', 'time_interval')
+ cls.serialise_datatype(
+ key._time_drift, key_elm, 'pskc:TimeDrift', 'time_drif')
+ mk_elem(key_elm, 'pskc:UserId', key.key_userid)
+
+ cls.serialise_policy(key.policy, key_elm)
+
+ @classmethod
+ def serialise_datatype(cls, dt, key_elm, tag, field):
+ # skip empty values
+ if dt.value in (None, '') and not dt.cipher_value:
+ return
+ # find the data tag and create our tag under it
+ data = find(key_elm, 'pskc:Data')
+ if data is None:
+ data = mk_elem(key_elm, 'pskc:Data', empty=True)
+ element = mk_elem(data, tag, empty=True)
+ # see if we should encrypt
+ if field in dt.pskc.encryption.fields and not dt.cipher_value:
+ dt.cipher_value = dt.pskc.encryption.encrypt_value(
+ dt._to_bin(dt.value))
+ dt.algorithm = dt.pskc.encryption.algorithm
+ dt.value = None
+ # write out value
+ if dt.cipher_value:
+ encrypted_value = mk_elem(
+ element, 'pskc:EncryptedValue', empty=True)
+ mk_elem(
+ encrypted_value, 'xenc:EncryptionMethod',
+ Algorithm=dt.algorithm)
+ cipher_data = mk_elem(
+ encrypted_value, 'xenc:CipherData', empty=True)
+ mk_elem(
+ cipher_data, 'xenc:CipherValue',
+ base64.b64encode(dt.cipher_value).decode())
+ if dt.value_mac:
+ mk_elem(element, 'pskc:ValueMAC', base64.b64encode(
+ dt.value_mac).decode())
+ elif dt.pskc.mac.algorithm:
+ mk_elem(element, 'pskc:ValueMAC', base64.b64encode(
+ dt.pskc.mac.generate_mac(dt.cipher_value)
+ ).decode())
+ else:
+ mk_elem(element, 'pskc:PlainValue', dt._to_text(dt.value))
+
+ @classmethod
+ def serialise_policy(cls, policy, key_elm):
+ # check if any policy attribute is set
+ if not policy.key_usage and all(x is None for x in (
+ policy.start_date, policy.expiry_date,
+ policy.number_of_transactions, policy.pin_key_id, policy.pin_usage,
+ policy.pin_max_failed_attemtps, policy.pin_min_length,
+ policy.pin_max_length, policy.pin_encoding)):
+ return
+ policy_elm = mk_elem(key_elm, 'pskc:Policy', empty=True)
+ mk_elem(policy_elm, 'pskc:StartDate', policy.start_date)
+ mk_elem(policy_elm, 'pskc:ExpiryDate', policy.expiry_date)
+ mk_elem(policy_elm, 'pskc:PINPolicy',
+ PINKeyId=policy.pin_key_id,
+ PINUsageMode=policy.pin_usage,
+ MaxFailedAttempts=policy.pin_max_failed_attemtps,
+ MinLength=policy.pin_min_length,
+ MaxLength=policy.pin_max_length,
+ PINEncoding=policy.pin_encoding)
+ for usage in policy.key_usage:
+ mk_elem(policy_elm, 'pskc:KeyUsage', usage)
+ mk_elem(policy_elm, 'pskc:NumberOfTransactions',
+ policy.number_of_transactions)