Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1#!/usr/bin/python3 

2 

3""" 

4Helper module used for XenRT testing of the VDI encryption feature (REQ-718). 

5This module implements the key lookup plugin interface, so if it is 

6installed, SM will use it to retrieve keys based on their hashes. 

7This key store is backed by a file stored on disk in dom0, and helper 

8functions are provided to manipulate it. 

9""" 

10import base64 

11import os 

12import os.path 

13import hashlib 

14import json 

15import argparse 

16import string 

17import sys 

18from random import SystemRandom 

19 

20sys.path.append('/opt/xensource/sm/') 

21import util 

22 

23import XenAPI 

24 

# Name this module identifies itself with; passed as the last argument to
# login_with_password() in _check_key (presumably the XenAPI "originator"
# field -- confirm against the XenAPI documentation).
PROGRAM_NAME = 'keymanagerutil'

26 

27 

def load_key(key_hash, vdi_uuid):
    """
    Entry point called by the SM plugin to find the key for the given
    key_hash in the key store.

    The request is first checked against the VDI's sm_config by _check_key.
    Returns the key, or None when the key store has no entry for key_hash.
    """
    _check_key(key_hash, vdi_uuid)
    manager = KeyManager(key_hash=key_hash)
    try:
        return manager.get_key(log_key_info=False)
    except KeyLookUpError:
        return None

39 

40 

def _check_key(key_hash, vdi_uuid):
    """
    Check that key_hash is the hash recorded in the sm_config of the VDI
    identified by vdi_uuid.

    Raises an Exception when the sm_config has no 'key_hash' entry, or when
    the recorded hash differs from the requested one. The local XAPI session
    opened for the check is logged out in every case.
    """
    session = XenAPI.xapi_local()
    session.xenapi.login_with_password('root', '', '', PROGRAM_NAME)
    try:
        vdi_ref = session.xenapi.VDI.get_by_uuid(vdi_uuid)
        sm_config = session.xenapi.VDI.get_sm_config(vdi_ref)

        if 'key_hash' not in sm_config:
            raise Exception('Encryption key requested for VDI {}'
                            ' whose sm_config does not contain the key_hash'
                            ' entry. Its sm_config is {}'.format(vdi_uuid, sm_config))

        recorded_hash = sm_config['key_hash']
        if key_hash != recorded_hash:
            raise Exception('A key was requested with key hash {}'
                            ' for VDI {}, but it has a different'
                            ' key_hash in its sm_config:'
                            ' {}'.format(key_hash, vdi_uuid, recorded_hash))
    finally:
        session.xenapi.logout()

59 

60 

class InputError(Exception):
    """Raised when the inputs given to a key manager operation are missing or unusable"""

    def __init__(self, message):
        super().__init__(message)

64 

65 

class KeyLookUpError(Exception):
    """Signals that the requested key / key hash is not present in the keystore"""

    def __init__(self, message):
        super().__init__(message)

71 

72 

73def _print_key_info(key=None, key_hash=None): 

74 """ 

75 Output the key details as JSON to the standard output. This output 

76 will be interpreted by XenRT. 

77 """ 

78 data = {} 

79 if key: 

80 data['key_base64'] = base64.b64encode(key).decode() 

81 if key_hash: 81 ↛ 83line 81 didn't jump to line 83, because the condition on line 81 was never false

82 data['key_hash'] = key_hash 

83 print(json.dumps(data)) 

84 

85 

# Location of the JSON key store file that _read_keystore reads and
# _write_keystore (re)creates.
KEYSTORE_PATH = '/tmp/keystore.json'

87 

88 

89def _read_keystore(): 

90 """If the keystore file exists, returns its contents, otherwise returns an empty dictionary.""" 

91 if os.path.isfile(KEYSTORE_PATH): 

92 with open(KEYSTORE_PATH, "r") as key_store_file: 

93 key_store = json.load(key_store_file) 

94 for key_hash in key_store: 

95 key_base64 = key_store[key_hash] 

96 key = base64.b64decode(key_base64) 

97 key_store[key_hash] = key 

98 return key_store 

99 else: 

100 util.SMlog(f'Keystore path {KEYSTORE_PATH} not found') 

101 return {} 

102 

103 

104def _write_keystore(key_store): 

105 """ 

106 Write the given key store contents to the key store file, which will be 

107 created if it does not exist. 

108 """ 

109 for key_hash in key_store: 

110 key = key_store[key_hash] 

111 key_base64 = base64.b64encode(key).decode() 

112 key_store[key_hash] = key_base64 

113 with open(KEYSTORE_PATH, "w+") as key_store_file: 

114 json.dump(key_store, key_store_file) 

115 key_store_file.write("\n") 

116 

117 

class KeyManager(object):
    """
    KeyManager is a python utility tool for generating and managing the keys in the key store.
    One can request KeyManager to generate the keys, passing just the type of
    the key - either strong or weak or even the length of the key.
    One can request KeyManager to get the key from the key store by passing key_hash.
    One can request KeyManager to get the key_hash from the key store by passing encryption key.
    KeyManager maintains the keystore(json record) under /tmp/keystore.json.
    """

    def __init__(self, key_type=None, key_length=None, key=None, key_hash=None):
        self.key_type = key_type
        self.key_length = key_length
        self.key = key
        self.key_hash = key_hash

    def __add_to_keystore(self):
        """
        Update the key and key hash in the key store - requires both hash and
        key.

        Raises InputError if either of them is not set.
        """
        if not self.key_hash or not self.key:
            raise InputError("Need both key_hash and key to update into key store")
        key_store = _read_keystore()
        key_store[self.key_hash] = self.key
        _write_keystore(key_store)

    def __hash_key(self):
        """
        Hash the current key - requires key.

        The hash is the SHA-256 hex digest of 32 zero bytes followed by the
        key. It is stored in self.key_hash and returned.
        Raises InputError if no key is set.
        """
        if not self.key:
            raise InputError("Need key to hash")

        hash_it = hashlib.sha256()
        hash_it.update(b'\0' * 32)
        hash_it.update(self.key)
        self.key_hash = hash_it.hexdigest()
        return self.key_hash

    def generate(self):
        """
        Generate the encryption key, hash it, print both as JSON and update
        the key store with the key and its hash.

        The generator is chosen by _get_key_generator from key_length and
        key_type.
        """
        util.SMlog('Generating key')
        self.key = _get_key_generator(key_length=self.key_length, key_type=self.key_type).generate()
        self.key_hash = self.__hash_key()
        _print_key_info(key=self.key, key_hash=self.key_hash)
        util.SMlog(f'Generated key, hash {self.key_hash}')
        self.__add_to_keystore()

    def get_key(self, log_key_info=True):
        """
        Fetch the key from the key store based on the key_hash - requires key hash.

        When log_key_info is true the key found is also printed as JSON.
        Returns the key. Raises InputError if no key hash is set and
        KeyLookUpError if the key store has no (non-empty) key for the hash.
        """
        if not self.key_hash:
            util.SMlog('No key hash set, cannot retrieve unknown key')
            raise InputError("Need key hash to get the key from the key store")

        key_store = _read_keystore()
        key = key_store.get(self.key_hash, None)
        if not key:
            util.SMlog(f'No key found in keystore for hash {self.key_hash}, known hashes {key_store.keys()}')
            raise KeyLookUpError("No keys in the keystore which matches the given key hash")

        if log_key_info:
            _print_key_info(key=key)
        return key

    def get_keyhash(self):
        """
        Look up the key hash in the key store based on the key - requires key.

        The first hash whose stored key equals the given key is printed as
        JSON. If no stored key matches, an empty JSON object is printed
        instead; no exception is raised in that case. Nothing is returned.
        Raises InputError if no key is set.
        """
        if not self.key:
            raise InputError("Need key to get the key hash from the key store")
        key_store = _read_keystore()
        # The lookup itself cannot fail: a missing match simply yields None,
        # which _print_key_info leaves out of its output.
        key_hash = next(
            (stored_hash for stored_hash, stored_key in key_store.items()
             if stored_key == self.key),
            None)
        _print_key_info(key_hash=key_hash)

    def update_keystore(self):
        """
        If this key hash is already in the key store, update its corresponding key.

        Raises InputError if the key hash or key is not set, or if the key
        store has no entry for the key hash.
        """
        if not (self.key_hash and self.key):
            raise InputError("Need key hash and key to update the key store")

        key_store = _read_keystore()
        if self.key_hash not in key_store:
            raise InputError("No existing key in the keystore"
                             " with key hash {}".format(self.key_hash))
        key_store[self.key_hash] = self.key
        _write_keystore(key_store)

212 

213 

def _get_key_generator(key_length=None, key_type=None):
    """
    Choose the key generator matching the requested key_type / key_length.

    Checked in this order: key_type "alphanumeric" (optionally with
    key_length), any truthy key_length (random key of that length),
    key_type "weak", key_type "strong". Raises InputError when none apply.
    """
    if key_type == "alphanumeric":
        return AlphaNumericKeyGenerator(key_length=key_length)
    if key_length:
        return RandomKeyGenerator(key_length=key_length)
    if key_type == "weak":
        return WeakKeyGenerator()
    if key_type == "strong":
        return StrongKeyGenerator()
    raise InputError("Either key_length in byte or key_type(\"strong OR weak\")"
                     " should be specified to generate the key")

226 

227 

class RandomKeyGenerator:
    """Key generator producing a random key of a caller-chosen length in bytes"""

    def __init__(self, key_length):
        self.key_length = key_length

    def generate(self):
        """Return key_length random bytes obtained from os.urandom"""
        num_bytes = self.key_length
        return os.urandom(num_bytes)

237 

238 

class StrongKeyGenerator(RandomKeyGenerator):
    """Random key generator fixed at 64 bytes, i.e. a 512-bit key"""

    def __init__(self):
        super().__init__(key_length=64)

244 

245 

class WeakKeyGenerator(RandomKeyGenerator):
    """Random key generator fixed at 32 bytes, i.e. a 256-bit key"""

    def __init__(self):
        super().__init__(key_length=32)

251 

252 

class AlphaNumericKeyGenerator(object):
    """Generates alphanumeric keys"""

    def __init__(self, key_length=None):
        # Only None selects the 64-character default; an explicit 0 is kept
        # and yields an empty key.
        self.key_length = 64 if key_length is None else key_length

    def generate(self):
        """
        Generate a random alphanumeric sequence of key_length characters.

        Characters are drawn from the ASCII letters and digits using
        SystemRandom; the result is returned as UTF-8 encoded bytes.
        """
        keys_from = string.ascii_letters + string.digits
        # One generator instance is enough for the whole key; there is no
        # need to construct a new SystemRandom for every character.
        rng = SystemRandom()
        return "".join(rng.choice(keys_from) for _ in range(self.key_length)).encode("utf-8")

263 

264 

if __name__ == '__main__':
    # Command line interface: one action flag (--generatekey, --getkey,
    # --getkeyhash or --updatekeystore) plus the inputs that action needs.
    # If several action flags are given, the first one in the order checked
    # below wins; if none is given, nothing is done.
    parser = argparse.ArgumentParser()

    parser.add_argument('--generatekey', action='store_true', dest='generate',
                        default=False,
                        help="Generates the encryption key based on the given either keytype or keylength")

    parser.add_argument('--getkey', action='store_true', dest='get_key',
                        default=False, help="To get the key from the keystore based on the given key hash")

    parser.add_argument('--getkeyhash', action='store_true', dest='get_key_hash',
                        default=False, help="To get the key hash from the keystore based on the given key")

    parser.add_argument('--updatekeystore', action='store_true', dest='update_keystore',
                        default=False,
                        help="If needs to update the already existing key in the keystore pass the keyHash and new key")

    # "alphanumeric" is also accepted by _get_key_generator, so advertise it.
    parser.add_argument('--keytype', action='store', dest='key_type', default=None,
                        help='Type of the key: values expected weak, strong or alphanumeric')

    parser.add_argument('--keylength', action='store', default=None, type=int,
                        dest='key_length',
                        help='length of the encryption key in byte')

    # This option carries the hash, not the key itself.
    parser.add_argument('--keyhash', action='store', dest='key_hash', default=None,
                        help='Hash of the encryption key')

    parser.add_argument('--key', action='store', dest='key', default=None,
                        help='Base64-encoded encryption key')

    parser_input = parser.parse_args()

    # Keys are passed base64-encoded on the command line but handled as raw
    # bytes by KeyManager.
    if parser_input.key:
        parser_input.key = base64.b64decode(parser_input.key)

    if parser_input.generate:
        KeyManager(key_type=parser_input.key_type, key_length=parser_input.key_length).generate()
    elif parser_input.get_key:
        KeyManager(key_hash=parser_input.key_hash).get_key()
    elif parser_input.get_key_hash:
        KeyManager(key=parser_input.key).get_keyhash()
    elif parser_input.update_keystore:
        KeyManager(key_hash=parser_input.key_hash, key=parser_input.key).update_keystore()

308 KeyManager(key_hash=parser_input.key_hash, key=parser_input.key).update_keystore()