"""Peer-to-peer chat over UDP with AES-GCM encryption and a Textual UI.

Running the module as a script shows a first-run setup form (name, local IP,
port) when ``config.json`` is missing, then starts the chat application.
Contacts are stored in ``chats.json`` as a list of
``{"name", "ip", "port", "key"}`` objects, where ``key`` is base64 text.

Wire format used by this file: every datagram starts with a 3-byte tag
(``PUN``, ``HLO``, ``MSG``, ``PIN``, ``PON``, ``BYE``, ``DEL``); ``HLO`` and
``MSG`` carry ``nonce + ciphertext`` produced by :func:`encrypt`.
"""

import base64
import ipaddress
import json
import os
import socket
import sys
import threading
import time

import pyperclip
import requests
from cryptography.exceptions import InvalidTag
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from textual.app import App, ComposeResult
from textual.containers import Horizontal, Vertical
from textual.screen import Screen
from textual.widgets import Button, Input, Label, ListItem, ListView
from textual.widgets import TabbedContent, TabPane

# Destination of the "Test hole punching" datagram.
# NOTE(review): 0.0.0.0 is presumably a placeholder for a real echo
# server — confirm before relying on the test result.
HOLE_PUNCH_TEST = ("0.0.0.0", 41234)

CONFIG_PATH = "config.json"
CHATS_PATH = "chats.json"

# Result value the dialog screens dismiss with when the user backs out.
CANCEL = "cancel"

# Largest datagram we are prepared to read.  Reading less than the sender
# transmitted truncates the ciphertext, which then fails authentication.
RECV_BUFFER = 65535

# Key sizes (in bytes) accepted by AESGCM.
VALID_KEY_LENGTHS = (16, 24, 32)


def _load_json(path):
    """Return the parsed JSON content of *path*."""
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)


def _save_json(path, data):
    """Serialise *data* as JSON into *path*, replacing its content."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f)


def _is_valid_ip(text):
    """Return True when *text* parses as an IPv4 or IPv6 address."""
    try:
        ipaddress.ip_address(text)
    except ValueError:
        return False
    return True


def _parse_port(text):
    """Return *text* as a port number in 1..65535, or None when invalid."""
    text = text.strip()
    # str.isdigit() alone accepts characters such as "²" that int() rejects.
    if not (text.isascii() and text.isdigit()):
        return None
    port = int(text)
    return port if 1 <= port <= 65535 else None


def _is_valid_key(text):
    """Return True when *text* is base64 for a key of a size AESGCM accepts."""
    try:
        raw = base64.b64decode(text, validate=True)
    except ValueError:  # binascii.Error is a ValueError subclass
        return False
    return len(raw) in VALID_KEY_LENGTHS


def _generate_key():
    """Return a fresh random 32-byte key encoded as base64 text."""
    return base64.b64encode(os.urandom(32)).decode()


def _flag_invalid(widget, invalid):
    """Add or remove the ``-invalid`` CSS class on *widget*."""
    if invalid:
        widget.add_class("-invalid")
    else:
        widget.remove_class("-invalid")


def _validate_profile(container):
    """Read and validate the ``#name``, ``#ip`` and ``#port`` inputs.

    Marks invalid inputs with the ``-invalid`` class and shows an error
    notification.  Returns the profile as a dict ready to be written to the
    config file, or None when validation failed.
    """
    ip_input = container.query_one("#ip", Input)
    port_input = container.query_one("#port", Input)
    name = container.query_one("#name", Input).value.strip()
    ip = ip_input.value.strip()
    port = _parse_port(port_input.value)
    ip_ok = _is_valid_ip(ip)
    _flag_invalid(ip_input, not ip_ok)
    _flag_invalid(port_input, port is None)
    if not ip_ok or port is None:
        container.notify("Invalid configuration!", severity="error")
        return None
    return {"name": name, "my_ip": ip, "port": port}


def _copy_with_feedback(screen, text):
    """Copy *text* to the clipboard and notify the user of the outcome."""
    try:
        pyperclip.copy(text)
    except Exception:  # best effort: clipboard backends fail in many ways
        screen.notify("Unable to copy.", severity="error")
    else:
        screen.notify("Copied!")


def _paste_into(screen, selector):
    """Paste the clipboard content into the Input matching *selector*."""
    try:
        screen.query_one(selector, Input).value = pyperclip.paste().strip()
    except Exception:  # best effort: clipboard backends fail in many ways
        screen.notify("Unable to paste.", severity="error")


class SetupApp(App):
    """First-run form that writes the user's profile to ``config.json``."""

    DEFAULT_CSS = """
    SetupApp {
        align: center top;
    }
    SetupApp Vertical {
        width: 33%;
    }
    SetupApp Label {
        padding-left: 1;
        margin-top: 1;
    }
    Button {
        margin-left: 1;
        margin-top: 1;
    }
    #title {
        margin-bottom: 1;
    }
    """

    def compose(self) -> ComposeResult:
        """Build the form: name, IP and port inputs plus a Confirm button."""
        with Vertical():
            yield Label("Initial Setup", id="title")
            yield Label("Your name")
            yield Input(placeholder="Your name", id="name")
            yield Label("Your IP")
            yield Input("0.0.0.0", placeholder="Default is 0.0.0.0", id="ip")
            yield Label("Port")
            yield Input("9000", placeholder="Default is 9000", id="port")
            yield Button("Confirm", disabled=True, id="confirm")

    def on_input_changed(self, event: Input.Changed) -> None:
        """Enable Confirm only while all three inputs are non-blank."""
        filled = all(
            self.query_one(selector, Input).value.strip()
            for selector in ("#name", "#ip", "#port")
        )
        self.query_one("#confirm", Button).disabled = not filled

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Validate the form and, when valid, save it and exit the app."""
        if event.button.id != "confirm":
            return
        profile = _validate_profile(self)
        if profile is None:
            return
        _save_json(CONFIG_PATH, profile)
        self.exit()


class ChoiceScreen(Screen):
    """Asks whether the user generates the key or already has one.

    Dismisses with ``"host"``, ``"guest"`` or ``"cancel"``.
    """

    DEFAULT_CSS = """
    ChoiceScreen {
        align: center middle;
        height: 100%;
        width: 100%;
    }
    ChoiceScreen Vertical {
        width: 40;
        height: auto;
        align: center middle;
    }
    ChoiceScreen Button {
        width: 100%;
        margin-bottom: 1;
    }
    ChoiceScreen Label {
        text-align: center;
        width: 100%;
    }
    """

    def compose(self) -> ComposeResult:
        """Build the three-button choice dialog."""
        with Vertical():
            yield Label("Please choose an option")
            yield Button(
                "I'll generate the key (pick if other person didn't do this)",
                id="host",
            )
            yield Button(
                "I have a key (pick if other person already did this)",
                id="guest",
            )
            yield Button("Cancel", id="cancel")

    def on_button_pressed(self, event):
        """Dismiss with the pressed button's role; anything else cancels."""
        choice = event.button.id
        self.dismiss(choice if choice in ("host", "guest") else CANCEL)


class HostScreen(Screen):
    """Shows a freshly generated key for the user to share.

    Dismisses with the base64 key text, or ``"cancel"``.
    """

    DEFAULT_CSS = """
    HostScreen {
        align: center middle;
        height: 100%;
        width: 100%;
    }
    HostScreen Vertical {
        width: 44;
        height: auto;
        align: center middle;
    }
    HostScreen Button {
        width: 100%;
        margin-bottom: 1;
    }
    HostScreen Label {
        text-align: center;
        width: 100%;
    }
    #key-message {
        margin-bottom: 1;
    }
    """

    def __init__(self):
        super().__init__()
        self.printable_key = _generate_key()

    def compose(self) -> ComposeResult:
        """Build the key display with copy/regenerate/done/cancel buttons."""
        with Vertical():
            yield Label(
                "This is your key. Please send it to who you want to chat to "
                "in a secure way."
            )
            yield Label(self.printable_key, id="key-message")
            yield Button("Copy to Clipboard", id="copy")
            yield Button("Regenerate", id="regenerate")
            yield Button("Done", id="done")
            yield Button("Cancel", id="cancel")

    def on_button_pressed(self, event):
        """Handle copy, regenerate, done and cancel."""
        pressed = event.button.id
        if pressed == "copy":
            _copy_with_feedback(self, self.printable_key)
        elif pressed == "regenerate":
            self.printable_key = _generate_key()
            self.query_one("#key-message", Label).update(self.printable_key)
        elif pressed == "done":
            self.dismiss(self.printable_key)
        else:
            self.dismiss(CANCEL)


class GuestScreen(Screen):
    """Lets the user enter a key generated by the other person.

    Dismisses with the validated base64 key text, or ``"cancel"``.
    """

    DEFAULT_CSS = """
    GuestScreen {
        align: center middle;
        height: 100%;
        width: 100%;
    }
    GuestScreen Vertical {
        width: 44;
        height: auto;
        align: center middle;
    }
    GuestScreen Button {
        width: 100%;
        margin-bottom: 1;
    }
    GuestScreen Label {
        text-align: center;
        width: 100%;
        margin-bottom: 1;
    }
    GuestScreen Input {
        margin-bottom: 2;
    }
    """

    def compose(self) -> ComposeResult:
        """Build the key entry dialog."""
        with Vertical():
            yield Label("Please enter the key generated by the other person.")
            yield Input(placeholder="Enter key here...", id="key")
            yield Button("Paste", id="paste")
            yield Button("Done", id="done")
            yield Button("Cancel", id="cancel")

    def on_button_pressed(self, event):
        """Handle paste, done (with key validation) and cancel."""
        pressed = event.button.id
        if pressed == "paste":
            _paste_into(self, "#key")
        elif pressed == "done":
            key = self.query_one("#key", Input).value.strip()
            if not key:
                self.notify("You need to enter a key!", severity="error")
            elif not _is_valid_key(key):
                # Reject now: an unusable key would otherwise only fail
                # later, inside the connection thread.
                self.notify("Invalid key!", severity="error")
            else:
                self.dismiss(key)
        else:
            self.dismiss(CANCEL)


class GetName(Screen):
    """Asks for a display name for the new contact.

    Dismisses with the entered name, or ``"cancel"``.
    """

    DEFAULT_CSS = """
    GetName {
        align: center middle;
        height: 100%;
        width: 100%;
    }
    GetName Vertical {
        width: 44;
        height: auto;
        align: center middle;
    }
    GetName Button {
        width: 100%;
        margin-bottom: 1;
    }
    GetName Label {
        text-align: center;
        width: 100%;
        margin-bottom: 1;
    }
    GetName Input {
        margin-bottom: 2;
    }
    """

    def compose(self) -> ComposeResult:
        """Build the name entry dialog."""
        with Vertical():
            yield Label(
                "Finally, choose a name for the other person (will be updated "
                "to what they chose when you first connect)."
            )
            yield Input(placeholder="Enter name here...", id="name")
            yield Button("Done", id="done")
            yield Button("Cancel", id="cancel")

    def on_button_pressed(self, event):
        """Dismiss with the entered name, or cancel."""
        if event.button.id != "done":
            self.dismiss(CANCEL)
            return
        name = self.query_one("#name", Input).value.strip()
        if not name:
            self.notify("You need to enter a name!", severity="error")
        elif name == CANCEL:
            # The dismiss result doubles as the cancel marker, so this one
            # name cannot be told apart from pressing Cancel.
            self.notify('"cancel" cannot be used as a name.', severity="error")
        else:
            self.dismiss(name)


class IpExchange(Screen):
    """Shows this user's identification code and collects the peer's.

    The identification code is base64 of ``"<ip>:<port>"``.  Dismisses with
    ``[other_ip, other_port]`` (both strings), or ``"cancel"``.
    """

    DEFAULT_CSS = """
    IpExchange {
        align: center middle;
        height: 100%;
        width: 100%;
    }
    IpExchange Vertical {
        width: 44;
        height: auto;
        align: center middle;
    }
    IpExchange Button {
        width: 100%;
        margin-bottom: 1;
    }
    IpExchange Label {
        text-align: center;
        width: 100%;
    }
    IpExchange Input {
        margin-bottom: 2;
    }
    #id-code {
        margin-bottom: 1;
    }
    """

    def __init__(self):
        super().__init__()
        self.port = self.app.port
        self.lookup_failed = False
        try:
            # A timeout keeps the UI from hanging indefinitely when the
            # lookup service cannot be reached.
            response = requests.get("https://api.ipify.org", timeout=5)
            response.raise_for_status()
            self.public_ip = response.text.strip()
        except requests.RequestException:
            # Fall back to the configured address; on_mount warns the user.
            self.public_ip = self.app.my_ip
            self.lookup_failed = True
        self.identification_code = base64.b64encode(
            f"{self.public_ip}:{self.port}".encode()
        ).decode()

    def compose(self) -> ComposeResult:
        """Build the code display and the input for the peer's code."""
        with Vertical():
            yield Label(
                "This is your identification code, please exchange yours with "
                "the other person's. Click done when both of you have typed "
                "the other person's in."
            )
            yield Label(self.identification_code, id="id-code")
            yield Button("Copy to Clipboard", id="copy")
            yield Input(
                placeholder="Other person's identification code",
                id="other-id-code",
            )
            yield Button("Paste", id="paste")
            yield Button("Done", id="done")
            yield Button("Cancel", id="cancel")

    def on_mount(self) -> None:
        """Warn when the public IP lookup failed and a fallback is shown."""
        if self.lookup_failed:
            self.notify(
                "Could not look up your public IP; using your configured IP "
                "instead.",
                severity="warning",
            )

    def on_button_pressed(self, event):
        """Handle copy, paste, done (with validation) and cancel."""
        pressed = event.button.id
        if pressed == "copy":
            _copy_with_feedback(self, self.identification_code)
        elif pressed == "paste":
            _paste_into(self, "#other-id-code")
        elif pressed == "done":
            self._submit_other_code()
        else:
            self.dismiss(CANCEL)

    def _submit_other_code(self):
        """Validate the entered code and dismiss with ``[ip, port]``."""
        other_id_code = self.query_one("#other-id-code", Input).value.strip()
        if not other_id_code:
            self.notify(
                "You need to enter the other person's ID code!",
                severity="error",
            )
            return
        if other_id_code == self.identification_code:
            self.notify("This is your ID code!", severity="error")
            return
        try:
            decoded = base64.b64decode(other_id_code).decode()
        except ValueError:  # covers binascii.Error and UnicodeDecodeError
            self.notify("Invalid ID code!", severity="error")
            return
        # Split on the LAST colon so the address part may itself hold colons.
        other_ip, separator, other_port = decoded.rpartition(":")
        if (
            not separator
            or not _is_valid_ip(other_ip)
            or _parse_port(other_port) is None
        ):
            self.notify("Invalid ID code!", severity="error")
            return
        self.dismiss([other_ip, other_port])


class SettingsScreen(Screen):
    """Profile editing, the add-contact wizard, and a connectivity test."""

    DEFAULT_CSS = """
    SettingsScreen {
        align: center top;
    }
    SettingsScreen Vertical {
        width: 33%;
        height: auto;
    }
    SettingsScreen Label {
        padding-left: 1;
        margin-top: 1;
    }
    SettingsScreen Button {
        margin-left: 1;
        margin-top: 1;
    }
    SettingsScreen #title {
        margin-bottom: 1;
    }
    #close-btn {
        dock: bottom;
        width: 10;
    }
    #instructions {
        width: 100%;
        text-wrap: wrap;
        height: auto;
    }
    """

    def __init__(self):
        super().__init__()
        profile_config = _load_json(CONFIG_PATH)
        # Last saved values, compared against the inputs to decide whether
        # the Save button should be enabled.
        self.current_name = profile_config["name"]
        self.current_ip = profile_config["my_ip"]
        self.current_port = str(profile_config["port"])
        self.add_contact_key = None

    def compose(self) -> ComposeResult:
        """Build the Close button and the Profile / Add Contact / Extras tabs."""
        # NOTE(review): the tabs are assumed to sit inside the same
        # Horizontal as the docked Close button — confirm the intended layout.
        with Horizontal():
            yield Button("Close", id="close-btn")
            with TabbedContent():
                with TabPane("Profile"):
                    with Vertical():
                        yield Label("Profile Configuration", id="title")
                        yield Label("Your name")
                        yield Input(
                            self.current_name,
                            placeholder="Your name",
                            id="name",
                        )
                        yield Label("Your IP")
                        yield Input(
                            self.current_ip,
                            placeholder="Default is 0.0.0.0",
                            id="ip",
                        )
                        yield Label("Port")
                        yield Input(
                            self.current_port,
                            placeholder="Default is 9000",
                            id="port",
                        )
                        yield Button(
                            "Save", disabled=True, id="profile-confirm"
                        )
                with TabPane("Add Contact"):
                    yield Label(
                        "How to do this: On the first screen, you will select "
                        "if you want to be the host (person who generates the "
                        "key), or the guest (puts in the key from the other "
                        "person), if you choose host, you will give the key to "
                        "the other person in a seperate secure way, and they "
                        "will put it in after clicking guest. Then, you will "
                        "exchange identification codes. Once this is done, you "
                        "can chat.",
                        id="instructions",
                    )
                    yield Button("Add Contact", id="add-contact")
                with TabPane("Extras"):
                    yield Label(
                        "This button will run a test to see if this "
                        "application will work correctly."
                    )
                    yield Button("Test hole punching", id="test-hole")

    def test_hole_punch(self):
        """Send a test datagram and report whether any reply arrives.

        Runs on a worker thread; results are reported through
        ``call_from_thread``.  Any socket error, including the 5 second
        timeout, counts as a failed test.
        """
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as test_sock:
                test_sock.bind((self.app.my_ip, 0))
                test_sock.settimeout(5)
                test_sock.sendto(b"TEST", HOLE_PUNCH_TEST)
                test_sock.recvfrom(1024)
        except OSError:  # socket.timeout is an OSError subclass
            self.app.call_from_thread(
                self.notify, "Test failed", severity="error"
            )
        else:
            self.app.call_from_thread(self.notify, "Test worked!")

    def on_button_pressed(self, event):
        """Dispatch the Close, Add Contact, Test and Save buttons."""
        pressed = event.button.id
        if pressed == "close-btn":
            self.dismiss()
        elif pressed == "add-contact":
            self.app.push_screen(ChoiceScreen(), self.on_screen_done)
        elif pressed == "test-hole":
            self.notify("Test started....", severity="information")
            threading.Thread(target=self.test_hole_punch, daemon=True).start()
        elif pressed == "profile-confirm":
            self._save_profile()

    def _save_profile(self):
        """Validate the profile inputs and write them to the config file."""
        profile = _validate_profile(self)
        if profile is None:
            # Keep the remembered values untouched so the "has anything
            # changed" check still compares against what is on disk.
            return
        _save_json(CONFIG_PATH, profile)
        self.current_name = profile["name"]
        self.current_ip = profile["my_ip"]
        self.current_port = self.query_one("#port", Input).value.strip()
        self.notify("Saved!")
        self.query_one("#profile-confirm", Button).disabled = True

    def on_screen_done(self, result):
        """Continue the wizard after the host/guest choice."""
        if result == "host":
            self.app.push_screen(HostScreen(), self.after_guest_host)
        elif result == "guest":
            self.app.push_screen(GuestScreen(), self.after_guest_host)

    def after_guest_host(self, key):
        """Remember the key and move on to the ID code exchange."""
        if key != CANCEL:
            self.add_contact_key = key
            self.app.push_screen(IpExchange(), self.store_other_id)

    def store_other_id(self, id):
        """Remember the peer's address and ask for a contact name.

        *id* is ``[ip, port]`` from IpExchange, or ``"cancel"``.
        """
        if id != CANCEL:
            self.other_ip = id[0]
            self.other_port = id[1]
            self.app.push_screen(GetName(), self.store_name)

    def store_name(self, name):
        """Finish the wizard by storing the contact, unless cancelled."""
        if name == CANCEL:
            return
        self.to_add_name = name
        self.add_new_contact()

    def add_new_contact(self):
        """Persist the collected contact and add it to the running app."""
        to_add = {
            "name": self.to_add_name,
            "ip": self.other_ip,
            "port": int(self.other_port),
            "key": self.add_contact_key,
        }
        current_chat_config = _load_json(CHATS_PATH)
        current_chat_config.append(to_add)
        _save_json(CHATS_PATH, current_chat_config)
        self.app.chats.append(to_add)
        # The contact list lives on the app's base screen, not on this one,
        # so query that screen explicitly.
        contacts = self.app.screen_stack[0].query_one("#contacts", ListView)
        # markup=False: names are free text and must not be parsed as markup.
        contacts.append(ListItem(Label(self.to_add_name, markup=False)))
        self.notify("Added new contact!")

    def on_input_changed(self, event: Input.Changed) -> None:
        """Enable Save only when all fields are filled and one has changed."""
        name = self.query_one("#name", Input).value.strip()
        ip = self.query_one("#ip", Input).value.strip()
        port = self.query_one("#port", Input).value.strip()
        filled = bool(name and ip and port)
        changed = (name, ip, port) != (
            self.current_name,
            self.current_ip,
            self.current_port,
        )
        self.query_one("#profile-confirm", Button).disabled = not (
            filled and changed
        )


class ChatApp(App):
    """Main chat UI: contact sidebar, message list and input row.

    Selecting a contact starts a background connection (hole punch followed
    by an encrypted name exchange); selecting it again disconnects.
    ``connect_id`` is bumped on every disconnect so that worker threads
    belonging to an older connection can tell they are stale and stop.
    """

    DEFAULT_CSS = """
    #contacts {
        width: 20;
        height: 1fr;
    }
    #messages {
        height: 1fr;
        padding: 0 0 0 1;
        background: $background;
    }
    #input-row {
        height: auto;
        align-vertical: middle;
    }
    #input-row > Input {
        width: 1fr;
    }
    #input-row > Button {
        width: 10;
        margin-right: 1;
    }
    #send-btn {
        display: none;
    }
    #settings-btn {
        dock: bottom;
        width: 100%;
        height: 3;
        border: none;
    }
    #sidebar {
        width: 20;
    }
    #chat {
        width: 1fr;
    }
    """

    BINDINGS = [("ctrl+d", "delete_current", "Delete Chat")]

    def __init__(
        self,
        name,
        my_ip,
        port,
        driver_class=None,
        css_path=None,
        watch_css=False,
        ansi_color=None,
    ):
        """Store the local profile; the remaining arguments go to ``App``."""
        super().__init__(driver_class, css_path, watch_css, ansi_color)
        self.connected = False
        self.display_name = name
        self.my_ip = my_ip
        self.port = port
        self.current_chat = None
        self.connect_id = 0
        self.chats = []
        # Per-connection state, filled in by connect().
        self.sock = None
        self.key = None
        self.aesgcm = None
        self.other_device = None
        self.other_name = None
        self.last_communication = time.time()
        self.pong_event = threading.Event()

    # ----------------------------------------------------------- helpers

    def _chat_screen(self):
        """Return the base screen that holds the widgets from compose().

        Queried explicitly because other screens (settings, dialogs) may be
        on top when background threads update the chat.
        """
        return self.screen_stack[0]

    def _index_of(self, chat):
        """Return the position of *chat* (by identity) in ``self.chats``."""
        for index, entry in enumerate(self.chats):
            if entry is chat:
                return index
        return None

    def _is_live(self, connect_id):
        """Return True while connected and *connect_id* is still current.

        ``connect_id=None`` skips the staleness check.
        """
        if not self.connected:
            return False
        return connect_id is None or self.connect_id == connect_id

    def _start_connect(self, chat):
        """Start connect() on a worker thread for *chat*."""
        # Capture the id here, on the UI thread, so a quick second selection
        # cannot make two workers believe they are both current.
        threading.Thread(
            target=self.connect, args=(chat, self.connect_id), daemon=True
        ).start()

    def add_message(self, message):
        """Append *message* to the message list (UI thread only)."""
        # markup=False: messages come from the peer and must be shown
        # verbatim rather than parsed as markup.
        self._chat_screen().query_one("#messages").mount(
            ListItem(Label(message, markup=False))
        )

    def add_system_message(self, text):
        """Append a local-only system notice to the message list."""
        self.add_message(f"system: {text} (only you can see this)")

    # ------------------------------------------------------ UI callbacks

    def action_delete_current(self):
        """Delete the selected contact and notify the peer when connected."""
        chat = self.current_chat
        if chat is None:
            return
        self.full_disconnect(delete=True)
        index = self._index_of(chat)
        if index is not None:
            contacts = self._chat_screen().query_one("#contacts", ListView)
            contacts.children[index].remove()
            del self.chats[index]
        self.unselect()
        _save_json(CHATS_PATH, self.chats)
        self.add_system_message("chat deleted...")

    def send(self):
        """Send the input box content as a command or a chat message."""
        input_box = self._chat_screen().query_one("#input-box", Input)
        message = input_box.value
        if not message.strip():
            return
        if message.startswith("/"):
            self.handle_command(message[1:], self.sock)
        elif self.connected:
            try:
                self.sock.sendto(
                    b"MSG" + encrypt(message, self.aesgcm), self.other_device
                )
            except OSError:
                self.add_system_message("unable to send message")
            else:
                self.add_message(f"{self.display_name}: {message}")
        else:
            self.add_system_message(
                "you are disconnected from the other device, you can attempt "
                "to reconnect with /reconnect, the other person must do the "
                "same"
            )
        # NOTE(review): clearing only after a non-blank submission is an
        # assumption about the original layout — confirm.
        input_box.clear()

    def compose(self) -> ComposeResult:
        """Build the sidebar (contacts, settings) and the chat pane."""
        with Horizontal():
            with Vertical(id="sidebar"):
                yield ListView(id="contacts")
                yield Button("Settings", id="settings-btn")
            with Vertical(id="chat"):
                yield ListView(id="messages")
                with Horizontal(id="input-row"):
                    yield Input(
                        placeholder="Select a contact to chat with...",
                        id="input-box",
                        disabled=True,
                    )
                    yield Button("Send", id="send-btn", disabled=True)

    def on_input_submitted(self, event: Input.Submitted) -> None:
        """Send when Enter is pressed in the message input."""
        if event.input.id == "input-box":
            self.send()

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Handle the Send and Settings buttons."""
        if event.button.id == "send-btn":
            self.send()
        elif event.button.id == "settings-btn":
            self.push_screen(SettingsScreen())

    def on_input_changed(self, event: Input.Changed) -> None:
        """Disable the Send button while the message input is blank."""
        if event.input.id == "input-box":
            send_button = self._chat_screen().query_one("#send-btn", Button)
            send_button.disabled = not event.value.strip()

    def on_mount(self) -> None:
        """Load stored contacts and populate the sidebar."""
        self.chats = _load_json(CHATS_PATH)
        self.last_communication = time.time()
        self.pong_event = threading.Event()
        contacts = self._chat_screen().query_one("#contacts", ListView)
        contacts.index = None
        for chat in self.chats:
            contacts.append(ListItem(Label(chat["name"], markup=False)))

    def full_disconnect(self, dont_bye=False, delete=False):
        """Tear down the current connection (UI thread only).

        Args:
            dont_bye: When true, do not tell the peer we are leaving.
            delete: When true, send ``DEL`` instead of ``BYE``.
        """
        if self.connected and not dont_bye:
            try:
                self.sock.sendto(
                    b"DEL" if delete else b"BYE", self.other_device
                )
            except OSError:
                pass  # best effort: the peer may already be unreachable
        # Bumping the id tells every worker of this connection to stop.
        self.connect_id += 1
        self.connected = False
        if self.sock is not None:
            try:
                self.sock.close()
            except OSError:
                pass
        self.disable_input()

    def unselect(self):
        """Clear the sidebar selection and forget the current chat."""
        self._chat_screen().query_one("#contacts", ListView).index = None
        self.current_chat = None

    def on_list_view_selected(self, event: ListView.Selected) -> None:
        """Connect to the selected contact, or disconnect when re-selected."""
        if event.list_view.id != "contacts":
            return
        index = event.list_view.index
        if index is None or not 0 <= index < len(self.chats):
            return
        messages = self._chat_screen().query_one("#messages", ListView)
        chat = self.chats[index]
        if chat is self.current_chat:
            self.full_disconnect()
            self.unselect()
            messages.clear()
            return
        self.current_chat = chat
        self.full_disconnect()
        messages.clear()
        self._start_connect(chat)

    def set_connect_placeholder(self):
        """Show "Connecting..." in the (still disabled) message input."""
        self._chat_screen().query_one(
            "#input-box", Input
        ).placeholder = "Connecting..."

    # ------------------------------------------------- connection worker

    def _fail_connect(self, connect_id, sock, text):
        """Abort a connection attempt from the worker thread.

        Closes *sock* and, unless a newer attempt has already taken over,
        reports *text* and resets the UI to the unselected state.
        """
        if sock is not None:
            sock.close()
        if self.connect_id != connect_id:
            return
        self.call_from_thread(self.add_system_message, text)
        self.call_from_thread(self.full_disconnect, True)
        self.call_from_thread(self.unselect)

    def connect(self, chat, connect_id=None):
        """Establish a connection to *chat* (runs on a worker thread).

        Args:
            chat: Contact dict with ``ip``, ``port``, ``key`` and ``name``.
            connect_id: The ``self.connect_id`` value this attempt belongs
                to; defaults to the current value.
        """
        chat_id = self.connect_id if connect_id is None else connect_id
        try:
            key = base64.b64decode(chat["key"])
            aesgcm = AESGCM(key)
        except (ValueError, TypeError):
            self._fail_connect(
                chat_id, None, "this contact's key is invalid, cannot connect"
            )
            return

        self.call_from_thread(self.set_connect_placeholder)
        self.call_from_thread(self.add_system_message, "beginning connection...")

        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((self.my_ip, self.port))
            sock.settimeout(2)
        except OSError as error:
            self._fail_connect(
                chat_id, sock, f"unable to open socket ({error})"
            )
            return
        if self.connect_id != chat_id:
            sock.close()  # superseded before we even started
            return

        # Publish the connection state used by send() and the other workers.
        self.other_device = (chat["ip"], chat["port"])
        self.key = key
        self.aesgcm = aesgcm
        self.sock = sock

        if not self.hole_punch(chat_id, sock):
            sock.close()
            return
        self.call_from_thread(
            self.add_system_message, "exchanging user metadata..."
        )
        other_name = self._exchange_names(chat_id, sock, aesgcm)
        if other_name is None or self.connect_id != chat_id:
            sock.close()
            return

        # A finite timeout lets receive_loop re-check the connection state
        # instead of blocking in recvfrom() indefinitely.
        sock.settimeout(1)
        self.other_name = other_name
        self.connected = True
        if other_name != chat["name"]:
            self._rename_contact(chat, other_name)

        threading.Thread(
            target=self.receive_loop, args=(sock, chat_id), daemon=True
        ).start()
        threading.Thread(
            target=self.keep_alive, args=(chat_id,), daemon=True
        ).start()
        threading.Thread(
            target=self.check_connection, args=(chat_id,), daemon=True
        ).start()
        self.call_from_thread(self.enable_input)
        self.call_from_thread(
            self.add_system_message,
            "everything setup! you can now send messages",
        )

    def _exchange_names(self, connect_id, sock, aesgcm):
        """Send our name and wait for the peer's ``HLO``.

        Returns the peer's decrypted name, or None when the attempt was
        superseded or failed (failures are reported to the user).
        """
        try:
            sock.sendto(
                b"HLO" + encrypt(self.display_name, aesgcm), self.other_device
            )
        except OSError as error:
            self._fail_connect(
                connect_id, sock, f"unable to reach other device ({error})"
            )
            return None
        while self.connect_id == connect_id:
            try:
                data, _addr = sock.recvfrom(RECV_BUFFER)
            except socket.timeout:
                self.call_from_thread(self.add_system_message, "waiting...")
                continue
            except OSError:
                if self.connect_id != connect_id:
                    return None  # socket was closed by a disconnect
                time.sleep(0.5)  # transient error: retry without spinning
                continue
            self.last_communication = time.time()
            if data[:3] != b"HLO":
                continue  # e.g. a late PUN; keep waiting for the greeting
            try:
                return decrypt(data[3:], aesgcm)
            except (InvalidTag, ValueError):
                self._fail_connect(
                    connect_id,
                    sock,
                    "unable to decrypt the other user's details, your keys "
                    "may not match",
                )
                return None
        return None

    def _rename_contact(self, chat, new_name):
        """Store the peer's self-chosen name for *chat* only."""
        self.call_from_thread(
            self.add_system_message, "other user has new name, updating..."
        )
        # *chat* is the very dict held in self.chats, so this updates the
        # in-memory list as well.
        chat["name"] = new_name
        _save_json(CHATS_PATH, self.chats)
        self.call_from_thread(self._update_contact_label, chat, new_name)

    def _update_contact_label(self, chat, new_name):
        """Refresh the sidebar label of *chat* (UI thread only)."""
        index = self._index_of(chat)
        if index is None:
            return  # contact was deleted in the meantime
        contacts = self._chat_screen().query_one("#contacts", ListView)
        items = list(contacts.query(ListItem))
        if index < len(items):
            items[index].query_one(Label).update(new_name)

    def punch(self, sock=None):
        """Send one ``PUN`` datagram to the peer.

        Args:
            sock: Socket to send on; defaults to ``self.sock``.
        """
        if sock is None:
            sock = self.sock
        sock.sendto(b"PUN" + b"punch", self.other_device)

    def hole_punch(self, connect_id, sock=None):
        """Keep punching until any datagram arrives.

        Args:
            connect_id: Attempt id; the loop stops once it is stale.
            sock: Socket to use; defaults to ``self.sock``.

        Returns:
            True once a datagram was received, False when the attempt was
            superseded.
        """
        if sock is None:
            sock = self.sock
        while self.connect_id == connect_id:
            try:
                self.punch(sock)
                sock.recvfrom(RECV_BUFFER)
            except socket.timeout:
                continue
            except OSError:
                if self.connect_id != connect_id:
                    return False  # socket was closed by a disconnect
                time.sleep(0.5)  # transient error: retry without spinning
                continue
            self.call_from_thread(
                self.add_system_message, "connected to other device!"
            )
            try:
                # One more punch so the peer also sees a datagram from us.
                self.punch(sock)
            except OSError:
                return False
            return True
        return False

    def enable_input(self):
        """Enable and focus the message input and the Send button."""
        screen = self._chat_screen()
        input_box = screen.query_one("#input-box", Input)
        screen.query_one("#send-btn", Button).disabled = False
        input_box.disabled = False
        input_box.placeholder = "Message..."
        input_box.focus()

    def disable_input(self):
        """Disable the message input and restore the idle placeholder."""
        screen = self._chat_screen()
        input_box = screen.query_one("#input-box", Input)
        screen.query_one("#send-btn", Button).disabled = True
        input_box.disabled = True
        input_box.placeholder = "Select a contact to chat with..."

    # ------------------------------------------- unused leftover methods

    def recursion(self):
        """Recurse without bound (ends in ``RecursionError``).

        Not called anywhere in this file.
        """
        while True:
            self.recursion()

    def correctly_exit_python(self):
        """Print a farewell and terminate the process immediately.

        ``os._exit`` does not return, so nothing may follow it.
        Not called anywhere in this file except by :meth:`goodbye`.
        """
        print("okie goodbye")
        os._exit(1)

    def goodbye(self):
        """Leftover "self destruct" routine; not called anywhere in this file.

        Removes ``config.py``, clears the terminal, writes random bytes to
        stdout for 20 seconds and then terminates the process.
        NOTE(review): the app stores its profile in ``config.json``, so the
        ``config.py`` removal presumably raises ``FileNotFoundError`` —
        confirm whether this method should be kept at all.
        """
        os.remove("config.py")
        os.system("clear")
        os.system("reset")
        print("pls wait 20 seconds")
        time.sleep(1)
        end = time.time() + 20
        while time.time() < end:
            os.write(1, os.urandom(1024))
        self.correctly_exit_python()

    # ------------------------------------------------ commands & workers

    def thread_ping(self):
        """Send ``PIN`` and report the round-trip time (worker thread)."""
        sock = self.sock
        # Drop any earlier, unrelated pong so it cannot satisfy this ping.
        self.pong_event.clear()
        start = time.monotonic()
        try:
            if sock is None:
                raise OSError("no socket")
            sock.sendto(b"PIN", self.other_device)
        except OSError:
            self.call_from_thread(self.add_system_message, "no response")
            return
        if self.pong_event.wait(timeout=5):
            self.pong_event.clear()
            elapsed_ms = round((time.monotonic() - start) * 1000)
            self.call_from_thread(
                self.add_system_message, f"pong! {elapsed_ms}ms"
            )
        else:
            self.call_from_thread(self.add_system_message, "no response")

    def handle_command(self, command, sock):
        """Run a slash command typed into the message input.

        Args:
            command: Command text without the leading ``/``; one of
                ``ping``, ``reconnect``, ``clear`` or ``nope``.
            sock: The current socket (accepted for compatibility; unused).
        """
        if command == "ping":
            threading.Thread(target=self.thread_ping, daemon=True).start()
            self.add_system_message("ping started...")
        elif command == "reconnect":
            chat = self.current_chat
            if chat is None:
                self.add_system_message("no chat selected")
            else:
                self.full_disconnect()
                self._start_connect(chat)
        elif command == "clear":
            self._chat_screen().query_one("#messages", ListView).clear()
        elif command == "nope":
            self.full_disconnect(True)
            self.unselect()
            self.add_system_message("silently disconnected...")
        else:
            self.add_system_message("unknown command")

    def keep_alive(self, connect_id=None):
        """Send a ``PUN`` every 30 seconds while the connection is live."""
        while self._is_live(connect_id):
            time.sleep(30)
            if not self._is_live(connect_id):
                break  # disconnected (or replaced) while sleeping
            try:
                self.punch()
            except OSError:
                break

    def check_connection(self, connect_id=None):
        """Disconnect after 60 seconds without ``MSG`` or ``PUN`` traffic."""
        while self._is_live(connect_id):
            if time.time() - self.last_communication >= 60:
                self.call_from_thread(
                    self.add_system_message,
                    "communication not received in last minute, "
                    "disconnecting...",
                )
                self.call_from_thread(self.full_disconnect)
                self.call_from_thread(self.unselect)
            time.sleep(1)

    def receive_loop(self, sock, connect_id=None):
        """Receive and dispatch datagrams until the connection ends.

        Args:
            sock: Socket to read from.
            connect_id: Connection id this loop belongs to; None skips the
                staleness check.
        """
        while self._is_live(connect_id):
            try:
                data, _addr = sock.recvfrom(RECV_BUFFER)
            except socket.timeout:
                continue  # nothing yet; re-check the loop condition
            except OSError:
                break  # socket closed
            tag = data[:3]
            if tag == b"MSG":
                self.last_communication = time.time()
                try:
                    plaintext = decrypt(data[3:], self.aesgcm)
                except (InvalidTag, ValueError):
                    self.call_from_thread(
                        self.add_system_message,
                        "unable to decrypt message, someone may be tampering "
                        "with your connection, disconnecting...",
                    )
                    self.call_from_thread(self.full_disconnect)
                    self.call_from_thread(self.unselect)
                    continue
                self.call_from_thread(
                    self.add_message, f"{self.other_name}: {plaintext}"
                )
            elif tag == b"PUN":
                self.last_communication = time.time()
            elif tag == b"PIN":
                try:
                    sock.sendto(b"PON", self.other_device)
                except OSError:
                    break
            elif tag == b"PON":
                self.pong_event.set()
            elif tag == b"DEL":
                self.call_from_thread(
                    self.add_system_message,
                    "other user has deleted this chat, you can delete it "
                    "using ctrl+d",
                )
            elif tag == b"BYE":
                self.call_from_thread(
                    self.add_system_message, "other user disconnected"
                )
                # Mark disconnected first so full_disconnect sends no BYE back.
                self.connected = False
                self.call_from_thread(self.full_disconnect)
                self.call_from_thread(self.unselect)


def encrypt(text, aesgcm):
    """Encrypt *text* and return ``nonce + ciphertext`` as bytes.

    A fresh random 12-byte nonce is generated for every call.
    """
    nonce = os.urandom(12)
    return nonce + aesgcm.encrypt(nonce, text.encode(), None)


def decrypt(encrypted_text, aesgcm):
    """Decrypt a packet produced by :func:`encrypt` and return the text.

    Raises whatever ``aesgcm.decrypt`` raises for a packet that fails
    authentication, and ``UnicodeDecodeError`` for non-UTF-8 plaintext.
    """
    nonce, ciphertext = encrypted_text[:12], encrypted_text[12:]
    return aesgcm.decrypt(nonce, ciphertext, None).decode()


def main():
    """Run first-time setup when needed, then start the chat app."""
    if not os.path.exists(CONFIG_PATH):
        SetupApp().run()
    if not os.path.exists(CONFIG_PATH):
        # Setup was closed without confirming; there is nothing to start.
        print("Setup was not completed; exiting.")
        return
    chat_config = _load_json(CONFIG_PATH)
    if not os.path.exists(CHATS_PATH):
        _save_json(CHATS_PATH, [])
    ChatApp(
        chat_config["name"], chat_config["my_ip"], chat_config["port"]
    ).run()


if __name__ == "__main__":
    main()