Coverage for pass_import/formats/kdbx.py: 90%

154 statements  

« prev     ^ index     » next       coverage.py v7.4.3, created at 2024-02-26 12:11 +0000

1# -*- encoding: utf-8 -*- 

2# pass import - Passwords importer swiss army knife 

3# Copyright (C) 2017-2024 Alexandre PUJOL <alexandre@pujol.io>. 

4# 

5 

6import os 

7import re 

8import uuid 

9 

10try: 

11 from pykeepass import PyKeePass 

12 from pykeepass.exceptions import (CredentialsError, HeaderChecksumError, 

13 PayloadChecksumError) 

14 PYKEEPASS = True 

15except ImportError: 

16 PYKEEPASS = False 

17 

18from pass_import.core import Cap, register_detecters 

19from pass_import.detecter import Formatter 

20from pass_import.errors import PMError 

21from pass_import.manager import PasswordExporter, PasswordImporter 

22from pass_import.tools import getpassword 

23 

24 

25class KDBX(Formatter, PasswordImporter, PasswordExporter): 

26 """Base class for KDBX based importer & exporter. 

27 

28 The importer supports binary attachments. It requires PyKeePass to run. 

29 

30 :param PyKeePass keepass: The keepass repository to work on. 

31 :param list attributes: List of the attributes of PyKeePass to import. 

32 

33 """ 

34 cap = Cap.FORMAT | Cap.IMPORT | Cap.EXPORT 

35 name = 'keepass' 

36 format = 'kdbx' 

37 magic = b'\x03\xd9\xa2\x9a' 

38 keys = {'login': 'username', 'comments': 'notes', 'group': 'path'} 

39 attributes = { 

40 'title', 'username', 'password', 'url', 'notes', 'icon', 'tags', 

41 'autotype_enabled', 'autotype_sequence', 'is_a_history_entry' 

42 } 

43 reference = re.compile(r'\{REF:([A-Z])@I:([0-9A-F]{32})\}') 

44 

45 def __init__(self, prefix=None, settings=None): 

46 self.keepass = None 

47 settings = {} if settings is None else settings 

48 keyfile = settings.get('key', '') 

49 self.keyfile = None if keyfile == '' else keyfile 

50 super().__init__(prefix, settings) 

51 

52 # Import methods 

53 

54 def _getentry(self, kpentry): 

55 entry = {} 

56 if kpentry.path is not None: 

57 entry['group'] = '' 

58 for item in kpentry.path: 

59 if item is not None: 

60 entry['group'] = os.path.join(entry['group'], item) 

61 keys = self.invkeys() 

62 for attr in self.attributes: 

63 if hasattr(kpentry, attr): 

64 value = getattr(kpentry, attr) 

65 if isinstance(value, str): 

66 value = self._subref(value) 

67 entry[keys.get(attr, attr)] = value 

68 for key, value in kpentry.custom_properties.items(): 

69 if isinstance(value, str): 

70 value = self._subref(value) 

71 entry[key] = value 

72 if kpentry.otp is not None: 

73 otpauth = kpentry.otp 

74 else: 

75 otpauth = self._getotpauth(kpentry.custom_properties) 

76 if otpauth: 

77 entry['otpauth'] = otpauth 

78 return entry 

79 

80 @staticmethod 

81 def _getotpauth(properties): 

82 # KeeWeb style 

83 if 'otp' in properties: 

84 return properties['otp'] 

85 

86 issuer = 'Imported' 

87 # KeePass 2.47 {TIMEOTP} style 

88 if 'TimeOtp-Secret-Base32' in properties: 

89 seed = properties['TimeOtp-Secret-Base32'] 

90 digits = '6' 

91 

92 # KeeTrayTOTP style 

93 elif 'TOTP Seed' in properties: 

94 seed = properties['TOTP Seed'] 

95 # Special-case Steam 

96 if 'TOTP Settings' in properties \ 

97 and properties['TOTP Settings'] == '30;S': 

98 digits = 's' # Android Password Store needs digits==s 

99 issuer = 'Steam' # pass-otp, via Pass::Otp, needs issuer=Steam 

100 else: 

101 # TODO: parse non-'30;6' settings 

102 digits = '6' 

103 else: 

104 return None 

105 

106 # Many sites print the secret with spaces 

107 seed = seed.replace(' ', '') 

108 

109 return ('otpauth://totp/totp-secret?' 

110 f'secret={seed}&issuer={issuer}&digits={digits}&period=30') 

111 

112 def _subref(self, value): 

113 while True: 

114 match = self.reference.search(value) 

115 if match is None: 

116 break 

117 cat, attid = match.group(1, 2) 

118 if cat not in ('U', 'P'): 

119 break 

120 start, end = match.start(0), match.end(0) 

121 kpentry = self.keepass.find_entries( 

122 uuid=uuid.UUID(attid), first=True) 

123 if kpentry is None: 

124 value = value[:start] + value[end:] 

125 else: 

126 attr = 'password' if cat == 'P' else 'username' 

127 if hasattr(kpentry, attr): 

128 attr = getattr(kpentry, attr) 

129 value = value[:start] + \ 

130 (attr if attr is not None else '') + value[end:] 

131 else: 

132 value = value[:start] + value[end:] 

133 return value 

134 

135 def parse(self): 

136 """Parse Keepass KDBX3 and KDBX4 files.""" 

137 for kpentry in self.keepass.entries: 

138 if self.root not in os.sep.join(filter(None, kpentry.path)): 

139 continue 

140 entry = self._getentry(kpentry) 

141 entry['group'] = os.path.dirname(entry.get('group', '')) 

142 

143 if kpentry.history: 

144 for hentry in kpentry.history: 

145 history = self._getentry(hentry) 

146 history['group'] = os.path.join('History', entry['group']) 

147 self.data.append(history) 

148 

149 for att in kpentry.attachments: 

150 attachment = {} 

151 attachment['group'] = entry['group'] 

152 attachment['title'] = att.filename 

153 attachment['data'] = att.data 

154 self.data.append(attachment) 

155 if entry.get('attachments', None): 

156 entry['attachments'] += f", {att.filename}" 

157 else: 

158 entry['attachments'] = att.filename 

159 self.data.append(entry) 

160 

161 # Export methods 

162 

163 def insert(self, entry): 

164 """Insert a password entry into KDBX encrypted vault file.""" 

165 ignore = {'password', 'path', 'title', 'group', 'data'} 

166 path = os.path.join(self.root, entry.get('path')) 

167 title = os.path.basename(path) 

168 group = os.path.dirname(path) 

169 

170 root_group = self.keepass.root_group 

171 kpgroup = self.keepass.find_groups( 

172 path=os.path.split(group)) if group else root_group 

173 if not kpgroup: 

174 for grp in os.path.split(group): 

175 # os.path.split creates an empty segment when there is nothing 

176 # to split, just ignore it 

177 if grp == '': 

178 continue 

179 kpgroup = self.keepass.find_groups( 

180 group=root_group, name=grp, first=True) 

181 if not kpgroup: 

182 kpgroup = self.keepass.add_group(root_group, grp) 

183 root_group = kpgroup 

184 

185 if not self.force: 

186 pkentry = self.keepass.find_entries(title=title, group=kpgroup, 

187 recursive=False, first=True) 

188 if pkentry is not None: 

189 raise PMError(f"An entry already exists for {path}.") 

190 

191 kpentry = self.keepass.add_entry( 

192 destination_group=kpgroup, 

193 title=title, 

194 username=entry.pop('login', ''), 

195 password=entry.pop('password', ''), 

196 url=entry.pop('url', None), 

197 notes=entry.pop('comments', None), 

198 tags=entry.pop('tags', None), 

199 expiry_time=entry.pop('expiry_time', None), 

200 icon=entry.pop('icon', None), 

201 force_creation=True) 

202 

203 for key, value in entry.items(): 

204 if key in ignore: 

205 continue 

206 kpentry.set_custom_property(key, str(value)) 

207 

208 if 'data' in entry: 

209 attid = self.keepass.add_binary(entry['data']) 

210 kpentry.add_attachment(attid, title) 

211 

212 # Context manager methods 

213 

214 def open(self): 

215 """Open the keepass repository.""" 

216 if not PYKEEPASS: 

217 raise ImportError(name='pykeepass') 

218 

219 try: 

220 self.keepass = PyKeePass(self.prefix, 

221 password=getpassword(self.prefix), 

222 keyfile=self.keyfile) 

223 except (CredentialsError, PayloadChecksumError, 

224 HeaderChecksumError) as error: # pragma: no cover 

225 raise PMError(error) from error 

226 

227 def close(self): 

228 """Close the keepass repository.""" 

229 self.keepass.save() 

230 

231 # Format recognition methods 

232 

233 def detecter_open(self): 

234 """Enter the tryformat context manager.""" 

235 self.file = open(self.prefix, 'rb') 

236 

237 def detecter_close(self): 

238 """Leave the tryformat context manager.""" 

239 self.file.close() 

240 

241 def is_format(self): 

242 """Return True if the file is a KDBX file.""" 

243 sign = self.file.read(4) 

244 if sign != self.magic: 

245 return False 

246 return True 

247 

248 def checkheader(self, header, only=False) -> bool: 

249 """No header check.""" 

250 return True 

251 

252 @classmethod 

253 def header(cls): 

254 """No header for KDBX file.""" 

255 return '' 

256 

257 

258register_detecters(KDBX)