Coverage for pass_import/managers/passwordstore.py: 99%
158 statements
« prev ^ index » next coverage.py v7.4.3, created at 2024-02-26 12:11 +0000
« prev ^ index » next coverage.py v7.4.3, created at 2024-02-26 12:11 +0000
1# -*- encoding: utf-8 -*-
2# pass import - Passwords importer swiss army knife
3# Copyright (C) 2017-2024 Alexandre PUJOL <alexandre@pujol.io>.
4#
6import os
7import shutil
8from pathlib import Path
10from pass_import.core import Cap, register_detecters, register_managers
11from pass_import.detecter import Formatter
12from pass_import.errors import FormatError, PMError
13from pass_import.formats.cli import CLI
class PasswordStore(CLI, Formatter):
    """Importer & Exporter for password-store.

    If ``prefix`` is not specified in the constructor, the environment variable
    ``PASSWORD_STORE_DIR`` is required. The constructor will raise an exception
    if it is not present.

    This class supports all the environment variables supported by ``pass``,
    including ``GNUPGHOME``.

    :param str prefix: Path to the password store. When given, it overrides
        ``PASSWORD_STORE_DIR``.
    :param dict settings: Forwarded unchanged to the parent constructor.
    :ivar dict env: Environment variables used by ``pass``.
    :raises PMError: If the location of the store cannot be determined.

    """
    cap = Cap.FORMAT | Cap.IMPORT | Cap.EXPORT
    name = 'pass'
    format = 'pass'
    command = 'pass'
    url = 'https://passwordstore.org'
    himport = 'pass import pass path/to/store'

    def __init__(self, prefix=None, settings=None):
        # Prefer gpg2 when both binaries are installed. The attribute stays
        # None when neither is found; isvalid() then reports False.
        self._gpgbinary = shutil.which('gpg2') or shutil.which('gpg')
        super().__init__(prefix, settings)

        # Arguments for ``_setenv``, kept in their original call order.
        # NOTE(review): the optional second item is presumably an alternate
        # name for the same setting -- confirm against ``_setenv``.
        for names in (
            ('PASSWORD_STORE_DIR',),
            ('PASSWORD_STORE_KEY',),
            ('PASSWORD_STORE_GIT', 'GIT_DIR'),
            ('PASSWORD_STORE_GPG_OPTS',),
            ('PASSWORD_STORE_X_SELECTION', 'X_SELECTION'),
            ('PASSWORD_STORE_CLIP_TIME', 'CLIP_TIME'),
            ('PASSWORD_STORE_UMASK',),
            ('PASSWORD_STORE_GENERATED_LENGTH', 'GENERATED_LENGTH'),
            ('PASSWORD_STORE_CHARACTER_SET', 'CHARACTER_SET'),
            ('PASSWORD_STORE_CHARACTER_SET_NO_SYMBOLS',
             'CHARACTER_SET_NO_SYMBOLS'),
            ('PASSWORD_STORE_ENABLE_EXTENSIONS',),
            ('PASSWORD_STORE_EXTENSIONS_DIR', 'EXTENSIONS'),
            ('PASSWORD_STORE_SIGNING_KEY',),
            ('GNUPGHOME',),
        ):
            self._setenv(*names)

        # An explicit prefix wins over whatever the environment provided.
        if prefix:
            self.prefix = prefix
        if 'PASSWORD_STORE_DIR' not in self.env or self.prefix is None:
            raise PMError(f"{self.name} prefix unknown")

    @property
    def prefix(self):
        """Get password store prefix from PASSWORD_STORE_DIR."""
        return self.env['PASSWORD_STORE_DIR']

    @prefix.setter
    def prefix(self, value):
        self.env['PASSWORD_STORE_DIR'] = value

    # Import methods

    def list(self, path=''):
        """List the paths in the password store repository.

        Entries located in hidden directories, as well as hidden files, are
        not reported.

        :param str path: Root path to the password repository to list.
        :return list: Return the sorted list of paths in a store, relative to
            the prefix and without the ``.gpg`` suffix.

        """
        root = Path(self.prefix) / path

        # ``path`` designates a single entry rather than a directory.
        if Path(str(root) + '.gpg').is_file():
            return [path]

        # pathlib's ``*`` also matches dot-files, so everything hidden is
        # collected first. A set keeps the membership test below O(1).
        hidden = set()
        for dotted in root.rglob('.*'):
            if dotted.is_dir():
                hidden.update(dotted.rglob('*.gpg'))
            else:
                hidden.add(dotted)

        names = [
            str(found.relative_to(self.prefix).with_suffix(''))
            for found in root.rglob('*.gpg')
            if found not in hidden
        ]
        names.sort()
        return names

    def show(self, path):
        """Decrypt path and read the credentials in the password file.

        The first line is the password unless it looks like ``key: value``.
        Each following line is either a ``key: value`` pair, an
        ``otpauth://`` URI, or a continuation of an already started
        ``comments`` field. Any other line is ignored.

        :param str path: Path to the password entry to decrypt.
        :return dict: Return a dictionary with the content of the password
            entry. If the output cannot be decoded as text, the raw output is
            stored under the ``data`` key instead.
        :raise PMError: If path not in the store.
        """
        entry = {
            'group': os.path.dirname(path),
            'title': os.path.basename(path),
        }
        try:
            output = self._command(['show', path])
        except UnicodeDecodeError:
            # Not text: run the command again with ``nline=False`` and keep
            # whatever it returns as an attachment.
            entry['data'] = self._command(['show', path], nline=False)
            return entry

        # Drop the last chunk, i.e. what follows the final newline.
        lines = output.split('\n')[:-1]
        if not lines:
            return entry

        first, *others = lines
        if ': ' in first:
            (key, value) = first.split(': ', 1)
            entry[key] = value
        else:
            entry['password'] = first

        for line in others:
            if ': ' in line:
                (key, value) = line.split(': ', 1)
                entry[key] = value
            elif line.startswith('otpauth://'):
                entry['otpauth'] = line
            elif 'comments' in entry:
                entry['comments'] += '\n' + line
        return entry

    def parse(self):
        """Parse a password-store repository.

        Every listed path containing ``self.root`` is decrypted and appended
        to ``self.data``.

        :raises FormatError: If the store is empty or an entry cannot be read.
        """
        paths = self.list()
        if not paths:
            raise FormatError('empty password store.')
        for path in paths:
            if self.root not in path:
                continue
            try:
                entry = self.show(path)
            except PMError as error:  # pragma: no cover
                raise FormatError(error) from error
            self.data.append(entry)

    # Export methods

    def insert(self, entry):
        """Insert a password entry into the password repository.

        :param dict entry: The password entry to insert.
        :raises PMError: If the entry already exists or in case
            of a password manager error.

        The entry is converted into the following format:

        .. code-block:: console

            <password>
            <key>: <value>

        If ``PasswordManager.all`` is true, all the entry values are printed.
        Otherwise, only the key present in ``PasswordManager.keyslist`` are
        printed following the order from this list. The title, path, and group
        keys are ignored.

        If ``PasswordManager.force`` is true, it will overwrite previous entry.

        If the 'data' key is present, the entry is considered as a binary
        attachment and its value is inserted as is.

        """
        path = os.path.join(self.root, entry.get('path'))
        if not self.force and os.path.isfile(
                os.path.join(self.prefix, path + '.gpg')):
            raise PMError(f"An entry already exists for {path}.")

        if 'data' in entry:
            data = entry['data']
        else:
            seen = {'password', 'path', 'title', 'group'}
            chunks = [entry.get('password', '') + '\n']
            for key in self.keyslist:
                if key in seen:
                    continue
                if key in entry:
                    if 'otpauth' in key:
                        # OTP URIs are written bare, without a ``key: ``.
                        chunks.append(f"{entry.get(key)}\n")
                    else:
                        chunks.append(f"{key}: {entry.get(key)}\n")
                seen.add(key)

            if self.all:
                chunks.extend(
                    f"{key}: {value}\n"
                    for key, value in entry.items()
                    if key not in seen
                )
            data = ''.join(chunks)

        # ``--force`` is always passed: the overwrite protection is the
        # explicit file check above.
        arg = ['insert', '--multiline', '--force', '--', path]
        return self._command(arg, data)

    # Context manager methods

    def exist(self):
        """Check if the password store is initialized.

        :return bool:
            ``True`` if the password store is initialized, ``False`` otherwise.
        """
        return os.path.isfile(os.path.join(self.prefix, '.gpg-id'))

    # pylint: disable=arguments-differ
    def isvalid(self):
        """Ensure the GPG keyring is usable.

        This function ensures that:

        - All the public gpgids are present in the keyring.
        - All the public gpgids are trusted enough.
        - At least one private key is present in the keyring.

        Blank lines in ``.gpg-id`` are skipped and surrounding whitespace is
        removed from each id.

        :return bool:
            ``True`` if the GPG keyring is usable, ``False`` otherwise. Also
            ``False`` when no gpg binary was found.
        """
        if not self._gpgbinary:
            # Neither gpg2 nor gpg is installed: nothing can be checked.
            return False

        # Values of the second colon-separated field of a ``pub`` record
        # that are accepted as trusted enough.
        trusted = {'m', 'f', 'u', 'w', 's'}

        with open(os.path.join(self.prefix, '.gpg-id'), 'r') as file:
            stripped = [line.strip() for line in file.read().split('\n')]
        gpgids = [gpgid for gpgid in stripped if gpgid]

        base = [self._gpgbinary, '--with-colons', '--batch']

        # Every recipient must be known and sufficiently trusted.
        for gpgid in gpgids:
            res, out, _ = self._call(base + ['--list-keys', '--', gpgid])
            if res:
                return False
            for line in out.split('\n'):
                record = line.split(':')
                if record[0] == 'pub' and record[1] not in trusted:
                    return False

        # One available secret key is enough; stop at the first one found.
        return any(
            self._call(base + ['--list-secret-keys', '--', gpgid])[0] == 0
            for gpgid in gpgids
        )

    def open(self):
        """Ensure prefix is a path to a password repository.

        :raises PMError: If the prefix is not an existing directory.
        """
        if not os.path.isdir(self.prefix):
            raise PMError(f"{self.prefix} is not a password repository.")

    def close(self):
        """There is no file to close."""

    # Format recognition methods

    def is_format(self):
        """Ensure the prefix is a directory that contains a .gpg-id file."""
        # A regular file can only exist below the prefix if the prefix is
        # itself a directory, so no separate isdir() check is required.
        return os.path.isfile(os.path.join(self.prefix, '.gpg-id'))

    def checkheader(self, header, only=False):
        """No header check is needed."""
        return True

    @classmethod
    def header(cls):
        """No header for pass."""
        return ''
# Hand the class to both registration hooks imported from pass_import.core,
# managers first and detecters second.
for _register in (register_managers, register_detecters):
    _register(PasswordStore)
del _register  # keep the module namespace unchanged