Coverage for tests/__init__.py: 91%

146 statements  

« prev     ^ index     » next       coverage.py v7.4.3, created at 2024-02-26 12:11 +0000

1# -*- encoding: utf-8 -*- 

2# pass-import - test suite common resources 

3# Copyright (C) 2017-2024 Alexandre PUJOL <alexandre@pujol.io>. 

4# SPDX-License-Identifier: GPL-3.0-or-later 

5"""pass-import test suite common resources. 

6It provides: 

7 - tests.tmp Path to the test temporary directory. 

8 - tests.tests Root path for tests 

9 - tests.assets Root path of tests assets. 

10 - tests.db Root path of db where the files to import live. 

11 - tests.formats Root path with basic malformed formats. 

12 - tests.managers Interface to manage the managers classes. 

13 - tests.conf Dictionary with managers tests settings. 

14 - tests.Tests() Base test class. 

15 - tests.yaml_load() Open and load a yaml reference resource. 

16 - tests.cls() Load a password manager object. 

17 - tests.reference() Set the expected reference data for a given manager. 

18 - tests.clear() Clear data from key not in keep. 

19 - tests.captured() Context manager to capture stdout. 

20 - tests.mocked() Mock cloud password managers API response. 

21 - tests.skipIfNo() Skip a password manager test if it is disabled. 

22 - tests.skipIfNoInstalled() Skip a test if a program is not installed. 

23 - tests.skipIfNoModule() Skip a test if an optional module is not installed. 

24 - tests.mock_hibp() Mock HIBP API response. 

25""" 

26 

27import os 

28import sys 

29import shutil 

30import unittest 

31from io import StringIO 

32from contextlib import contextmanager 

33import yaml 

34 

35import pass_import 

36import pass_import.__main__ 

37 

38 

39tmp = '/tmp/tests/pass-import/' # nosec 

40tests = os.path.abspath('tests') 

41assets = os.path.join(tests, 'assets') + os.sep 

42formats = os.path.join(assets, 'format') + os.sep 

43db = os.path.join(assets, 'db') + os.sep 

44managers = pass_import.Managers() 

45with open(os.path.join(tests, 'tests.yml'), 'r') as cfile: 

46 conf = yaml.safe_load(cfile) 

47 

48 

49def _id(obj): 

50 return obj 

51 

52 

53def skipIfNo(name: str): 

54 """Skip a password manager test if it is disabled.""" 

55 if name not in {'bitwarden', 'lastpass', 'onepassword'}: 

56 return _id 

57 manager = name.upper() 

58 enabled = 'T_%s' % manager 

59 password = 'TESTS_%s_PASS' % manager 

60 if not (enabled in os.environ and password in os.environ): 

61 return unittest.skip(f"Skipping: {name} tests disabled.") 

62 return _id 

63 

64 

65def skipIfNoInstalled(name: str): 

66 """Skip a test if a program is not installed.""" 

67 if shutil.which(name) is None: 

68 return unittest.skip(f"Skipping: {name} not installed disabled.") 

69 return _id 

70 

71 

72def skipIfNoModule(name: str): 

73 """Skip a test if an optional module is not installed.""" 

74 try: 

75 __import__(name) 

76 except ImportError: 

77 return unittest.skip(f"Skipping: module {name} not installed.") 

78 return _id 

79 

80 

81def mocked(manager, cmd): 

82 """Mock cloud password managers API response.""" 

83 names = {'bitwarden', 'lastpass', 'onepassword'} 

84 if manager not in names: 

85 return '' 

86 

87 path = os.path.join(db, manager, cmd) 

88 with open(path) as file: 

89 return file.read() 

90 

91 

92@contextmanager 

93def captured(): 

94 """Context manager to capture stdout.""" 

95 new_out, new_err = StringIO(), StringIO() 

96 old_out, old_err = sys.stdout, sys.stderr 

97 try: 

98 sys.stdout, sys.stderr = new_out, new_err 

99 yield sys.stdout, sys.stderr 

100 finally: 

101 sys.stdout, sys.stderr = old_out, old_err 

102 

103 

104def yaml_load(ref_path): 

105 """Open and load a yaml reference resource.""" 

106 ref_path = os.path.join(assets, 'references', ref_path) 

107 with open(ref_path, 'r') as file: 

108 return yaml.safe_load(file) 

109 

110 

111def cls(name, prefix=None, **args): 

112 """Load a password manager object.""" 

113 if not prefix: 

114 prefix = os.path.join(db, conf[name]['path']) 

115 settings = {'extra': True} 

116 for key, value in args.items(): 

117 settings[key] = value 

118 return managers.get(name)(prefix, settings=settings) 

119 

120 

121def reference(name=None): 

122 """Set the expected reference data for a given manager. 

123 

124 Some password managers do not store a lot off data (no group...). 

125 Therefore, we need to remove these entries from the reference data 

126 when testing these managers. 

127 

128 """ 

129 with open(assets + '/references/main.yml', 'r') as file: 

130 ref = yaml.safe_load(file) 

131 if name: 

132 if 'without' in conf[name]: 

133 for key in conf[name]['without']: 

134 for entry in ref: 

135 entry.pop(key, None) 

136 if 'root' in conf[name]: 

137 for entry in ref: 

138 entry['group'] = conf[name]['root'] + entry['group'] 

139 return ref 

140 

141 

142def clear(data, keep=None): 

143 """Clear data from key not in keep.""" 

144 if not keep: 

145 keep = ['title', 'password', 'login', 'url', 'comments', 'group'] 

146 for entry in data: 

147 delete = [k for k in entry.keys() if k not in keep] 

148 empty = [k for k, v in entry.items() if not v] 

149 delete.extend(empty) 

150 for key in delete: 

151 entry.pop(key, None) 

152 

153 

154def mock_hibp(*args, **kwargs): 

155 """Mock HIBP API response.""" 

156 

157 class MockResponse: 

158 def __init__(self): 

159 data = [ 

160 "D5EE0CB1A41071812CCED2F1930E6E1A5D2:2", 

161 "2DC183F740EE76F27B78EB39C8AD972A757:52579", 

162 "CF164D7A51A1FD864B1BF9E1CE8A3EC171B:4", 

163 "D0B910E7A3028703C0B30039795E908CEB2:7", 

164 "AD6438836DBE526AA231ABDE2D0EEF74D42:3", 

165 "EBAB0A7CE978E0194608B572E4F9404AA21:3", 

166 "17727EAB0E800E62A776C76381DEFBC4145:120", 

167 "5370372AC65308F03F6ED75EC6068C8E1BE:1386", 

168 "1E4C9B93F3F0682250B6CF8331B7EE68FD8:3730471", 

169 "437FAA5A7FCE15D1DDCB9EAEAEA377667B8:123422", 

170 "944C22589AC652B0F47918D58CA0CDCCB63:411" 

171 ] 

172 

173 self.text = "\r\n".join(data) 

174 

175 def raise_for_status(self): 

176 pass 

177 

178 return MockResponse() 

179 

180 

181class Test(unittest.TestCase): 

182 """Common resources for all tests. 

183 

184 :param str key: Optional key for a password manager. 

185 :param str token: Optional token for a password manager. 

186 :param str login: Login for a password manager. 

187 :param str prefix: Path to a password repository. 

188 :param str masterpassword: Master password used for a password manager. 

189 :param list gpgids: Test GPGIDs. 

190 

191 """ 

192 key = '' 

193 token = '' # nosec 

194 login = '' 

195 prefix = '' 

196 masterpassword = 'correct horse battery staple' 

197 gpgids = ['D4C78DB7920E1E27F5416B81CC9DB947CF90C77B', ''] 

198 

199 def __init__(self, methodName='runTest'): # noqa 

200 super().__init__(methodName) 

201 

202 # GPG keyring & pass settings 

203 os.environ.pop('GPG_AGENT_INFO', None) 

204 os.environ.pop('PASSWORD_STORE_SIGNING_KEY', None) 

205 os.environ['GNUPGHOME'] = os.path.join(os.getcwd(), assets + 'gnupg') 

206 

207 # Main related method 

208 

209 def main(self, cmd, code=None, msg=''): 

210 """Call to the main function.""" 

211 sys.argv = ['main'] 

212 sys.argv.extend(cmd) 

213 if code is None: 

214 pass_import.__main__.main() 

215 elif msg == '': 

216 with self.assertRaises(SystemExit) as cm: 

217 pass_import.__main__.main() 

218 self.assertEqual(cm.exception.code, code) 

219 else: 

220 with captured() as (out, err): 

221 with self.assertRaises(SystemExit) as cm: 

222 pass_import.__main__.main() 

223 if code == 0: 

224 message = out.getvalue().strip() 

225 else: 

226 message = err.getvalue().strip() 

227 self.assertIn(msg, message) 

228 self.assertEqual(cm.exception.code, code) 

229 

230 # Export related method 

231 

232 def _tmpdir(self, path=''): 

233 """Create a temporary test directory named after the testname.""" 

234 self.prefix = os.path.join(tmp, self._testMethodName) 

235 

236 # Re-initialize the test directory 

237 if os.path.isdir(self.prefix): 

238 shutil.rmtree(self.prefix, ignore_errors=True) 

239 os.makedirs(self.prefix, exist_ok=True) 

240 

241 if path != '': 

242 self.prefix = os.path.join(self.prefix, path) 

243 

244 # Import related methods 

245 

246 def assertImport(self, data, refdata, keep=None): # noqa 

247 """Compare imported data with the reference data.""" 

248 clear(data, keep) 

249 self.assertEqual(len(data), len(refdata)) 

250 for entry in data: 

251 self.assertIn(entry, refdata) 

252 

253 # Special exporter access methods 

254 

255 def _credentials(self, manager=''): 

256 """Set credentials for cloud based password managers.""" 

257 names = {'bitwarden', 'lastpass', 'onepassword'} 

258 if manager in names: 

259 name = manager.upper() 

260 self.masterpassword = os.environ.get(f'TESTS_{name}_PASS', '') 

261 self.login = os.environ.get(f'TESTS_{name}_LOGIN', '') 

262 self.key = os.environ.get(f'TESTS_{name}_SECRETKEY', '') 

263 self.token = os.environ.get(f'TESTS_{name}_TOKEN', '') 

264 

265 def _init_pass(self): 

266 """Initialize a new password store repository.""" 

267 with open(os.path.join(self.prefix, '.gpg-id'), 'w') as file: 

268 file.write('\n'.join(self.gpgids)) 

269 

270 def _init_keepass(self): 

271 """Initialize a new keepass repository.""" 

272 shutil.copyfile(assets + 'export/keepass.kdbx', self.prefix)