Coverage for pass_audit/audit.py: 100%
76 statements
« prev ^ index » next coverage.py v6.5.0, created at 2022-11-16 11:35 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2022-11-16 11:35 +0000
1# -*- encoding: utf-8 -*-
2# pass audit - Password Store Extension (https://www.passwordstore.org/)
3# Copyright (C) 2018-2022 Alexandre PUJOL <alexandre@pujol.io>.
4#
6import os
7import hashlib
9import requests
10try:
11 from zxcvbn import zxcvbn
12 ZXCVBN = True
13except ImportError:
14 ZXCVBN = False
16from pass_audit import __version__
19class PwnedAPI():
20 """Simple wrapper for https://haveibeenpwned.com API."""
22 def __init__(self):
23 self.headers = {'user-agent': f"pass-audit/{__version__}"}
25 def password_range(self, prefix):
26 """Query the haveibeenpwned API to retrieve the bucket ``prefix``."""
27 url = f"https://api.pwnedpasswords.com/range/{prefix}"
28 res = requests.get(url, headers=self.headers, verify=True)
29 res.raise_for_status()
31 hashes = []
32 counts = []
33 for item in res.text.split('\r\n'):
34 (partialhash, count) = item.split(':')
35 hashes.append(prefix + partialhash)
36 counts.append(int(count))
37 return (hashes, counts)
40class PassAudit():
41 """Pass audit main class."""
43 def __init__(self, data, verbose):
44 self.data = data
45 self.verbose = verbose
47 def password(self):
48 """K-anonymity password breach detection on haveibeenpwned.com."""
49 # Generate the list of hashes and prefixes to query.
50 data = []
51 api = PwnedAPI()
52 buckets = {}
53 for path, entry in self.data.items():
54 if self.verbose:
55 print(f"Getting the prefix of {path}")
56 if entry.get('password', '') == '':
57 continue
58 password = entry['password'].encode("utf8")
59 phash = hashlib.sha1(password).hexdigest().upper() # nosec
60 prefix = phash[0:5]
61 data.append((path, entry, phash, prefix))
62 if prefix not in buckets:
63 buckets[prefix] = api.password_range(prefix)
65 # Compare the data and return the breached passwords.
66 breached = []
67 for path, entry, phash, prefix in data:
68 if phash in buckets[prefix][0]:
69 index = buckets[prefix][0].index(phash)
70 count = buckets[prefix][1][index]
71 breached.append((path, entry.get('password', ''), count))
72 return breached
74 def zxcvbn(self):
75 """Password strength estimation using Dropbox' zxcvbn."""
76 if not ZXCVBN:
77 raise ImportError(name='zxcvbn')
79 weak = []
80 for path, entry in self.data.items():
81 if self.verbose:
82 print(f"Checking {path}")
83 if entry.get('password', '') == '':
84 continue
85 password = entry['password']
86 user_input = list(entry.values()) + path.split(os.sep)
87 if password in user_input:
88 user_input.remove(password)
89 results = zxcvbn(password, user_inputs=user_input)
90 if results['score'] <= 2:
91 weak.append((path, password, results))
92 return weak
94 def duplicates(self):
95 """Check for duplicated passwords."""
96 seen = {}
97 for path, entry in self.data.items():
98 if entry.get('password', '') == '':
99 continue
100 password = entry['password']
101 if password in seen:
102 seen[password].append(path)
103 else:
104 seen[password] = [path]
106 duplicated = []
107 for paths in seen.values():
108 if len(paths) > 1:
109 duplicated.append(paths)
110 return duplicated