|
| 1 | +#!/usr/bin/env python |
| 2 | + |
| 3 | +""" |
| 4 | +@author Casey Boettcher |
| 5 | +@date 2017-09-26 |
| 6 | +
|
| 7 | +This script accepts an ip address or addresses, on the command line or via newline-delimited file, and calls the ipr |
| 8 | +method of IBM's X-Force API to retrieve data describing the address's reputation. |
| 9 | +
|
| 10 | +""" |
| 11 | + |
| 12 | +import sys |
| 13 | +import re |
1 | 14 | import pprint
|
2 | 15 | import argparse
|
3 | 16 | import requests
|
4 | 17 | import IPy
|
5 | 18 |
|
6 | 19 | XFORCE_API_BASE = 'https://api.xforce.ibmcloud.com'
|
7 | 20 | XFORCE_API_IP_REP = 'ipr'
|
| 21 | +XFORCE_CRED_PATTERN = '^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}' |
8 | 22 |
|
9 | 23 |
|
10 | 24 | def parse_args():
|
11 |
| -parser = argparse.ArgumentParser() |
12 |
| -parser.add_argument('api_authN_file', type=argparse.FileType('r'), |
| 25 | +""" |
| 26 | +Parse the command line. |
| 27 | +
|
| 28 | +:return: a Namespace object of parsed arguments |
| 29 | +""" |
| 30 | +parser = argparse.ArgumentParser(description="Use the X-Force API to check IP address reputation.") |
| 31 | +parser.add_argument('-o', '--out', metavar='output_file', |
| 32 | +type=argparse.FileType('w'), help="Write result of X-Force call to file.") |
| 33 | +parser.add_argument('authN', type=argparse.FileType('r'), |
13 | 34 | help='Path to a file containing your X-Force credentials, key and password on first and second '
|
14 | 35 | 'lines, respectively.')
|
15 |
| -parser.add_argument('address', nargs='?', metavar='ip_address', help='An IP address to be checked via X-Force. If ' |
16 |
| -'the IP address is omitted or invalid, the ' |
17 |
| -'user will be prompted for one.') |
| 36 | + |
| 37 | +# user should not be able to specify both IP on cmdline and in a file |
| 38 | +ip_group = parser.add_mutually_exclusive_group() |
| 39 | +# TODO: nargs='N' and loop through list |
| 40 | +ip_group.add_argument('-i', '--ip', nargs='?', metavar='ip_address', help='An IP address to be checked via ' |
| 41 | +'X-Force. If the IP address is omitted or invalid, the user will be prompted for one.') |
| 42 | +ip_group.add_argument('-I', '--Ips', type=argparse.FileType('r'), metavar='file_of_ip_addresses', |
| 43 | +help='A file containing IP addresses, one per line.') |
18 | 44 | return parser.parse_args()
|
19 | 45 |
|
20 | 46 |
|
21 | 47 | def request_valid_ip():
|
| 48 | +""" |
| 49 | +Prompts the user for a valid IP, then validates it, returning None if invalid |
| 50 | +
|
| 51 | +:return: |
| 52 | +a valid IP or None if the user supplied an invalid or private address |
| 53 | +""" |
22 | 54 | ip = input("Enter a valid IP address you would like to check: ")
|
23 | 55 | return validate_ip(ip)
|
24 | 56 |
|
25 | 57 |
|
26 | 58 | def validate_ip(ip):
|
| 59 | +""" |
| 60 | +Validate an address using IPy |
| 61 | +
|
| 62 | +:param ip: a string representation of an IP address |
| 63 | +:return: return None if IP is invalid or within a private network range |
| 64 | +""" |
27 | 65 | try:
|
28 | 66 | ipobj = IPy.IP(ip)
|
29 | 67 | if ipobj.iptype() == 'PRIVATE':
|
30 |
| -print("IP addresses should not be in private network ranges") |
| 68 | +print("IP addresses {} will be ignored as it is in a private network range.".format(ip)) |
31 | 69 | ip = None
|
32 | 70 | except ValueError as ve:
|
33 | 71 | print("Invalid IP: {}".format(ve.args))
|
@@ -36,33 +74,117 @@ def validate_ip(ip):
|
36 | 74 | return ip
|
37 | 75 |
|
38 | 76 |
|
| 77 | +def read_in_address_file(file): |
| 78 | +""" |
| 79 | +Reads a file of IP addresses and returns only those that are valid in a list |
| 80 | +
|
| 81 | +:param file: a plaintext file of IP addresses, one per line |
| 82 | +:return address_list: a list of valid IP addresses |
| 83 | +""" |
| 84 | +address_list = list() |
| 85 | +lines = 0 |
| 86 | +valid_ips = 0 |
| 87 | +with file as f: |
| 88 | +for n in file: |
| 89 | +lines += 1 |
| 90 | +if validate_ip(n.strip()): |
| 91 | +address_list.append(n.strip()) |
| 92 | +valid_ips += 1 |
| 93 | +if valid_ips < lines: |
| 94 | +print("Of the {} lines in the file you supplied, only {} were valid. The latter will be used to call the " |
| 95 | +"API.".format(lines, valid_ips)) |
| 96 | +if valid_ips == 0: |
| 97 | +print("Please supply a valid IP address.") |
| 98 | +address_list = None |
| 99 | +return address_list |
| 100 | + |
| 101 | + |
39 | 102 | def read_in_xforce_keys(file):
|
| 103 | +""" |
| 104 | +Read a plaintext file of two lines and return X-Force credentials in the form of a tuple, validating general form |
| 105 | +of the key and password in the process |
| 106 | +
|
| 107 | +:param file: a two-line plaintext files; the first line contains the X-Force API key and the second the password |
| 108 | +:return: a tuple of (key, password) |
| 109 | +""" |
| 110 | +matcher = re.compile(XFORCE_CRED_PATTERN) |
40 | 111 | for x in range(0, 2):
|
41 | 112 | if x == 0:
|
42 | 113 | key = file.readline().strip()
|
| 114 | +if not matcher.match(key): |
| 115 | +print("API key invalid. Exiting...") |
| 116 | +sys.exit(1) |
43 | 117 | if x == 1:
|
44 | 118 | password = file.readline().strip()
|
| 119 | +if not matcher.match(password): |
| 120 | +print("API password invalid. Exiting...") |
| 121 | +sys.exit(1) |
45 | 122 | return key, password
|
46 | 123 |
|
47 | 124 |
|
48 |
| -def main(): |
49 |
| -ip = None |
50 |
| -args = parse_args() |
| 125 | +def call_xforce_api(address_list, key, password): |
| 126 | +""" |
| 127 | +Call the ipr method of the X-Force API using the IP address(es) contained in the parameter. Results are written |
| 128 | +to a file or stdout (default). |
| 129 | +
|
| 130 | +:param address_list: a list of IP addresses |
| 131 | +:return: a list of json objects |
| 132 | +""" |
| 133 | +results = [] |
| 134 | +for a in address_list: |
| 135 | +url = "{}/{}/{}".format(XFORCE_API_BASE, XFORCE_API_IP_REP, a) |
| 136 | +results.append(requests.get(url, auth=(key, password)).json()) |
| 137 | +return results |
| 138 | + |
| 139 | + |
| 140 | +def print_json_stdout(results): |
| 141 | +""" |
| 142 | +Print a list of json objects to the console |
| 143 | +
|
| 144 | +:param results: a list of json objects |
| 145 | +""" |
| 146 | +for json in results: |
| 147 | +print("\n########## Result for IP {} ##########".format(json['ip'])) |
| 148 | +pprint.pprint(json) |
| 149 | +print('######################################') |
| 150 | +print() |
| 151 | + |
51 | 152 |
|
52 |
| -# get user-supplied IP address |
53 |
| -if args.address: |
54 |
| -ip = validate_ip(args.address) |
55 |
| -while not ip: |
56 |
| -ip = request_valid_ip() |
| 153 | +def print_json_file(results, file): |
| 154 | +""" |
| 155 | +Print a list of json objects to the console |
57 | 156 |
|
58 |
| -url = "{}/{}/{}".format(XFORCE_API_BASE, XFORCE_API_IP_REP, ip) |
| 157 | +:param results: a list of json objects |
| 158 | +:param file: the destination file for the printed list of json objects passed in |
| 159 | +""" |
| 160 | +print("Writing results to file...") |
| 161 | +for json in results: |
| 162 | +file.write("\n########## Result for IP {} ##########\n".format(json['ip'])) |
| 163 | +pprint.pprint(json, stream=file) |
| 164 | +file.write('######################################\n') |
59 | 165 |
|
60 |
| -# get X-Force API keys |
61 |
| -creds = read_in_xforce_keys(args.api_authN_file) |
62 |
| -result = requests.get(url, auth=(creds[0], creds[1])) |
63 |
| -# maybe user swapped key and password in api creds file? |
64 |
| -# if result.status_code == '401': |
65 |
| -pprint.pprint(result.json()) |
| 166 | + |
| 167 | +def main(): |
| 168 | + |
| 169 | +ip = None |
| 170 | +addresses = list() |
| 171 | +args = parse_args() |
| 172 | +if args.Ips: |
| 173 | +addresses = read_in_address_file(args.Ips) |
| 174 | +else: |
| 175 | +# get user-supplied IP address from the cmd line |
| 176 | +if args.ip: |
| 177 | +ip = validate_ip(args.ip) |
| 178 | +# prompt user for valid IP in case of typo on cmdline |
| 179 | +while not ip: |
| 180 | +ip = request_valid_ip() |
| 181 | +addresses.append(ip) |
| 182 | +creds = read_in_xforce_keys(args.authN) |
| 183 | +results = call_xforce_api(addresses, creds[0], creds[1]) |
| 184 | +if args.out: |
| 185 | +print_json_file(results, args.out) |
| 186 | +else: |
| 187 | +print_json_stdout(results) |
66 | 188 |
|
67 | 189 | return 0
|
68 | 190 |
|
|
0 commit comments