Coverage for pass_import/__main__.py: 94%

299 statements  

« prev     ^ index     » next       coverage.py v7.4.3, created at 2024-02-26 12:11 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -* 

3# pass import - Passwords importer swiss army knife 

4# Copyright (C) 2017-2024 Alexandre PUJOL <alexandre@pujol.io>. 

5# SPDX-License-Identifier: GPL-3.0-or-later 

6 

7import io 

8import os 

9import sys 

10import traceback 

11from argparse import ArgumentParser, RawDescriptionHelpFormatter 

12 

13from pass_import import Detecters, Managers, __version__ 

14from pass_import.auto import AutoDetect 

15from pass_import.core import Cap 

16from pass_import.errors import FormatError, PMError 

17from pass_import.tools import Config, get_magics 

18 

19MANAGERS = Managers() 

20 

21try: 

22 import jsonpath_ng.ext 

23 from jsonpath_ng.exceptions import JsonPathLexerError, JsonPathParserError 

24 JSONNG = True 

25except ImportError: 

26 JSONNG = False 

27 

28 

29class ArgParser(ArgumentParser): 

30 """Manages argument parsing and adds some defaults.""" 

31 

32 def __init__(self, passwordstore=False): 

33 self.passwordstore = passwordstore 

34 if self.passwordstore: 

35 prog = 'pass import' 

36 description = """ 

37 Import data from most of the password manager. Passwords are imported into 

38 the existing default password store; therefore, the password store must have 

39 been initialised before with 'pass init'.""" 

40 else: 

41 prog = 'pimport' 

42 description = """ 

43 Import data from most of the password manager. Passwords are imported into 

44 an existing password repository; therefore, the password repository must have 

45 been initialised before.""" 

46 

47 super().__init__( 

48 prog=prog, 

49 description=description, 

50 formatter_class=RawDescriptionHelpFormatter, 

51 epilog="More information may be found in the " 

52 f"{prog.replace(' ', '-')}(1) man page.", 

53 add_help=False) 

54 self.add_arguments() 

55 

56 def add_arguments(self): 

57 """Set arguments for `pass import` or `pimport`.""" 

58 pmarg = self.add_argument_group(title='Password managers') 

59 if not self.passwordstore: 

60 pmarg.add_argument( 

61 'dst', type=str, nargs='?', default='', 

62 help=("Destination password manager, can be: " 

63 f"{', '.join(MANAGERS.names(Cap.EXPORT))}.")) 

64 

65 pmarg.add_argument( 

66 'src', type=str, nargs='*', default=[], 

67 help='Path to the data to import. Can also be the password manager' 

68 ' name followed by the path to the data to import. The passw' 

69 f"ord manager name can be: {', '.join(MANAGERS.names())}.") 

70 

71 if not self.passwordstore: 

72 pmarg.add_argument( 

73 '-o', '--out', action='store', default='', 

74 help='Where the destination password manager lives. ' 

75 'Can be a file, a directory or even a login depending ' 

76 'of the manager.') 

77 

78 # Common options 

79 common = self.add_argument_group(title='Common optional arguments') 

80 common.add_argument( 

81 '-r', '--root', action='store', dest='sroot', 

82 default='', metavar='path', 

83 help='Only import the password from a specific subfolder.') 

84 common.add_argument( 

85 '-p', '--path', action='store', dest='droot', 

86 default='', metavar='path', 

87 help='Import the passwords to a specific subfolder.') 

88 common.add_argument('-k', '--key', action='store', default='', 

89 help='Path to a keyfile if required by a manager.') 

90 common.add_argument('-a', '--all', action='store_true', 

91 help='Also import all the extra data present.') 

92 common.add_argument('-f', '--force', action='store_true', 

93 help='Overwrite existing passwords.') 

94 common.add_argument('-c', '--clean', action='store_true', 

95 help='Make the paths more command line friendly.') 

96 common.add_argument( 

97 '-C', '--convert', action='store_true', 

98 help='Convert invalid characters present in the paths.') 

99 common.add_argument( 

100 '-P', '--pwned', action='store_true', 

101 help='Check imported passwords against haveibeenpwned.com.') 

102 common.add_argument( 

103 '-d', '--dry-run', action='store_true', 

104 help='Do not import passwords, only show what would be imported.') 

105 

106 # Extra options 

107 extra = self.add_argument_group(title='Extra optional arguments') 

108 extra.add_argument( 

109 '--sep', dest='separator', metavar='CHAR', default='-', 

110 help="Provide a characters of replacement for the path separator. " 

111 "Default: '-'") 

112 extra.add_argument( 

113 '--del', dest='delimiter', metavar='CHAR', default=',', 

114 help="Provide an alternative CSV delimiter character. " 

115 "Default: ','") 

116 extra.add_argument( 

117 '--cols', action='store', default='', 

118 help='CSV expected columns to map columns to credential attributes' 

119 '. Only used by the csv importer.') 

120 extra.add_argument( 

121 '--filter', dest='filter', metavar='FILTER', default=None, 

122 help="""Export whole entries matching a JSONPath filter expression. 

123 Default: (none) 

124 This field can be: 

125 - a string JSONPath expression 

126 - an absolute path to a file containing a JSONPath filter 

127 expression. 

128 List of supported filter: 

129 https://github.com/h2non/jsonpath-ng 

130 Example: 

131 - '$.entries[*].tags[?@="Defaults"]' : Export only entries 

132 with a tag matching 'Defaults'""") 

133 extra.add_argument('--config', action='store', default='', 

134 help="Set a config file. Default: '.import'") 

135 

136 # Managers list 

137 usage = self.add_argument_group( 

138 title='Help related optional arguments') 

139 if self.passwordstore: 

140 usage.add_argument('-l', '--list', action='store_true', 

141 help='List the supported password managers.') 

142 else: 

143 usage.add_argument('-l', '--list-importers', action='store_true', 

144 help='List the supported password importers.') 

145 usage.add_argument('-e', '--list-exporters', action='store_true', 

146 help='List the supported password exporters.') 

147 

148 # Help and version 

149 usage.add_argument('-h', '--help', action='store_true', 

150 help='Show this help message and exit.') 

151 usage.add_argument('-V', '--version', action='version', 

152 version='%(prog)s ' + __version__, 

153 help='Show the program version and exit.') 

154 group = usage.add_mutually_exclusive_group() 

155 group.add_argument('-v', '--verbose', action='count', default=0, 

156 help='Set verbosity level, ' 

157 'can be used more than once.') 

158 group.add_argument('-q', '--quiet', action='store_true', 

159 help='Be quiet.') 

160 

161 def parse_args(self, args=None, namespace=None): 

162 """Parse pass-import arguments & print help.""" 

163 if args is None: 

164 sys.argv.pop(0) 

165 args = sys.argv 

166 

167 arg = vars(super().parse_args(args, namespace)) 

168 arg['prog'] = self.prog 

169 if arg['help']: 

170 name = '' 

171 if self.passwordstore: 

172 if arg['src']: 

173 name = arg['src'][0] 

174 else: 

175 name = arg['dst'] 

176 

177 if name in MANAGERS.names(): 

178 self.print_help_manager(name) 

179 else: 

180 self.print_help() 

181 sys.exit(0) 

182 return arg 

183 

184 def print_help_manager(self, name): 

185 """Print manager usage.""" 

186 print(f'Usage: {self.prog} {name} [options]\n') 

187 for pm in MANAGERS.matrix().get(name): 

188 print(f'{pm.description()}:') 

189 usage = pm.usage() 

190 if usage: 

191 print(usage) 

192 if pm.format != '': 

193 print(f' Format: {pm.format}') 

194 if pm.version != '': 

195 print(f' Version: {pm.version}') 

196 if pm.url != '': 

197 print(f" Url: {pm.url}") 

198 if pm.hexport != '': 

199 print(f' Export: {pm.hexport}') 

200 if pm.himport != '': 

201 print(f' Import: {pm.himport}') 

202 if pm.default: 

203 print(f' This is the default importer for {name}.') 

204 if pm.cap is Cap.IMPORT | Cap.EXPORT: 

205 print(' Can be used for password import and export.') 

206 print() 

207 

208 

209def setup(): 

210 """Read progam arguments, configuration & sanity checks.""" 

211 conf = Config() 

212 parser = ArgParser(conf.passwordstore) 

213 arg = parser.parse_args() 

214 conf.verbosity(arg['verbose'], arg['quiet']) 

215 try: 

216 conf.readconfig(arg) 

217 except AttributeError as error: 

218 conf.verbose(error) 

219 conf.die("configuration file not valid.") 

220 conf.currate() 

221 

222 if conf['list_importers'] or conf['list_exporters']: 

223 listmanagers(conf) 

224 

225 if conf['exporter'] == '': 

226 conf.die("destination password manager not present.") 

227 

228 if conf['exporter'] not in MANAGERS.names(Cap.EXPORT): 

229 conf.die(f"{conf['exporter']} is not a supported " 

230 "destination password manager.") 

231 

232 if not conf['src']: 

233 conf.die("The source password manager or the path to import is empty.") 

234 

235 return conf 

236 

237 

238def listmanagers(conf): 

239 """List the supported password managers.""" 

240 cap = Cap.IMPORT if conf['list_importers'] is True else Cap.EXPORT 

241 if conf.quiet: 

242 print('\n'.join(MANAGERS.names(cap))) 

243 sys.exit(0) 

244 

245 if cap is Cap.EXPORT: 

246 msg = (f"The {len(MANAGERS.names(cap))} supported exporter " 

247 "password managers are:") 

248 else: 

249 msg = f"The {len(MANAGERS)} supported password managers are:" 

250 conf.success(msg) 

251 

252 max_res = '' 

253 listing = {} 

254 matrix = MANAGERS.matrix(cap) 

255 for name in matrix: 

256 frmts = [] 

257 for pm in matrix[name]: 

258 res = pm.format 

259 if pm.version: 

260 res += f' (v{pm.version})' 

261 max_res = max(max_res, res) 

262 frmts.append(res) 

263 listing[name] = frmts 

264 

265 padding1 = len(max(MANAGERS.names(cap), key=len)) + 1 

266 if conf.verb: 

267 padding2 = len(max_res) + 1 

268 for name in sorted(matrix): 

269 for pm, frmt in zip(matrix[name], listing[name]): 

270 conf.message(conf.BOLD + name.ljust(padding1) + conf.end + 

271 frmt.ljust(padding2) + pm.__name__) 

272 else: 

273 tmp = [', '.join(frmts) for frmts in listing.values()] 

274 padding2 = len(max(tmp, key=len)) + 1 

275 for name in sorted(listing): 

276 conf.message(conf.BOLD + name.ljust(padding1) + conf.end + 

277 ', '.join(listing[name]).ljust(padding2) + 

278 matrix[name][0].url) 

279 sys.exit(0) 

280 

281 

282def decryptsource(conf): 

283 """Decrypt source file if required.""" 

284 path = conf['src'][1] if len(conf['src']) >= 2 else conf['src'][0] 

285 if os.path.isfile(path): 

286 decrypters = Detecters(Cap.DECRYPT) 

287 frmt, encoding = get_magics(path) 

288 if encoding: 

289 conf['encoding'] = encoding 

290 if frmt in decrypters: 

291 with decrypters[frmt](path) as file: 

292 conf['plaintext'] = file.decrypt() 

293 conf['decrypted'] = True 

294 conf.verbose(f"Source file decrypted using {frmt}.") 

295 

296 

297def detectmanager(conf): 

298 """Detect file format and password manager.""" 

299 prefix = '' 

300 if len(conf['src']) == 1: 

301 name = conf['src'][0] 

302 if name in MANAGERS.names(): 

303 conf.verbose("Using default manager.") 

304 detect = AutoDetect(name) 

305 pm = detect.default() 

306 

307 else: 

308 conf.verbose("Trying to guess file format and manager name.") 

309 prefix = to_detect = name 

310 if conf['decrypted']: 

311 to_detect = conf['plaintext'] 

312 

313 detect = AutoDetect(settings=conf.getsettings()) 

314 pm = detect.manager(to_detect) 

315 if pm is None: 

316 conf.die("Unable to detect the manager. Please try with: " 

317 f"{conf['prog']} <manager> {prefix}") 

318 

319 else: 

320 name = conf['src'][0] 

321 prefix = conf['src'][1] 

322 if name in MANAGERS.names(): 

323 conf.verbose("Trying to guess file format.") 

324 to_detect = prefix 

325 if conf['decrypted']: 

326 to_detect = conf['plaintext'] 

327 

328 detect = AutoDetect(name, settings=conf.getsettings()) 

329 pm = detect.format(to_detect) 

330 

331 elif name in MANAGERS.clsnames(): 

332 pm = MANAGERS.get(name) 

333 conf.verbose(f"Using import class: {pm.__name__}.") 

334 

335 else: 

336 conf.die(f"{name} is not a supported source password manager.") 

337 

338 conf.verbose(f"Importer: {pm.name}, Format: {pm.format}, Version: " 

339 f" {pm.version}") 

340 

341 if 'plaintext' in conf: 

342 conf['in'] = io.StringIO(conf['plaintext']) 

343 else: 

344 conf['in'] = prefix 

345 conf['importer'] = pm.name 

346 return pm 

347 

348 

349def zxcvbn_parse(details): 

350 """Nicely print the results from zxcvbn.""" 

351 sequence = '' 

352 for seq in details.get('sequence', []): 

353 sequence += f"{seq['token']}({seq['pattern']}) " 

354 res = f"Score {details['score']} ({details['guesses']} guesses). " 

355 return res + f"This estimate is based on the sequence {sequence}" 

356 

357 

358# pylint: disable=inconsistent-return-statements 

359def pass_import(conf, cls_import): 

360 """Import data.""" 

361 try: 

362 settings = conf.getsettings(conf['sroot']) 

363 with cls_import(conf['in'], settings=settings) as importer: 

364 importer.parse() 

365 if not importer.secure: # pragma: no cover 

366 conf.warning(f"The password manager {conf['importer']} has " 

367 "been flagged as unsecure, you should update all " 

368 "your newly imported credentials.") 

369 return importer.data 

370 

371 except (FormatError, AttributeError, ValueError, TypeError) as error: 

372 conf.debug(traceback.format_exc()) 

373 conf.warning(error) 

374 conf.die( 

375 f"{conf['in']} is not a valid exported {conf['importer']} file.") 

376 

377 except ImportError as error: 

378 conf.verbose(error) 

379 err = (f"Importing {conf['importer']}, missing required dependency: " 

380 f"{error.name}\n" 

381 f"You can install it with:\n 'pip3 install {error.name}'") 

382 if error.name not in ['pykeepass']: 

383 err += f", or\n 'sudo apt-get install python3-{error.name}'" 

384 conf.die(err) 

385 

386 except (PermissionError, PMError) as error: 

387 conf.debug(traceback.format_exc()) 

388 conf.die(error) 

389 

390 

391def pass_export(conf, cls_export, data): 

392 """Insert cleaned data into the password repository.""" 

393 paths_imported = [] 

394 paths_exported = [] 

395 try: 

396 settings = conf.getsettings(conf['droot'], Cap.EXPORT) 

397 with cls_export(conf['out'], settings=settings) as exporter: 

398 exporter.data = data 

399 exporter.clean(conf['clean'], conf['convert']) 

400 report = exporter.audit(conf['pwned']) 

401 for entry in exporter.data: 

402 pmpath = os.path.join(conf['droot'], entry.get( 

403 'path', entry.get('title', ''))) 

404 conf.show(entry) 

405 exported = pass_filter(conf, entry) 

406 try: 

407 if exported: 

408 if not conf['dry_run']: 

409 exporter.insert(entry) 

410 except PMError as error: 

411 conf.debug(traceback.format_exc()) 

412 conf.warning(f"Impossible to insert {pmpath} into " 

413 f"{conf['exporter']}: {error}") 

414 else: 

415 paths_imported.append(pmpath) 

416 if exported: 

417 paths_exported.append(pmpath) 

418 except PMError as error: 

419 conf.debug(traceback.format_exc()) 

420 conf.die(error) 

421 

422 return paths_imported, paths_exported, report 

423 

424 

425def pass_filter(conf, entry): 

426 """Filter entry based on a JSONPath filter expression.""" 

427 filter_expression = conf.get('filter', None) 

428 if filter_expression is None: 

429 return True 

430 

431 if not JSONNG: 

432 message = ("--filter requires pass-import[filter] " 

433 "or pass-import[all] to be installed") 

434 raise ImportError('Missing packages. ' + message) 

435 

436 # Having end users write their JSONPath filter expression as if 

437 # pass-import processes/filters entries in bulk, will allow end 

438 # users filter expression to continue to work when/if bulk filter is 

439 # supported. Hence 'entries' being added 

440 data = {'entries': [entry]} 

441 try: 

442 expr = jsonpath_ng.ext.parse(filter_expression) 

443 matches = expr.find(data) 

444 return len(matches) > 0 

445 except (JsonPathLexerError, JsonPathParserError) as e: 

446 conf.warning("Entry not exported due to error " 

447 "most likely in FILTER expression" 

448 + e.args[0]) 

449 if conf.verb >= 2: 

450 conf.verbose("- Filter expression:" + filter_expression) 

451 conf.verbose("- Entry: " + data) 

452 return False 

453 

454 

455def report(conf, paths_imported, paths_exported, audit): 

456 """Print final success report.""" 

457 if conf['dry_run']: 

458 conf.warning(f"Data would be imported from {conf['importer']} " 

459 f"to {conf['exporter']}") 

460 else: 

461 conf.success(f"Importing passwords from {conf['importer']} " 

462 f"to {conf['exporter']}") 

463 conf.message(f"Passwords imported from: {conf['in']}") 

464 conf.message(f"Passwords exported to: {conf['out']}") 

465 if conf['sroot'] != '': 

466 conf.message(f"Root path: {conf['sroot']}") 

467 if conf['droot'] != '': 

468 conf.message(f"Root path: {conf['droot']}") 

469 conf.message(f"Number of password imported: {len(paths_imported)}") 

470 if conf['convert']: 

471 conf.message("Forbidden chars converted") 

472 conf.message(f"Path separator used: {conf['separator']}") 

473 if conf['clean']: 

474 conf.message("Imported data cleaned") 

475 if conf['all']: 

476 conf.message("All data imported") 

477 for password, count in audit['breached']: 

478 conf.warning(f"Password breached {count} time(s): {password}") 

479 for password, details in audit['weak']: 

480 conf.warning(f"Weak password detected: {password} might be weak." 

481 f" {zxcvbn_parse(details)}") 

482 for entry in audit['duplicated']: 

483 conf.warning(f"Duplicated passwords detected: " 

484 f"{', '.join([item['path'] for item in entry])}") 

485 

486 for paths, header in [ 

487 (paths_imported, "Passwords imported"), 

488 (paths_exported, "Passwords exported") 

489 ]: 

490 if paths is not None: 

491 conf.message(header + ": " + str(len(paths))) 

492 paths.sort() 

493 for path in paths: 

494 conf.echo(path) 

495 

496 

497def main(): 

498 """`pimport` and `pass import` common main.""" 

499 conf = setup() 

500 decryptsource(conf) 

501 

502 # Password managers detection 

503 cls_import = detectmanager(conf) 

504 cls_export = MANAGERS.get(conf['exporter'], cap=Cap.EXPORT) 

505 conf.verbose(f"Importing passwords from {cls_import.__name__} " 

506 f"to {cls_export.__name__}") 

507 conf.verbose("Checking for breached passwords", 

508 "on haveibeenpwned.com" if conf['pwned'] else '') 

509 

510 # Import & export 

511 data = pass_import(conf, cls_import) 

512 paths_imported, paths_exported, audit = pass_export(conf, cls_export, data) 

513 

514 # Success! 

515 report(conf, paths_imported, paths_exported, audit) 

516 

517 

518if __name__ == "__main__": 

519 main()