Coverage for pass_audit/audit.py: 100%

76 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2022-11-16 11:35 +0000

1# -*- encoding: utf-8 -*- 

2# pass audit - Password Store Extension (https://www.passwordstore.org/) 

3# Copyright (C) 2018-2022 Alexandre PUJOL <alexandre@pujol.io>. 

4# 

5 

6import os 

7import hashlib 

8 

9import requests 

10try: 

11 from zxcvbn import zxcvbn 

12 ZXCVBN = True 

13except ImportError: 

14 ZXCVBN = False 

15 

16from pass_audit import __version__ 

17 

18 

class PwnedAPI():
    """Simple wrapper for https://haveibeenpwned.com API."""

    def __init__(self, timeout=30):
        """Prepare the settings shared by every request.

        :param timeout: Seconds to wait for the server before giving up,
            passed unchanged to ``requests.get``. Without a timeout a
            stalled connection would block the audit forever.
        """
        self.headers = {'user-agent': f"pass-audit/{__version__}"}
        self.timeout = timeout

    @staticmethod
    def _parse_range(prefix, text):
        """Turn a range response body into ``(hashes, counts)``.

        Each non-empty line of ``text`` is expected to look like
        ``<partial hash>:<count>``. The partial hash is appended to
        ``prefix`` to rebuild the full hash. Blank lines (for instance one
        produced by a trailing line terminator, or an empty body) are
        skipped instead of raising on the unpacking below.

        :param prefix: The hash prefix that was queried.
        :param text: The raw response body.
        :return: A tuple of two parallel lists: full hashes and their counts.
        :raises ValueError: If a non-empty line is not ``hash:count``.
        """
        hashes = []
        counts = []
        # splitlines() accepts both '\r\n' and '\n' terminated bodies.
        for line in text.splitlines():
            line = line.strip()
            if not line:
                continue
            partialhash, count = line.split(':')
            hashes.append(prefix + partialhash)
            counts.append(int(count))
        return (hashes, counts)

    def password_range(self, prefix):
        """Query the haveibeenpwned API to retrieve the bucket ``prefix``.

        :param prefix: The hash prefix identifying the bucket to fetch.
        :return: A tuple ``(hashes, counts)`` of two parallel lists.
        :raises requests.RequestException: On network failure, timeout or a
            non-success HTTP status (via ``raise_for_status``).
        """
        url = f"https://api.pwnedpasswords.com/range/{prefix}"
        res = requests.get(url, headers=self.headers, verify=True,
                           timeout=self.timeout)
        res.raise_for_status()
        return self._parse_range(prefix, res.text)

38 

39 

class PassAudit():
    """Pass audit main class.

    ``data`` is iterated with ``.items()`` and is expected to map each
    entry path to a dict-like entry whose ``'password'`` key, when present,
    holds the password to audit. Entries with a missing, empty or ``None``
    password are ignored by every check.
    """

    def __init__(self, data, verbose):
        """Store the entries to audit and the verbosity flag.

        :param data: Mapping of path to entry (see the class docstring).
        :param verbose: When true, progress messages are printed.
        """
        self.data = data
        self.verbose = verbose

    def password(self):
        """K-anonymity password breach detection on haveibeenpwned.com.

        Only the first five characters of each password's SHA-1 digest are
        sent to the API; the comparison against the returned bucket is done
        locally.

        :return: A list of ``(path, password, count)`` tuples, one for each
            entry whose password hash appears in its bucket.
        """
        # Generate the list of hashes and prefixes to query.
        candidates = []
        api = PwnedAPI()
        buckets = {}
        for path, entry in self.data.items():
            if self.verbose:
                print(f"Getting the prefix of {path}")
            password = entry.get('password')
            if not password:
                # Missing, empty or None: there is nothing to hash.
                continue
            digest = hashlib.sha1(password.encode("utf8"))  # nosec
            phash = digest.hexdigest().upper()
            prefix = phash[0:5]
            candidates.append((path, password, phash, prefix))
            if prefix not in buckets:
                # Each bucket is fetched once and indexed by full hash so
                # the comparison below is a single dictionary lookup.
                hashes, counts = api.password_range(prefix)
                by_hash = {}
                for known, count in zip(hashes, counts):
                    # Keep the first occurrence, as list.index() would.
                    by_hash.setdefault(known, count)
                buckets[prefix] = by_hash

        # Compare the data and return the breached passwords.
        breached = []
        for path, password, phash, prefix in candidates:
            count = buckets[prefix].get(phash)
            if count is not None:
                breached.append((path, password, count))
        return breached

    def zxcvbn(self):
        """Password strength estimation using Dropbox' zxcvbn.

        The other values of the entry and the components of its path are
        given to zxcvbn as user inputs.

        :return: A list of ``(path, password, results)`` tuples for every
            password whose zxcvbn score is 2 or lower.
        :raises ImportError: If the ``zxcvbn`` module is not available.
        """
        if not ZXCVBN:
            raise ImportError(name='zxcvbn')

        weak = []
        for path, entry in self.data.items():
            if self.verbose:
                print(f"Checking {path}")
            password = entry.get('password')
            if not password:
                continue
            user_input = list(entry.values()) + path.split(os.sep)
            # Drop a single copy of the password (its own value in the
            # entry); any further copy, e.g. a login equal to the password,
            # stays in the user inputs.
            if password in user_input:
                user_input.remove(password)
            results = zxcvbn(password, user_inputs=user_input)
            if results['score'] <= 2:
                weak.append((path, password, results))
        return weak

    def duplicates(self):
        """Check for duplicated passwords.

        :return: A list of lists of paths; each inner list groups the
            paths (in iteration order) that share the same password.
        """
        seen = {}
        for path, entry in self.data.items():
            password = entry.get('password')
            if not password:
                continue
            seen.setdefault(password, []).append(path)

        return [paths for paths in seen.values() if len(paths) > 1]

93 

94 def duplicates(self): 

95 """Check for duplicated passwords.""" 

96 seen = {} 

97 for path, entry in self.data.items(): 

98 if entry.get('password', '') == '': 

99 continue 

100 password = entry['password'] 

101 if password in seen: 

102 seen[password].append(path) 

103 else: 

104 seen[password] = [path] 

105 

106 duplicated = [] 

107 for paths in seen.values(): 

108 if len(paths) > 1: 

109 duplicated.append(paths) 

110 return duplicated