Coverage for pass_import/__main__.py: 94%
299 statements
« prev ^ index » next coverage.py v7.4.3, created at 2024-02-26 12:11 +0000
« prev ^ index » next coverage.py v7.4.3, created at 2024-02-26 12:11 +0000
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*
3# pass import - Passwords importer swiss army knife
4# Copyright (C) 2017-2024 Alexandre PUJOL <alexandre@pujol.io>.
5# SPDX-License-Identifier: GPL-3.0-or-later
7import io
8import os
9import sys
10import traceback
11from argparse import ArgumentParser, RawDescriptionHelpFormatter
13from pass_import import Detecters, Managers, __version__
14from pass_import.auto import AutoDetect
15from pass_import.core import Cap
16from pass_import.errors import FormatError, PMError
17from pass_import.tools import Config, get_magics
# Shared Managers() instance; the functions below query it for manager
# names, class names and the per-manager class matrix.
MANAGERS = Managers()

# jsonpath_ng is treated as optional: JSONNG records whether the import
# succeeded so that pass_filter() can report a clear error when it did not.
try:
    import jsonpath_ng.ext
    from jsonpath_ng.exceptions import JsonPathLexerError, JsonPathParserError
    JSONNG = True
except ImportError:
    JSONNG = False
class ArgParser(ArgumentParser):
    """Argument parser shared by `pass import` and `pimport`.

    The `passwordstore` flag selects which of the two front-ends is being
    built: it changes the program name, the description and which
    positional arguments / listing options are registered.
    """

    def __init__(self, passwordstore=False):
        """Build the parser and register every argument.

        Args:
            passwordstore: True when running as the `pass import`
                extension, False when running as standalone `pimport`.
        """
        self.passwordstore = passwordstore
        # NOTE(review): the leading whitespace of the two description
        # literals is shown verbatim by RawDescriptionHelpFormatter;
        # confirm it against the upstream file.
        if self.passwordstore:
            prog = 'pass import'
            description = """
  Import data from most of the password manager. Passwords are imported into
  the existing default password store; therefore, the password store must have
  been initialised before with 'pass init'."""
        else:
            prog = 'pimport'
            description = """
  Import data from most of the password manager. Passwords are imported into
  an existing password repository; therefore, the password repository must have
  been initialised before."""

        super().__init__(
            prog=prog,
            description=description,
            formatter_class=RawDescriptionHelpFormatter,
            epilog="More information may be found in the "
                   f"{prog.replace(' ', '-')}(1) man page.",
            # -h/--help is registered manually in add_arguments() so that
            # parse_args() can print manager specific help.
            add_help=False)
        self.add_arguments()

    def add_arguments(self):
        """Set arguments for `pass import` or `pimport`."""
        pmarg = self.add_argument_group(title='Password managers')
        if not self.passwordstore:
            pmarg.add_argument(
                'dst', type=str, nargs='?', default='',
                help=("Destination password manager, can be: "
                      f"{', '.join(MANAGERS.names(Cap.EXPORT))}."))

        pmarg.add_argument(
            'src', type=str, nargs='*', default=[],
            help='Path to the data to import. Can also be the password manager'
                 ' name followed by the path to the data to import. The passw'
                 f"ord manager name can be: {', '.join(MANAGERS.names())}.")

        if not self.passwordstore:
            pmarg.add_argument(
                '-o', '--out', action='store', default='',
                help='Where the destination password manager lives. '
                     'Can be a file, a directory or even a login depending '
                     'of the manager.')

        # Common options
        common = self.add_argument_group(title='Common optional arguments')
        common.add_argument(
            '-r', '--root', action='store', dest='sroot',
            default='', metavar='path',
            help='Only import the password from a specific subfolder.')
        common.add_argument(
            '-p', '--path', action='store', dest='droot',
            default='', metavar='path',
            help='Import the passwords to a specific subfolder.')
        common.add_argument('-k', '--key', action='store', default='',
                            help='Path to a keyfile if required by a manager.')
        common.add_argument('-a', '--all', action='store_true',
                            help='Also import all the extra data present.')
        common.add_argument('-f', '--force', action='store_true',
                            help='Overwrite existing passwords.')
        common.add_argument('-c', '--clean', action='store_true',
                            help='Make the paths more command line friendly.')
        common.add_argument(
            '-C', '--convert', action='store_true',
            help='Convert invalid characters present in the paths.')
        common.add_argument(
            '-P', '--pwned', action='store_true',
            help='Check imported passwords against haveibeenpwned.com.')
        common.add_argument(
            '-d', '--dry-run', action='store_true',
            help='Do not import passwords, only show what would be imported.')

        # Extra options
        extra = self.add_argument_group(title='Extra optional arguments')
        extra.add_argument(
            '--sep', dest='separator', metavar='CHAR', default='-',
            help="Provide a characters of replacement for the path separator. "
                 "Default: '-'")
        extra.add_argument(
            '--del', dest='delimiter', metavar='CHAR', default=',',
            help="Provide an alternative CSV delimiter character. "
                 "Default: ','")
        extra.add_argument(
            '--cols', action='store', default='',
            help='CSV expected columns to map columns to credential attributes'
                 '. Only used by the csv importer.')
        extra.add_argument(
            '--filter', dest='filter', metavar='FILTER', default=None,
            help="""Export whole entries matching a JSONPath filter expression.
            Default: (none)
            This field can be:
            - a string JSONPath expression
            - an absolute path to a file containing a JSONPath filter
            expression.
            List of supported filter:
            https://github.com/h2non/jsonpath-ng
            Example:
            - '$.entries[*].tags[?@="Defaults"]' : Export only entries
            with a tag matching 'Defaults'""")
        extra.add_argument('--config', action='store', default='',
                           help="Set a config file. Default: '.import'")

        # Managers list
        usage = self.add_argument_group(
            title='Help related optional arguments')
        if self.passwordstore:
            usage.add_argument('-l', '--list', action='store_true',
                               help='List the supported password managers.')
        else:
            usage.add_argument('-l', '--list-importers', action='store_true',
                               help='List the supported password importers.')
            usage.add_argument('-e', '--list-exporters', action='store_true',
                               help='List the supported password exporters.')

        # Help and version
        usage.add_argument('-h', '--help', action='store_true',
                           help='Show this help message and exit.')
        usage.add_argument('-V', '--version', action='version',
                           version='%(prog)s ' + __version__,
                           help='Show the program version and exit.')
        group = usage.add_mutually_exclusive_group()
        group.add_argument('-v', '--verbose', action='count', default=0,
                           help='Set verbosity level, '
                                'can be used more than once.')
        group.add_argument('-q', '--quiet', action='store_true',
                           help='Be quiet.')

    def parse_args(self, args=None, namespace=None):
        """Parse pass-import arguments & print help.

        Args:
            args: Argument list to parse; when None, everything after the
                program name in `sys.argv` is used.
            namespace: Optional namespace forwarded to argparse.

        Returns:
            A dict of the parsed arguments with an extra 'prog' key holding
            the program name.

        When --help was given, the general help (or the help of the manager
        named on the command line) is printed and the process exits with
        status 0.
        """
        if args is None:
            # Slice a copy rather than popping: sys.argv is left intact, so
            # calling parse_args() again sees the same arguments.
            args = sys.argv[1:]

        arg = vars(super().parse_args(args, namespace))
        arg['prog'] = self.prog
        if arg['help']:
            name = ''
            if self.passwordstore:
                # `pass import` has no 'dst' positional; the manager name,
                # if any, is the first 'src' value.
                if arg['src']:
                    name = arg['src'][0]
            else:
                name = arg['dst']

            if name in MANAGERS.names():
                self.print_help_manager(name)
            else:
                self.print_help()
            sys.exit(0)
        return arg

    def print_help_manager(self, name):
        """Print the usage of every class registered for manager `name`."""
        print(f'Usage: {self.prog} {name} [options]\n')
        for pm in MANAGERS.matrix().get(name):
            print(f'{pm.description()}:')
            usage = pm.usage()
            if usage:
                print(usage)
            if pm.format != '':
                print(f' Format: {pm.format}')
            if pm.version != '':
                print(f' Version: {pm.version}')
            if pm.url != '':
                print(f" Url: {pm.url}")
            if pm.hexport != '':
                print(f' Export: {pm.hexport}')
            if pm.himport != '':
                print(f' Import: {pm.himport}')
            if pm.default:
                print(f' This is the default importer for {name}.')
            # Compare flag values by equality instead of relying on the
            # identity of the combined flag object.
            if pm.cap == Cap.IMPORT | Cap.EXPORT:
                print(' Can be used for password import and export.')
            print()
def setup():
    """Read program arguments and configuration, then run sanity checks.

    Returns:
        The populated Config object.

    The listing options are handed over to listmanagers(); an unreadable
    configuration, a missing or unsupported exporter and an empty source
    are each reported through conf.die().
    """
    conf = Config()
    arg = ArgParser(conf.passwordstore).parse_args()
    conf.verbosity(arg['verbose'], arg['quiet'])
    try:
        conf.readconfig(arg)
    except AttributeError as error:
        conf.verbose(error)
        conf.die("configuration file not valid.")
    conf.currate()

    wants_listing = conf['list_importers'] or conf['list_exporters']
    if wants_listing:
        listmanagers(conf)

    exporter = conf['exporter']
    if exporter == '':
        conf.die("destination password manager not present.")

    if exporter not in MANAGERS.names(Cap.EXPORT):
        conf.die(f"{exporter} is not a supported "
                 "destination password manager.")

    if not conf['src']:
        conf.die("The source password manager or the path to import is empty.")

    return conf
def listmanagers(conf):
    """List the supported password managers, then exit with status 0.

    Importers are listed when conf['list_importers'] is True, exporters
    otherwise. In quiet mode only the manager names are printed, one per
    line. Otherwise one row per manager is printed (one row per manager
    class when conf.verb is set), padded into aligned columns.
    """
    cap = Cap.IMPORT if conf['list_importers'] is True else Cap.EXPORT
    if conf.quiet:
        print('\n'.join(MANAGERS.names(cap)))
        sys.exit(0)

    if cap is Cap.EXPORT:
        msg = (f"The {len(MANAGERS.names(cap))} supported exporter "
               "password managers are:")
    else:
        msg = f"The {len(MANAGERS)} supported password managers are:"
    conf.success(msg)

    # Build the "format (vversion)" label of every class of every manager
    # and remember the longest label for the verbose column width.
    max_res = ''
    listing = {}
    matrix = MANAGERS.matrix(cap)
    for name in matrix:
        frmts = []
        for pm in matrix[name]:
            res = pm.format
            if pm.version:
                res += f' (v{pm.version})'
            # key=len: the padding below needs the longest label, not the
            # lexicographically greatest one.
            max_res = max(max_res, res, key=len)
            frmts.append(res)
        listing[name] = frmts

    padding1 = len(max(MANAGERS.names(cap), key=len)) + 1
    if conf.verb:
        # Verbose: one row per class, followed by the class name.
        padding2 = len(max_res) + 1
        for name in sorted(matrix):
            for pm, frmt in zip(matrix[name], listing[name]):
                conf.message(conf.BOLD + name.ljust(padding1) + conf.end +
                             frmt.ljust(padding2) + pm.__name__)
    else:
        # Compact: one row per manager, formats joined, followed by the url
        # of the manager's first class.
        tmp = [', '.join(frmts) for frmts in listing.values()]
        padding2 = len(max(tmp, key=len)) + 1
        for name in sorted(listing):
            conf.message(conf.BOLD + name.ljust(padding1) + conf.end +
                         ', '.join(listing[name]).ljust(padding2) +
                         matrix[name][0].url)
    sys.exit(0)
def decryptsource(conf):
    """Decrypt the source file when its detected format has a decrypter.

    The path is the second item of conf['src'] when there are at least
    two, else the first. Nothing happens unless that path is an existing
    file. On decryption, conf['plaintext'] and conf['decrypted'] are set;
    conf['encoding'] is set whenever get_magics() reports one.
    """
    sources = conf['src']
    path = sources[0] if len(sources) < 2 else sources[1]
    if not os.path.isfile(path):
        return

    decrypters = Detecters(Cap.DECRYPT)
    frmt, encoding = get_magics(path)
    if encoding:
        conf['encoding'] = encoding
    if frmt not in decrypters:
        return

    with decrypters[frmt](path) as file:
        conf['plaintext'] = file.decrypt()
    conf['decrypted'] = True
    conf.verbose(f"Source file decrypted using {frmt}.")
def detectmanager(conf):
    """Detect file format and password manager.

    With a single source item, the item is either a known manager name
    (its default class is used) or a path from which manager and format
    are guessed. With two items, the first is a manager name (format
    guessed from the second) or an importer class name.

    Returns:
        The selected importer class. conf['in'] and conf['importer'] are
        set as a side effect.
    """
    sources = conf['src']
    name = sources[0]

    if len(sources) == 1:
        prefix = ''
        if name in MANAGERS.names():
            conf.verbose("Using default manager.")
            pm = AutoDetect(name).default()
        else:
            conf.verbose("Trying to guess file format and manager name.")
            prefix = name
            # Detection runs on the decrypted content when available.
            to_detect = conf['plaintext'] if conf['decrypted'] else name
            pm = AutoDetect(settings=conf.getsettings()).manager(to_detect)
            if pm is None:
                conf.die("Unable to detect the manager. Please try with: "
                         f"{conf['prog']} <manager> {prefix}")
    else:
        prefix = sources[1]
        if name in MANAGERS.names():
            conf.verbose("Trying to guess file format.")
            to_detect = conf['plaintext'] if conf['decrypted'] else prefix
            detect = AutoDetect(name, settings=conf.getsettings())
            pm = detect.format(to_detect)
        elif name in MANAGERS.clsnames():
            pm = MANAGERS.get(name)
            conf.verbose(f"Using import class: {pm.__name__}.")
        else:
            conf.die(f"{name} is not a supported source password manager.")

    conf.verbose(f"Importer: {pm.name}, Format: {pm.format}, Version: "
                 f" {pm.version}")

    # Decrypted content is passed on as an in-memory text stream,
    # otherwise the importer gets the source path (possibly empty).
    conf['in'] = io.StringIO(conf['plaintext']) if 'plaintext' in conf \
        else prefix
    conf['importer'] = pm.name
    return pm
def zxcvbn_parse(details):
    """Format a zxcvbn result dict as a one-line human readable summary.

    Args:
        details: Mapping with 'score' and 'guesses' keys and an optional
            'sequence' list whose items have 'token' and 'pattern' keys.

    Returns:
        The summary string; each sequence item is rendered as
        "token(pattern) " (with the trailing space).
    """
    sequence = ''.join(
        f"{seq['token']}({seq['pattern']}) "
        for seq in details.get('sequence', []))
    return (f"Score {details['score']} ({details['guesses']} guesses). "
            f"This estimate is based on the sequence {sequence}")
# pylint: disable=inconsistent-return-statements
def pass_import(conf, cls_import):
    """Parse the source with `cls_import` and return the imported data.

    Args:
        conf: Configuration; 'sroot', 'in' and 'importer' are read.
        cls_import: Importer class, used as a context manager.

    Returns:
        The importer's `data` attribute after parsing. Every handled error
        is reported through conf.die() instead of being propagated.
    """
    try:
        root_settings = conf.getsettings(conf['sroot'])
        with cls_import(conf['in'], settings=root_settings) as importer:
            importer.parse()
            if not importer.secure:  # pragma: no cover
                conf.warning(f"The password manager {conf['importer']} has "
                             "been flagged as unsecure, you should update all "
                             "your newly imported credentials.")
            return importer.data

    except (FormatError, AttributeError, ValueError, TypeError) as error:
        # Parsing problems: the file is not what the importer expects.
        conf.debug(traceback.format_exc())
        conf.warning(error)
        conf.die(
            f"{conf['in']} is not a valid exported {conf['importer']} file.")

    except ImportError as error:
        # An optional dependency of the importer is missing.
        conf.verbose(error)
        missing = error.name
        err = (f"Importing {conf['importer']}, missing required dependency: "
               f"{missing}\n"
               f"You can install it with:\n 'pip3 install {missing}'")
        if missing != 'pykeepass':
            err += f", or\n 'sudo apt-get install python3-{missing}'"
        conf.die(err)

    except (PermissionError, PMError) as error:
        conf.debug(traceback.format_exc())
        conf.die(error)
def pass_export(conf, cls_export, data):
    """Insert cleaned data into the password repository.

    Args:
        conf: Configuration; 'droot', 'out', 'clean', 'convert', 'pwned',
            'dry_run' and 'exporter' are read.
        cls_export: Exporter class, used as a context manager.
        data: Entries produced by the importer.

    Returns:
        A tuple (paths_imported, paths_exported, audit): the paths of the
        entries processed without insertion error, the subset that passed
        pass_filter(), and the result of exporter.audit().
    """
    paths_imported = []
    paths_exported = []
    try:
        settings = conf.getsettings(conf['droot'], Cap.EXPORT)
        with cls_export(conf['out'], settings=settings) as exporter:
            exporter.data = data
            exporter.clean(conf['clean'], conf['convert'])
            audit = exporter.audit(conf['pwned'])
            for entry in exporter.data:
                relative = entry.get('path', entry.get('title', ''))
                pmpath = os.path.join(conf['droot'], relative)
                conf.show(entry)
                exported = pass_filter(conf, entry)
                try:
                    # Filtered-out entries and dry runs are never inserted.
                    if exported and not conf['dry_run']:
                        exporter.insert(entry)
                except PMError as error:
                    conf.debug(traceback.format_exc())
                    conf.warning(f"Impossible to insert {pmpath} into "
                                 f"{conf['exporter']}: {error}")
                    continue
                paths_imported.append(pmpath)
                if exported:
                    paths_exported.append(pmpath)
    except PMError as error:
        conf.debug(traceback.format_exc())
        conf.die(error)

    return paths_imported, paths_exported, audit
def pass_filter(conf, entry):
    """Filter entry based on a JSONPath filter expression.

    Args:
        conf: Configuration; its 'filter' value is the JSONPath expression
            (None means no filtering).
        entry: The entry to test.

    Returns:
        True when no filter is configured or the entry matches it, False
        when it does not match or the expression cannot be parsed.

    Raises:
        ImportError: a filter is configured but jsonpath_ng is missing.
    """
    filter_expression = conf.get('filter', None)
    if filter_expression is None:
        return True

    if not JSONNG:
        message = ("--filter requires pass-import[filter] "
                   "or pass-import[all] to be installed")
        raise ImportError('Missing packages. ' + message)

    # Having end users write their JSONPath filter expression as if
    # pass-import processes/filters entries in bulk, will allow end
    # users filter expression to continue to work when/if bulk filter is
    # supported. Hence 'entries' being added
    data = {'entries': [entry]}
    try:
        expr = jsonpath_ng.ext.parse(filter_expression)
        matches = expr.find(data)
        return len(matches) > 0
    except (JsonPathLexerError, JsonPathParserError) as error:
        # Format with f-strings: `data` is a dict and the exception may
        # carry no (or non-string) args, so plain `+` concatenation would
        # raise inside this handler.
        conf.warning("Entry not exported due to error "
                     f"most likely in FILTER expression: {error}")
        if conf.verb >= 2:
            conf.verbose(f"- Filter expression: {filter_expression}")
            conf.verbose(f"- Entry: {data}")
        return False
def report(conf, paths_imported, paths_exported, audit):
    """Print final success report.

    Args:
        conf: Configuration, also used to emit every message.
        paths_imported: Paths of the imported entries, or None.
        paths_exported: Paths of the exported entries, or None.
        audit: Mapping with 'breached', 'weak' and 'duplicated' lists.

    The path lists are printed in sorted order but are not modified.
    """
    if conf['dry_run']:
        conf.warning(f"Data would be imported from {conf['importer']} "
                     f"to {conf['exporter']}")
    else:
        conf.success(f"Importing passwords from {conf['importer']} "
                     f"to {conf['exporter']}")
    conf.message(f"Passwords imported from: {conf['in']}")
    conf.message(f"Passwords exported to: {conf['out']}")
    if conf['sroot'] != '':
        conf.message(f"Root path: {conf['sroot']}")
    if conf['droot'] != '':
        conf.message(f"Root path: {conf['droot']}")
    conf.message(f"Number of password imported: {len(paths_imported)}")
    if conf['convert']:
        conf.message("Forbidden chars converted")
        conf.message(f"Path separator used: {conf['separator']}")
    if conf['clean']:
        conf.message("Imported data cleaned")
    if conf['all']:
        conf.message("All data imported")

    # Audit findings.
    for password, count in audit['breached']:
        conf.warning(f"Password breached {count} time(s): {password}")
    for password, details in audit['weak']:
        conf.warning(f"Weak password detected: {password} might be weak."
                     f" {zxcvbn_parse(details)}")
    for entry in audit['duplicated']:
        conf.warning(f"Duplicated passwords detected: "
                     f"{', '.join([item['path'] for item in entry])}")

    for paths, header in [
        (paths_imported, "Passwords imported"),
        (paths_exported, "Passwords exported")
    ]:
        if paths is not None:
            conf.message(header + ": " + str(len(paths)))
            # sorted() works on a copy, leaving the caller's list untouched.
            for path in sorted(paths):
                conf.echo(path)
def main():
    """Entry point shared by `pimport` and `pass import`.

    Runs the whole pipeline: setup, optional source decryption, manager
    detection, import, export and the final report.
    """
    conf = setup()
    decryptsource(conf)

    # Resolve the importer and exporter classes.
    importer_cls = detectmanager(conf)
    exporter_cls = MANAGERS.get(conf['exporter'], cap=Cap.EXPORT)
    conf.verbose(f"Importing passwords from {importer_cls.__name__} "
                 f"to {exporter_cls.__name__}")
    pwned_suffix = "on haveibeenpwned.com" if conf['pwned'] else ''
    conf.verbose("Checking for breached passwords", pwned_suffix)

    # Import, then export.
    data = pass_import(conf, importer_cls)
    imported, exported, audit = pass_export(conf, exporter_cls, data)

    # Summarise what happened.
    report(conf, imported, exported, audit)
# Run the importer only when this module is executed as a script.
if __name__ == "__main__":
    main()