Coverage for pass_import/tools.py: 96%
136 statements
« prev ^ index » next coverage.py v7.4.3, created at 2024-02-26 12:11 +0000
« prev ^ index » next coverage.py v7.4.3, created at 2024-02-26 12:11 +0000
1# -*- encoding: utf-8 -*-
2# pass import - Passwords importer swiss army knife
3# Copyright (C) 2017-2024 Alexandre PUJOL <alexandre@pujol.io>.
4#
6import getpass
7import os
8import sys
9from typing import Tuple, Dict, Union
11try:
12 import magic
13 MAGIC = True
14except ImportError:
15 MAGIC = False
17import yaml
19from pass_import import clean
20from pass_import.core import Cap
def getpassword(path, name='Password') -> str:
    """Prompt for a secret without echoing it and return what was typed.

    :param path: Shown in the prompt so the user knows what the secret is
        requested for.
    :param str name: Label of the requested secret, ``'Password'`` by
        default.
    :return: The string read by :func:`getpass.getpass`.

    """
    prompt = f"{name} for {path}: "
    return getpass.getpass(prompt)
def get_magics(path) -> Tuple[Union[str, None], Union[str, None]]:
    """Get file format and encoding.

    The magic library is not really good at detecting text file-based formats
    like CSV, JSON, YAML, or XML so we only use it to detect binary formats
    and the encoding.

    Support both file-magic and python-magic as both are shipped under the
    same name in various distributions.

    :param path: Path of the file to inspect. Only its first 2048 bytes are
        read.
    :return: A ``(format, encoding)`` tuple. ``format`` is ``'gpg'``,
        ``'sqlite3'``, ``'kdbx'``, ``'openssl'``, or ``None`` when nothing
        matched. ``encoding`` is ``'utf-8-sig'`` when the content is reported
        as UTF-8 with a BOM, ``None`` otherwise. Both are ``None`` when no
        magic library could be imported.

    """
    if not MAGIC:
        return None, None

    with open(path, 'rb') as file:
        header = file.read(2048)

    if hasattr(magic, 'detect_from_content'):  # file-magic
        res = magic.detect_from_content(header)
        mime_type = res.mime_type
        magic_name = res.name
    else:  # python-magic
        mime_type = magic.from_buffer(header, mime=True)
        magic_name = magic.from_buffer(header)

    mime_to_format = {
        'application/pgp': 'gpg',
        'application/x-sqlite3': 'sqlite3',
    }
    name_to_format = {'KDBX': 'kdbx', 'openssl': 'openssl', 'PGP': 'gpg'}

    frmt = mime_to_format.get(mime_type)
    # A match on the textual description overrides the MIME-based guess; if
    # several names match, the last one in ``name_to_format`` wins.
    for name, plain_format in name_to_format.items():
        if name in magic_name:
            frmt = plain_format

    # The wording of the description depends on the libmagic version:
    # 'UTF-8 Unicode (with BOM) text' in older releases and
    # 'Unicode text, UTF-8 (with BOM) text' in newer ones. Match the two
    # stable tokens instead of one exact phrase.
    encoding = None
    if 'UTF-8' in magic_name and '(with BOM)' in magic_name:
        encoding = 'utf-8-sig'

    return frmt, encoding
class Config(dict):
    """Manage configuration, settings, and output messages.

    **Order of precedence of the settings:**

    1. Program options,
    2. Config file,
    3. Default values.

    :param bool passwordstore: If ``True`` consider pass-import is run as
        the password-store extension. Use :func:`~currate` to preset
        password-store settings.
    :param int verb:
        Set the verbosity mode:

        - ``0`` No verbose output,
        - ``1`` Default verbose, enable :func:`~verbose`,
        - ``2`` Enable :func:`~show`,
        - ``3`` Enable :func:`~debug`.
    :param bool quiet: If ``True`` suppress all non-error messages. Takes
        precedence over ``verbose``.

    """
    # Normal colors
    green = '\033[32m'
    yellow = '\033[33m'
    magenta = '\033[35m'
    end = '\033[0m'

    # Bold colors
    RED = '\033[1m\033[91m'
    GREEN = '\033[1m\033[92m'
    YELLOW = '\033[1m\033[93m'
    MAGENTA = '\033[1m\033[95m'
    BOLD = '\033[1m'

    def __init__(self):
        """Start from the default values, silent and not quiet."""
        defaults = {'delimiter': ',', 'decrypted': False}
        super().__init__(defaults)
        self.verb = 0
        self.quiet = False

        # Extension mode is detected from the environment.
        self.passwordstore = bool(
            os.environ.get('_PASSWORD_STORE_EXTENSION', '') == 'import')

    def verbosity(self, verbose=0, quiet=False):
        """Set program verbosity.

        :param int verbose: Verbosity level, see the class documentation.
        :param bool quiet: If ``True``, the verbosity level is forced to 0.

        """
        self.verb = verbose
        self.quiet = quiet
        if self.quiet:
            self.verb = 0

    def readconfig(self, args: Dict[str, str]):
        """Read and merge config from args, config file and default.

        The config file is ``args['config']`` when it names an existing
        file, else ``$PASSWORD_STORE_DIR/.import`` when run as the
        password-store extension, else ``.import`` in the current directory.
        A missing or empty config file contributes no setting.

        When ``args['filter']`` names an existing file, that entry of
        ``args`` is replaced in place by the content of the file.

        :param dict args: Program options. Entries whose value is ``None``
            are ignored.

        """
        # The option may be present but unset; ``None`` is not a valid path
        # for os.path.isfile, so treat it like a missing option.
        configarg = args.get('config') or ''
        if os.path.isfile(configarg):
            configpath = configarg
        elif self.passwordstore:
            configpath = os.path.join(os.environ.get('PASSWORD_STORE_DIR', ''),
                                      '.import')
        else:
            configpath = '.import'

        configs = {}
        if os.path.isfile(configpath):
            with open(configpath, 'r') as file:
                # safe_load returns None for an empty document, which merge()
                # cannot iterate over.
                configs = yaml.safe_load(file) or {}

        filter_file = args.get('filter', None)
        if filter_file is not None and os.path.isfile(filter_file):
            with open(filter_file, mode='r') as f:
                args['filter'] = f.read()

        # Merge order implements the precedence: args override the file,
        # which overrides the defaults set in __init__.
        self.merge(configs)
        self.merge(args)
        self.setclean()

    def setclean(self):
        """Set the cleaning variables.

        Each of the ``separator``, ``cleans``, ``protocols`` and ``invalids``
        settings present is copied to the upper-case attribute of the same
        name on the ``clean`` module.

        """
        cleaning = ['separator', 'cleans', 'protocols', 'invalids']
        for key in cleaning:
            if key in self:
                setattr(clean, key.upper(), self[key])

        # Done after the loop so it also applies to a user-provided
        # ``cleans`` mapping.
        if 'separator' in self:
            clean.CLEANS[' '] = self['separator']

    def currate(self):
        """Generate curated config from pass-import and pimport arguments."""
        self['exporter'] = self.pop('dst', '')
        if self.passwordstore:
            self['exporter'] = 'pass'
            # NOTE(review): raises KeyError when PASSWORD_STORE_DIR is unset,
            # unlike readconfig() which falls back to ''. Confirm that the
            # extension wrapper always exports it.
            self['out'] = os.environ['PASSWORD_STORE_DIR']
            self['list_importers'] = self.get('list', False)
            self['list_exporters'] = False

    def getsettings(self, root='', action=Cap.IMPORT
                    ) -> Dict[str, Union[str, bool]]:
        """Return a curated setting dict for use in a manager class.

        :param str root: Stored under the ``'root'`` key.
        :param action: Stored under the ``'action'`` key.
        :return: ``action`` and ``root`` plus the subset of the current
            settings listed in ``keep``.

        """
        settings = {'action': action, 'root': root}
        keep = {
            'all', 'force', 'delimiter', 'cols', '1password', 'lastpass',
            'key', 'decrypted'
        }
        for key in self:
            if key in keep:
                settings[key] = self[key]
        return settings

    def merge(self, other):
        """Update the dictionary only if the value is not null.

        :param dict other: Mapping whose non-``None`` values are copied.

        """
        for key, value in other.items():
            if value is not None:
                self[key] = value

    def show(self, entry):
        """Show a password entry.

        Nothing is printed unless the verbosity level is at least 2.

        :param dict entry: The entry to display.

        """
        if self.verb >= 2:
            ignore = {'data', 'password', 'title', 'group', 'path'}
            path = os.path.join(self.get('droot', ''), entry.get(
                'path', entry.get('title', '')))
            self.verbose("Path", path)
            # The password goes first on its own line, then the other fields.
            res = entry.get('password', '') + '\n'
            for key, value in entry.items():
                if key in ignore:
                    continue
                res += f"{key}: {value}\n"
            self.verbose("Data", res.replace('\n', '\n           '))

    def verbose(self, title='', msg=''):
        """Verbose method, takes title and msg. msg can be empty."""
        if self.verb < 1:
            return
        if msg == '':
            out = (f"{self.MAGENTA}  .  {self.end}{self.magenta}"
                   f"{title}{self.end}")
        else:
            out = (f"{self.MAGENTA}  .  {self.end}{self.magenta}"
                   f"{title}: {self.end}{msg}")
        print(out, file=sys.stdout)

    def debug(self, title='', msg=''):
        """Debug method, only active at verbosity level 3 or more."""
        if self.verb >= 3:
            self.verbose(title, msg)

    def message(self, msg=''):
        """Message method, silenced by ``quiet``."""
        if not self.quiet:
            out = f"{self.BOLD}  .  {self.end}{msg}"
            print(out, file=sys.stdout)

    def echo(self, msg=''):
        """Echo a message after a tab, silenced by ``quiet``."""
        if not self.quiet:
            print(f"\t{msg}", file=sys.stdout)

    def success(self, msg=''):
        """Success method, silenced by ``quiet``."""
        if not self.quiet:
            out = f"{self.GREEN} (*) {self.end}{self.green}{msg}{self.end}"
            print(out, file=sys.stdout)

    def warning(self, msg=''):
        """Warning method, silenced by ``quiet``."""
        if not self.quiet:
            out = f"{self.YELLOW}  w  {self.end}{self.yellow}{msg}{self.end}"
            print(out, file=sys.stdout)

    def error(self, msg=''):
        """Error method, always printed, on stderr."""
        err = f"{self.RED} [x] {self.end}{self.BOLD}Error: {self.end}{msg}"
        print(err, file=sys.stderr)

    def die(self, msg=''):
        """Show an error and exit the program with status 1."""
        self.error(msg)
        sys.exit(1)