Send Dev Sec Compl Data to Splunk - Python
About this Python Sample App
This sample app is a very simple Python application that does the following:
- Refreshes an existing token stored on the file system in a json file using its
refresh_token
. - Downloads all results from the Device Security Compliance endpoint and then stores it in a large json file on the file system.
- Loads the results data from the json file and then breaks it into chunks to then send to Splunk.
There are additional comments in the code that describe high-level what is happening.
You can copy the sample code below. Ensure that you install the referenced imported packages that are not part of the standard library.
How to Run
Install the requirements with:
pip install requests
Run the Sample App with:
python send_dev_sec_compl_to_splunk.py
Problems Running the Code?
If you have any problems running the code then reach out to us in our Community Forum.
Sample Code
import requests import json import base64 # Insert your own info here as you defined when you created your APP # Note that in a real app you would not want to hard-code these values # Instead you would want to import them from the environment or even use # a more secure solution like a keystore. CLIENT_ID = "your-client-id" CLIENT_SECRET = "your-client-secret" REDIRECT_URI = "http://127.0.0.1:5000/" STATE = "yourstatestring" # note that in a true production app you would use state to protect against cross site attacks HOSTNAME = 'daas.api.hp.com' SPLUNK_TOKEN = 'your-spunk-auth-token' SPLUNK_HOST = 'your-splunk-hostname-and-port' def refresh_token(): '''refresh existing token for a new one''' with open('creds.json', 'r') as f: creds = json.load(f) refresh_token = creds['refresh_token'] base64_encoded_clientid_clientsecret = base64.b64encode(str.encode(f'{CLIENT_ID}:{CLIENT_SECRET}')) # concatenate with : and encode in base64 base64_encoded_clientid_clientsecret = base64_encoded_clientid_clientsecret.decode('ascii') # turn bytes object into ascii string base_url = f'https://{HOSTNAME}' url = f"{base_url}/oauth/v1/token" headers = { 'Content-Type': "application/x-www-form-urlencoded", 'Authorization': f'Basic {base64_encoded_clientid_clientsecret}' } data = {'grant_type': 'refresh_token', 'redirect_uri': REDIRECT_URI, 'refresh_token': refresh_token } r = requests.post(url, headers=headers, data=data) response = r.json() if response.get('access_token'): # don't store creds in plaintext in a real app obviously with open('creds.json', 'w') as f: json.dump(response, f, indent=4) else: print('There was an error refreshing your access token') print(r.text) def get_device_security_compliance(): # load credentials from file with open('creds.json', 'r') as f: creds = json.load(f) access_token = creds['access_token'] # load access token base_url = f'https://{HOSTNAME}' url = f"{base_url}/analytics/v1/reports/devicesec/twentyFourHrSummary/type/grid" headers = { 'Content-Type': "application/json", 'Authorization': f"Bearer {access_token}", } # get total results for this query so we know how much data we need response = requests.request("POST", url, headers=headers, params={'count': 1}) total_results = int(response.json()['totalResults']) # Now time to get all the results from the API start_index = 1 results = [] current_results = 0 while current_results < total_results: querystring = {"startIndex": f"{start_index}", "count": "1000"} response = requests.post( url, headers=headers, params=querystring) print(response.text) response = response.json() results += response['resources'] current_results = len(results) start_index += 1 print(f'Results Downloaded: {len(results)}') print(f'Total results downloaded: {len(results)}') with open('results.json', 'w+') as f: f.write(json.dumps(results, indent=4, sort_keys=True)) return 0 def chunks(l, n): """Yield successive n-sized chunks from l.""" for i in range(0, len(l), n): yield l[i:i + n] def send_json_to_splunk(filename): '''Function to post the json results to splunk after splitting them up into chunks of 1000''' with open('inventory.json', 'r') as f: results = json.load(f) print(len(results)) split_up = chunks(results, 1000) for result in split_up: url = f"https://{SPLUNK_HOST}/services/collector/raw" querystring = {"channel": "6ec257d3-9e37-4ddf-9f35-6783ac50e276"} headers = { 'Authorization': f"Splunk {SPLUNK_TOKEN}", 'Content-Type': "text/plain", 'Host': f"{SPLUNK_HOST}", } response = requests.post(url, json=result, headers=headers, params=querystring, verify=False) print(response.text) if __name__ == '__main__': refresh_token() # refresh an existing token (we are assuming you have one stored in creds.json) get_device_security_compliance() # get the data we want send_json_to_splunk('results.json') # store it in Splunk