Conversation
Getting outputs from vuls, chkrootkit and lynis- transfer them to ndjson and uploading to kibana.
| from pprint import pprint | ||
| from elasticsearch import Elasticsearch | ||
| import time | ||
| from art import * |
There was a problem hiding this comment.
avoid wildcard imports
| return ansi_escape.sub('', line) | ||
|
|
||
|
|
||
| def vuls(vuls_root, sudo_password): |
There was a problem hiding this comment.
consider removing the sudo_password argument.
I think it is better to require the script to be run as a privileged user from the beginning (sudo python3 ELK.py)
| commands = ["cd /", "cd " + vuls_root, sudo_password + vuls_scan] | ||
| to_execute = "" # the string that will run in the terminal at the end | ||
| for i in commands: | ||
| to_execute += i + ';' # merging the commands into one line |
There was a problem hiding this comment.
you are using a lot of "+" to concut strings it is much better to use format strings like this:
to_execute = f"cd /; cd {vuls_root}; {sudo_password}{vuls_scan};"
https://realpython.com/python-string-formatting/#3-string-interpolation-f-strings-python-36
| # running the scan and then the report- in order to get just the report output. | ||
| commands = ["cd /", "cd " + vuls_root, sudo_password + vuls_report] | ||
| to_execute = "" | ||
| for i in commands: |
There was a problem hiding this comment.
the for loop is not necessary if using a format string as mentioned above.
| output = subprocess.getoutput(to_execute) | ||
| # getting the data from the new json file: | ||
| directory = "/" + vuls_root + "/results" | ||
| output = subprocess.getoutput("sudo " + " chmod -R 777 " + directory) # giving access |
There was a problem hiding this comment.
- again use format strings
- security-wise giving everyone full permissions to the dir is not recommended. I believe you used it to read the results files with a normal (non-sudo) user for testing.
| # line = line.replace(" ", "") | ||
| line = line.strip() | ||
| # jdoc = {"hostname": hostname, "ipaddr": ipaddr, "type": type_of, "data": json.loads(line)} | ||
| if type_of != "lynis": |
There was a problem hiding this comment.
you can initiate a base dict:
mydict = {"instance_id": instance_id, "time": time, "account_id": account_id,
"session_id": session_id,
"type_of_scan": type_of}
at the beginning of each loop interval and use: mydict.update({"added_key": "value"})
to populate more data to the dict.
this way you have the initial format once - maintainable
| def main(): | ||
| tprint("ELK EC2 SCAN") | ||
| link = input("insert your Elk URL (e.g: localhost:9200) : ") | ||
| username = input("insert your Elk username for auth(if there is no auth, click ENTER): ") |
|
|
||
| begin_time = datetime.datetime.now() | ||
|
|
||
| vuls_directory = "home/ubuntu/idannos" |
There was a problem hiding this comment.
read directories path from user input with default to a sub dir in users home folder
| date = temp[0] # getting the date only without hours | ||
|
|
||
|
|
||
| # need to fill this before running: |
There was a problem hiding this comment.
boto3 has a built-in default to read these arguments from environment variables if set or read them from user input
| - install lynis: apt-get install lynis | ||
|
|
||
| - Helping with setting auth to ELK: https://github.com/deviantony/docker-elk | ||
|
|
There was a problem hiding this comment.
add a requirements.txt file to the repo.
https://realpython.com/lessons/using-requirement-files/
No description provided.