Coverage for pass_import/formats/kdbx.py: 90%
154 statements
« prev ^ index » next coverage.py v7.4.3, created at 2024-02-26 12:11 +0000
« prev ^ index » next coverage.py v7.4.3, created at 2024-02-26 12:11 +0000
1# -*- encoding: utf-8 -*-
2# pass import - Passwords importer swiss army knife
3# Copyright (C) 2017-2024 Alexandre PUJOL <alexandre@pujol.io>.
4#
6import os
7import re
8import uuid
10try:
11 from pykeepass import PyKeePass
12 from pykeepass.exceptions import (CredentialsError, HeaderChecksumError,
13 PayloadChecksumError)
14 PYKEEPASS = True
15except ImportError:
16 PYKEEPASS = False
18from pass_import.core import Cap, register_detecters
19from pass_import.detecter import Formatter
20from pass_import.errors import PMError
21from pass_import.manager import PasswordExporter, PasswordImporter
22from pass_import.tools import getpassword
class KDBX(Formatter, PasswordImporter, PasswordExporter):
    """Base class for KDBX based importer & exporter.

    The importer supports binary attachments. It requires PyKeePass to run.

    :param PyKeePass keepass: The keepass repository to work on.
    :param set attributes: Names of the PyKeePass entry attributes to import.

    """
    cap = Cap.FORMAT | Cap.IMPORT | Cap.EXPORT
    name = 'keepass'
    format = 'kdbx'
    # Leading four bytes compared against the file content by ``is_format``.
    magic = b'\x03\xd9\xa2\x9a'
    keys = {'login': 'username', 'comments': 'notes', 'group': 'path'}
    attributes = {
        'title', 'username', 'password', 'url', 'notes', 'icon', 'tags',
        'autotype_enabled', 'autotype_sequence', 'is_a_history_entry'
    }
    # Field reference: {REF:<field letter>@I:<32 upper-case hex digit UUID>}.
    reference = re.compile(r'\{REF:([A-Z])@I:([0-9A-F]{32})\}')
    # Upper bound on the number of references handled per value by
    # ``_subref``; it stops self/mutually referencing entries from looping
    # forever.
    _max_subrefs = 64

    def __init__(self, prefix=None, settings=None):
        """Store the optional key file path and initialise the parents.

        :param prefix: Forwarded unchanged to the parent constructor; it is
            later used as the database path by ``open``.
        :param dict settings: Optional settings. The ``key`` item, when
            present and not empty, is used as the key file path.
        """
        self.keepass = None
        settings = {} if settings is None else settings
        keyfile = settings.get('key', '')
        # An empty string means "no key file".
        self.keyfile = None if keyfile == '' else keyfile
        super().__init__(prefix, settings)

    # Import methods

    def _getentry(self, kpentry):
        """Convert a PyKeePass entry into a plain ``dict``.

        :param kpentry: The PyKeePass entry (or history entry) to convert.
        :return dict: The standard attributes, the custom properties, the
            ``group`` built from the entry path and, when available, the
            ``otpauth`` URI.
        """
        entry = {}
        if kpentry.path is not None:
            entry['group'] = ''
            for item in kpentry.path:
                if item is not None:
                    entry['group'] = os.path.join(entry['group'], item)

        # NOTE(review): invkeys() is inherited; presumably it inverts
        # ``keys`` (PyKeePass name -> pass-import name) - confirm.
        keys = self.invkeys()
        for attr in self.attributes:
            if hasattr(kpentry, attr):
                value = getattr(kpentry, attr)
                if isinstance(value, str):
                    value = self._subref(value)
                entry[keys.get(attr, attr)] = value

        # Custom properties are copied last, so on a name clash they win
        # over the standard attributes set above.
        for key, value in kpentry.custom_properties.items():
            if isinstance(value, str):
                value = self._subref(value)
            entry[key] = value

        if kpentry.otp is not None:
            otpauth = kpentry.otp
        else:
            otpauth = self._getotpauth(kpentry.custom_properties)
        if otpauth:
            entry['otpauth'] = otpauth
        return entry

    @staticmethod
    def _getotpauth(properties):
        """Build an OTP value from plugin specific custom properties.

        :param dict properties: The custom properties of an entry.
        :return: The ``otp`` property as is when present, otherwise an
            ``otpauth://totp/`` URI built from the TOTP seed, or ``None``
            when no known OTP property is found.
        """
        # KeeWeb style
        if 'otp' in properties:
            return properties['otp']

        issuer = 'Imported'
        # KeePass 2.47 {TIMEOTP} style
        if 'TimeOtp-Secret-Base32' in properties:
            seed = properties['TimeOtp-Secret-Base32']
            digits = '6'

        # KeeTrayTOTP style
        elif 'TOTP Seed' in properties:
            seed = properties['TOTP Seed']
            # Special-case Steam
            if properties.get('TOTP Settings') == '30;S':
                digits = 's'  # Android Password Store needs digits==s
                issuer = 'Steam'  # pass-otp, via Pass::Otp, needs issuer=Steam
            else:
                # TODO: parse non-'30;6' settings
                digits = '6'
        else:
            return None

        # Many sites print the secret with spaces
        seed = seed.replace(' ', '')

        return ('otpauth://totp/totp-secret?'
                f'secret={seed}&issuer={issuer}&digits={digits}&period=30')

    def _subref(self, value):
        """Resolve username and password field references in ``value``.

        Only ``U`` (username) and ``P`` (password) references are
        substituted. A reference to another field is left untouched and the
        search goes on after it. A reference to a missing entry, or to an
        entry without a usable value, is removed.

        At most ``_max_subrefs`` references are processed, so entries
        referencing themselves or each other cannot loop forever; any
        reference left at that point stays in the returned value.

        :param str value: The text that may contain field references.
        :return str: The text with the supported references substituted.
        """
        pos = 0
        for _ in range(self._max_subrefs):
            match = self.reference.search(value, pos)
            if match is None:
                break
            cat, attid = match.group(1, 2)
            start, end = match.span()
            if cat not in ('U', 'P'):
                # Unsupported field: keep it and look for later references.
                pos = end
                continue

            kpentry = self.keepass.find_entries(
                uuid=uuid.UUID(attid), first=True)
            attr = 'password' if cat == 'P' else 'username'
            substitute = None
            if kpentry is not None:
                substitute = getattr(kpentry, attr, None)
            if substitute is None:
                substitute = ''
            # ``pos`` is not moved: the substituted text is searched again
            # so that nested references are resolved too.
            value = value[:start] + substitute + value[end:]
        return value

    def parse(self):
        """Parse Keepass KDBX3 and KDBX4 files."""
        for kpentry in self.keepass.entries:
            # Tolerate a missing path, as _getentry() does.
            segments = filter(None, kpentry.path or ())
            if self.root not in os.sep.join(segments):
                continue
            entry = self._getentry(kpentry)
            # The entry path ends with its title: keep the group part only.
            entry['group'] = os.path.dirname(entry.get('group', ''))

            if kpentry.history:
                for hentry in kpentry.history:
                    history = self._getentry(hentry)
                    history['group'] = os.path.join('History', entry['group'])
                    self.data.append(history)

            for att in kpentry.attachments:
                # Each attachment is stored as its own data item ...
                attachment = {}
                attachment['group'] = entry['group']
                attachment['title'] = att.filename
                attachment['data'] = att.data
                self.data.append(attachment)
                # ... and its file name is listed on the owning entry.
                if entry.get('attachments', None):
                    entry['attachments'] += f", {att.filename}"
                else:
                    entry['attachments'] = att.filename
            self.data.append(entry)

    # Export methods

    def insert(self, entry):
        """Insert a password entry into KDBX encrypted vault file.

        The standard fields are removed from ``entry`` while they are
        written; the remaining items are stored as custom properties.

        :param dict entry: The password entry to insert.
        :raises PMError: If ``force`` is not set and an entry with the same
            title already exists in the destination group.
        """
        ignore = {'password', 'path', 'title', 'group', 'data'}
        path = os.path.join(self.root, entry.get('path'))
        title = os.path.basename(path)
        group = os.path.dirname(path)

        # NOTE(review): os.path.split() only separates the last component,
        # so a group nested more than two levels deep yields a first
        # segment that still contains separators - confirm this is intended.
        root_group = self.keepass.root_group
        kpgroup = self.keepass.find_groups(
            path=os.path.split(group)) if group else root_group
        if not kpgroup:
            for grp in os.path.split(group):
                # os.path.split creates an empty segment when there is nothing
                # to split, just ignore it
                if grp == '':
                    continue
                kpgroup = self.keepass.find_groups(
                    group=root_group, name=grp, first=True)
                if not kpgroup:
                    kpgroup = self.keepass.add_group(root_group, grp)
                root_group = kpgroup

        if not self.force:
            pkentry = self.keepass.find_entries(title=title, group=kpgroup,
                                                recursive=False, first=True)
            if pkentry is not None:
                raise PMError(f"An entry already exists for {path}.")

        kpentry = self.keepass.add_entry(
            destination_group=kpgroup,
            title=title,
            username=entry.pop('login', ''),
            password=entry.pop('password', ''),
            url=entry.pop('url', None),
            notes=entry.pop('comments', None),
            tags=entry.pop('tags', None),
            expiry_time=entry.pop('expiry_time', None),
            icon=entry.pop('icon', None),
            force_creation=True)

        for key, value in entry.items():
            if key in ignore:
                continue
            kpentry.set_custom_property(key, str(value))

        if 'data' in entry:
            attid = self.keepass.add_binary(entry['data'])
            kpentry.add_attachment(attid, title)

    # Context manager methods

    def open(self):
        """Open the keepass repository.

        :raises ImportError: If pykeepass is not installed.
        :raises PMError: On a credentials or checksum error.
        """
        if not PYKEEPASS:
            raise ImportError(name='pykeepass')

        try:
            self.keepass = PyKeePass(self.prefix,
                                     password=getpassword(self.prefix),
                                     keyfile=self.keyfile)
        except (CredentialsError, PayloadChecksumError,
                HeaderChecksumError) as error:  # pragma: no cover
            raise PMError(error) from error

    def close(self):
        """Close the keepass repository, saving it first."""
        self.keepass.save()

    # Format recognition methods

    def detecter_open(self):
        """Enter the tryformat context manager."""
        # Closed by detecter_close().
        self.file = open(self.prefix, 'rb')

    def detecter_close(self):
        """Leave the tryformat context manager."""
        self.file.close()

    def is_format(self):
        """Return True if the file is a KDBX file."""
        return self.file.read(4) == self.magic

    def checkheader(self, header, only=False) -> bool:
        """No header check."""
        return True

    @classmethod
    def header(cls):
        """No header for KDBX file."""
        return ''
# Pass the KDBX class to pass_import.core.register_detecters at import time.
register_detecters(KDBX)