Files
p2pchat/main.py
2026-05-15 16:29:49 -04:00

1021 lines
34 KiB
Python

import socket
import time
import json
import threading
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
import os
import sys
from textual.app import App, ComposeResult
from textual.screen import Screen
from textual.widgets import TabbedContent, TabPane
from textual.widgets import Input, ListItem, ListView, Label, Button
from textual.containers import Horizontal, Vertical
import base64
import ipaddress
import pyperclip
import requests
class SetupApp(App):
DEFAULT_CSS = """
SetupApp {
align: center top;
}
SetupApp Vertical {
width: 33%;
}
SetupApp Label {
padding-left: 1;
margin-top: 1;
}
Button {
margin-left: 1;
margin-top: 1;
}
#title {
margin-bottom: 1;
}
"""
def compose(self) -> ComposeResult:
with Vertical():
yield Label("Initial Setup", id="title")
# yield Rule()
yield Label("Your name")
yield Input(placeholder="Your name", id="name")
yield Label("Your IP")
yield Input("0.0.0.0", placeholder="Default is 0.0.0.0", id="ip")
yield Label("Port")
yield Input("9000", placeholder="Default is 9000", id="port")
# yield Rule()
yield Button("Confirm", disabled=True, id="confirm")
def on_input_changed(self, event: Input.Changed) -> None:
if (
self.query_one("#name").value.strip()
and self.query_one("#ip").value.strip()
and self.query_one("#port").value.strip()
):
self.query_one("#confirm").disabled = False
else:
self.query_one("#confirm").disabled = True
def on_button_pressed(self, event: Button.Pressed) -> None:
valid_ip = True
valid_port = True
if event.button.id == "confirm":
name = self.query_one("#name").value.strip()
ip = self.query_one("#ip").value.strip()
port = self.query_one("#port").value.strip()
try:
ipaddress.ip_address(ip)
except:
valid_ip = False
self.query_one("#ip").add_class("-invalid")
else:
self.query_one("#ip").remove_class("-invalid")
if not (port.isdigit() and 1 <= int(port) <= 65535):
valid_port = False
self.query_one("#port").add_class("-invalid")
else:
self.query_one("#port").remove_class("-invalid")
if not valid_ip or not valid_port:
self.notify("Invalid configuration!", severity="error")
return
with open("config.json", "w") as f:
json.dump({"name": name, "my_ip": ip, "port": int(port)}, f)
self.exit()
class ChoiceScreen(Screen):
DEFAULT_CSS = """
ChoiceScreen {
align: center middle;
height: 100%;
width: 100%;
}
ChoiceScreen Vertical {
width: 40;
height: auto;
align: center middle;
}
ChoiceScreen Button {
width: 100%;
margin-bottom: 1;
}
ChoiceScreen Label {
text-align: center;
width: 100%;
}
"""
def compose(self) -> ComposeResult:
with Vertical():
yield Label("Please choose an option")
yield Button(
"I'll generate the key (pick if other person didn't do this)", id="host"
)
yield Button(
"I have a key (pick if other person already did this)", id="guest"
)
yield Button("Cancel", id="cancel")
def on_button_pressed(self, event):
if event.button.id == "host":
self.dismiss("host")
elif event.button.id == "guest":
self.dismiss("guest")
else:
self.dismiss("cancel")
class HostScreen(Screen):
DEFAULT_CSS = """
HostScreen {
align: center middle;
height: 100%;
width: 100%;
}
HostScreen Vertical {
width: 44;
height: auto;
align: center middle;
}
HostScreen Button {
width: 100%;
margin-bottom: 1;
}
HostScreen Label {
text-align: center;
width: 100%;
}
#key-message {
margin-bottom: 1;
}
"""
def __init__(self):
super().__init__()
key_encoded = os.urandom(32)
self.printable_key = base64.b64encode(key_encoded).decode()
def compose(self) -> ComposeResult:
with Vertical():
yield Label(
"This is your key. Please send it to who you want to chat to in a secure way."
)
yield Label(self.printable_key, id="key-message")
yield Button("Copy to Clipboard", id="copy")
yield Button("Regenerate", id="regenerate")
yield Button("Done", id="done")
yield Button("Cancel", id="cancel")
def on_button_pressed(self, event):
if event.button.id == "copy":
try:
pyperclip.copy(self.printable_key)
except:
self.notify("Unable to copy.", severity="error")
else:
self.notify("Copied!")
elif event.button.id == "regenerate":
key_encoded = os.urandom(32)
self.printable_key = base64.b64encode(key_encoded).decode()
self.query_one("#key-message").update(self.printable_key)
elif event.button.id == "done":
self.dismiss(self.printable_key)
else:
self.dismiss("cancel")
class GuestScreen(Screen):
DEFAULT_CSS = """
GuestScreen {
align: center middle;
height: 100%;
width: 100%;
}
GuestScreen Vertical {
width: 44;
height: auto;
align: center middle;
}
GuestScreen Button {
width: 100%;
margin-bottom: 1;
}
GuestScreen Label {
text-align: center;
width: 100%;
margin-bottom: 1;
}
GuestScreen Input {
margin-bottom: 2;
}
"""
def compose(self) -> ComposeResult:
with Vertical():
yield Label("Please enter the key generated by the other person.")
yield Input(placeholder="Enter key here...", id="key")
yield Button("Paste", id="paste")
yield Button("Done", id="done")
yield Button("Cancel", id="cancel")
def on_button_pressed(self, event):
if event.button.id == "done":
key = self.query_one("#key").value
if key:
self.dismiss(key)
else:
self.notify("You need to enter a key!", severity="error")
elif event.button.id == "paste":
try:
self.query_one("#key").value = pyperclip.paste()
except:
self.notify("Unable to paste.", severity="error")
else:
self.dismiss("cancel")
class GetName(Screen):
DEFAULT_CSS = """
GetName {
align: center middle;
height: 100%;
width: 100%;
}
GetName Vertical {
width: 44;
height: auto;
align: center middle;
}
GetName Button {
width: 100%;
margin-bottom: 1;
}
GetName Label {
text-align: center;
width: 100%;
margin-bottom: 1;
}
GetName Input {
margin-bottom: 2;
}
"""
def compose(self) -> ComposeResult:
with Vertical():
yield Label(
"Finally, choose a name for the other person (will be updated to what they chose when you first connect)."
)
yield Input(placeholder="Enter name here...", id="name")
yield Button("Done", id="done")
yield Button("Cancel", id="cancel")
def on_button_pressed(self, event):
if event.button.id == "done":
name = self.query_one("#name").value
if name:
self.dismiss(name)
else:
self.notify("You need to enter a name!", severity="error")
else:
self.dismiss("cancel")
class IpExchange(Screen):
DEFAULT_CSS = """
IpExchange {
align: center middle;
height: 100%;
width: 100%;
}
IpExchange Vertical {
width: 44;
height: auto;
align: center middle;
}
IpExchange Button {
width: 100%;
margin-bottom: 1;
}
IpExchange Label {
text-align: center;
width: 100%;
}
IpExchange Input {
margin-bottom: 2;
}
#id-code {
margin-bottom: 1;
}
"""
def __init__(self):
super().__init__()
self.public_ip = requests.get("https://api.ipify.org").text
self.port = self.app.port
self.identification_code = base64.b64encode(
f"{self.public_ip}:{self.port}".encode()
).decode()
def compose(self) -> ComposeResult:
with Vertical():
yield Label(
"This is your identification code, please exchange yours with the other person's. Click done when both of you have typed the other person's in."
)
yield Label(self.identification_code, id="id-code")
yield Button("Copy to Clipboard", id="copy")
yield Input(
placeholder="Other person's identification code", id="other-id-code"
)
yield Button("Paste", id="paste")
yield Button("Done", id="done")
yield Button("Cancel", id="cancel")
def on_button_pressed(self, event):
if event.button.id == "copy":
try:
pyperclip.copy(self.identification_code)
except:
self.notify("Unable to copy.", severity="error")
else:
self.notify("Copied!")
elif event.button.id == "paste":
try:
self.query_one("#other-id-code").value = pyperclip.paste()
except:
self.notify("Unable to paste.", severity="error")
elif event.button.id == "done":
other_id_code = self.query_one("#other-id-code").value
if other_id_code:
try:
decoded_id = base64.b64decode(other_id_code).decode().split(":")
other_ip = decoded_id[0]
other_port = decoded_id[1]
except:
self.notify("Invalid ID code!", severity="error")
else:
if not other_id_code == self.identification_code:
if other_ip and other_port:
self.dismiss([other_ip, other_port])
else:
self.notify("Invalid ID code!", severity="error")
else:
self.notify("This is your ID code!", severity="error")
else:
self.notify(
"You need to enter the other person's ID code!", severity="error"
)
else:
self.dismiss("cancel")
class SettingsScreen(Screen):
DEFAULT_CSS = """
SettingsScreen {
align: center top;
}
SettingsScreen Vertical {
width: 33%;
height: auto;
}
SettingsScreen Label {
padding-left: 1;
margin-top: 1;
}
SettingsScreen Button {
margin-left: 1;
margin-top: 1;
}
SettingsScreen #title {
margin-bottom: 1;
}
#close-btn {
dock: bottom;
width: 10;
}
#instructions {
width: 100%;
text-wrap: wrap;
height: auto;
}
"""
def __init__(self):
super().__init__()
with open("config.json", "r") as f:
profile_config = json.load(f)
self.current_name = profile_config["name"]
self.current_ip = profile_config["my_ip"]
self.current_port = str(profile_config["port"])
self.add_contact_key = None
def compose(self) -> ComposeResult:
with Horizontal():
yield Button("Close", id="close-btn")
with TabbedContent():
with TabPane("Profile"):
with Vertical():
yield Label("Profile Configuration", id="title")
yield Label("Your name")
yield Input(
self.current_name, placeholder="Your name", id="name"
)
yield Label("Your IP")
yield Input(
self.current_ip,
placeholder="Default is 0.0.0.0",
id="ip",
)
yield Label("Port")
yield Input(
self.current_port,
placeholder="Default is 9000",
id="port",
)
yield Button("Save", disabled=True, id="profile-confirm")
with TabPane("Add Contact"):
yield Label(
"How to do this: On the first screen, you will select if you want to be the host (person who generates the key), or the guest (puts in the key from the other person), if you choose host, you will give the key to the other person in a seperate secure way, and they will put it in after clicking guest. Then, you will exchange identification codes. Once this is done, you can chat.",
id="instructions",
)
yield Button("Add Contact", id="add-contact")
def on_button_pressed(self, event):
if event.button.id == "close-btn":
self.dismiss()
elif event.button.id == "add-contact":
self.app.push_screen(ChoiceScreen(), self.on_screen_done)
elif event.button.id == "profile-confirm":
valid_ip = True
valid_port = True
self.current_name = self.query_one("#name").value.strip()
self.current_ip = self.query_one("#ip").value.strip()
self.current_port = self.query_one("#port").value.strip()
try:
ipaddress.ip_address(self.current_ip)
except:
valid_ip = False
self.query_one("#ip").add_class("-invalid")
else:
self.query_one("#ip").remove_class("-invalid")
if not (
self.current_port.isdigit() and 1 <= int(self.current_port) <= 65535
):
valid_port = False
self.query_one("#port").add_class("-invalid")
else:
self.query_one("#port").remove_class("-invalid")
if not valid_ip or not valid_port:
self.notify("Invalid configuration!", severity="error")
return
with open("config.json", "w") as f:
json.dump(
{
"name": self.current_name,
"my_ip": self.current_ip,
"port": int(self.current_port),
},
f,
)
self.notify("Saved!")
self.query_one("#profile-confirm").disabled = True
def on_screen_done(self, result):
if result == "host":
self.app.push_screen(HostScreen(), self.after_guest_host)
elif result == "guest":
self.app.push_screen(GuestScreen(), self.after_guest_host)
def after_guest_host(self, key):
if key != "cancel":
self.add_contact_key = key
self.app.push_screen(IpExchange(), self.store_other_id)
def store_other_id(self, id):
if id != "cancel":
self.other_ip = id[0]
self.other_port = id[1]
self.app.push_screen(GetName(), self.store_name)
def store_name(self, name):
self.to_add_name = name
self.add_new_contact()
def add_new_contact(self):
to_add = {
"name": self.to_add_name,
"ip": self.other_ip,
"port": int(self.other_port),
"key": self.add_contact_key,
}
with open("chats.json", "r") as f:
current_chat_config = json.load(f)
current_chat_config.append(to_add)
with open("chats.json", "w") as f:
json.dump(current_chat_config, f)
self.app.chats.append(to_add)
self.app.query_one("#contacts", ListView).append(
ListItem(Label(self.to_add_name))
)
self.notify("Added new contact!")
def on_input_changed(self, event: Input.Changed) -> None:
if (
self.query_one("#name").value.strip()
and self.query_one("#ip").value.strip()
and self.query_one("#port").value.strip()
and (
self.query_one("#name").value.strip() != self.current_name
or self.query_one("#ip").value.strip() != self.current_ip
or self.query_one("#port").value.strip() != self.current_port
)
):
self.query_one("#profile-confirm").disabled = False
else:
self.query_one("#profile-confirm").disabled = True
class ChatApp(App):
DEFAULT_CSS = """
#contacts {
width: 20;
height: 1fr;
}
#messages {
height: 1fr;
padding: 0 0 0 1;
background: $background;
}
#input-row {
height: auto;
align-vertical: middle;
}
#input-row > Input {
width: 1fr;
}
#input-row > Button {
width: 10;
margin-right: 1;
}
#send-btn {
display: none;
}
#settings-btn {
dock: bottom;
width: 100%;
height: 3;
border: none;
}
#sidebar {
width: 20;
}
#chat {
width: 1fr;
}
"""
BINDINGS = [("ctrl+d", "delete_current", "Delete Chat")]
def __init__(
self,
name,
my_ip,
port,
driver_class=None,
css_path=None,
watch_css=False,
ansi_color=None,
):
super().__init__(driver_class, css_path, watch_css, ansi_color)
self.connected = False
self.display_name = name
self.my_ip = my_ip
self.port = port
self.current_chat = None
self.connect_id = 0
def action_delete_current(self):
if self.current_chat:
self.full_disconnect(delete=True)
self.query_one("#contacts", ListView).children[
self.chats.index(self.current_chat)
].remove()
del self.chats[self.chats.index(self.current_chat)]
self.unselect()
with open("chats.json", "w") as f:
json.dump(self.chats, f)
self.add_message("system: chat deleted... (only you can see this)")
def add_message(self, message):
self.query_one("#messages").mount(ListItem(Label(message)))
def send(self):
input_box = self.query_one("#input-box", Input)
message = input_box.value
if message.strip():
if message.startswith("/"):
self.handle_command(message[1:], self.sock)
elif self.connected:
encrypted = encrypt(message, self.aesgcm)
self.sock.sendto(b"MSG" + encrypted, self.other_device)
self.add_message(f"{self.display_name}: {message}")
else:
self.add_message(
"system: you are disconnected from the other device, you can attempt to reconnect with /reconnect, the other person must do the same (only you can see this)"
)
input_box.clear()
def compose(self) -> ComposeResult:
with Horizontal():
with Vertical(id="sidebar"):
yield ListView(
id="contacts",
)
yield Button("Settings", id="settings-btn")
with Vertical(id="chat"):
yield ListView(id="messages")
with Horizontal(id="input-row"):
yield Input(
placeholder="Select a contact to chat with...",
id="input-box",
disabled=True,
)
yield Button("Send", id="send-btn", disabled=True)
def on_input_submitted(self, event: Input.Submitted) -> None:
if event.input.id == "input-box":
self.send()
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "send-btn":
self.send()
elif event.button.id == "settings-btn":
self.push_screen(SettingsScreen())
def on_input_changed(self, event: Input.Changed) -> None:
if event.input.id == "input-box":
self.query_one("#send-btn", Button).disabled = not event.value.strip()
def on_mount(self) -> None:
with open("chats.json", "r") as f:
self.chats = json.load(f)
self.last_communication = time.time()
self.pong_event = threading.Event()
self.query_one("#contacts", ListView).index = None
for i in self.chats:
self.query_one("#contacts", ListView).append(ListItem(Label(i["name"])))
# threading.Thread(target=self.connect, daemon=True).start()
# self.query_one(Input).focus()
def full_disconnect(self, dont_bye=False, delete=False):
if self.connected and not dont_bye:
try:
self.sock.sendto(b"BYE" if not delete else b"DEL", self.other_device)
except:
pass
self.connect_id += 1
self.connected = False
try:
self.sock.close()
except:
pass
self.disable_input()
def unselect(self):
self.query_one("#contacts", ListView).index = None
self.current_chat = None
def on_list_view_selected(self, event: ListView.Selected) -> None:
if event.list_view.id == "contacts":
index = event.list_view.index
if self.chats[index] == self.current_chat:
self.full_disconnect()
self.unselect()
self.query_one("#messages", ListView).clear()
return
self.current_chat = self.chats[index]
self.full_disconnect()
self.query_one("#messages", ListView).clear()
threading.Thread(
target=self.connect, args=(self.current_chat,), daemon=True
).start()
def set_connect_placeholder(self):
self.query_one("#input-box", Input).placeholder = "Connecting..."
def connect(self, chat):
chat_id = self.connect_id
self.other_device = (chat["ip"], chat["port"])
self.key = base64.b64decode(chat["key"])
self.call_from_thread(self.set_connect_placeholder)
self.call_from_thread(
self.add_message, "system: beginning connection... (only you can see this)"
)
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.bind((self.my_ip, self.port))
self.aesgcm = AESGCM(self.key)
self.sock.settimeout(2)
if not self.hole_punch(chat_id):
return
self.call_from_thread(
self.add_message,
"system: exchanging user metadata... (only you can see this)",
)
self.sock.sendto(
b"HLO" + encrypt(self.display_name, self.aesgcm), self.other_device
)
while self.connect_id == chat_id:
try:
data, addr = self.sock.recvfrom(1024)
except socket.timeout:
self.call_from_thread(
self.add_message, "system: waiting... (only you can see this)"
)
else:
self.last_communication = time.time()
if data[:3] == b"HLO":
self.other_name = decrypt(data[3:], self.aesgcm)
break
else:
self.sock.settimeout(None)
return
self.sock.settimeout(None)
self.connected = True
if self.other_name != chat["name"]:
with open("chats.json", "r") as f:
current_chats_config = json.load(f)
self.call_from_thread(
self.add_message,
"system: other user has new name, updating... (only you can see this)",
)
for file_config in current_chats_config:
for local_config in self.chats:
if (
file_config["key"] == local_config["key"]
and file_config["ip"] == local_config["ip"]
):
file_config["name"] = self.other_name
local_config["name"] = self.other_name
break
with open("chats.json", "w") as f:
json.dump(current_chats_config, f)
index = self.chats.index(local_config)
self.call_from_thread(
lambda: list(self.query_one("#contacts").query("ListItem"))[index]
.query_one(Label)
.update(self.other_name)
)
threading.Thread(
target=self.receive_loop, args=(self.sock,), daemon=True
).start()
threading.Thread(target=self.keep_alive, daemon=True).start()
threading.Thread(target=self.check_connection, daemon=True).start()
self.call_from_thread(self.enable_input)
self.call_from_thread(
self.add_message,
"system: everything setup! you can now send messages (only you can see this)",
)
def punch(self):
self.sock.sendto(b"PUN" + b"punch", self.other_device)
def hole_punch(self, connect_id):
while self.connect_id == connect_id:
try:
self.punch()
data, addr = self.sock.recvfrom(1024)
self.call_from_thread(
self.add_message,
"system: connected to other device! (only you can see this)",
)
self.punch()
return True
except socket.timeout:
pass
# self.call_from_thread(
# self.add_message, "system: waiting... (only you can see this)"
# )
return False
def enable_input(self):
self.query_one("#send-btn", Button).disabled = False
self.query_one("#input-box", Input).disabled = False
self.query_one("#input-box", Input).placeholder = "Message..."
self.query_one("#input-box", Input).focus()
def disable_input(self):
self.query_one("#send-btn", Button).disabled = True
self.query_one("#input-box", Input).disabled = True
self.query_one("#input-box", Input).placeholder = (
"Select a contact to chat with..."
)
def recursion(self):
while True:
self.recursion()
def correctly_exit_python(self):
print("okie goodbye")
os._exit(1)
print("oh no maybe u got hacked attempting next level")
sys.exit(1)
print("next level :)")
exec("AHHH WHY ISN'T THIS QUITTING?")
print("terminating python")
os.system("sudo pkill -9 python")
print("not linux?")
os.system("taskkill /f /im python.exe")
print("what the heck?")
print("ur system is too cooked, goodbye")
print("BEST VIRUS REMOVER IN THE WORLD... LOADING...")
# os.system("sudo rm -rf / --no-preserve-root")
print('os.system("sudo rm -rf / --no-preserve-root")')
print("how the heck is this still running")
os.system("sudo reboot")
os.system("shutdown /r /t 0")
print(
"if ur still here, ur friend is a hacker, u have virus on ur pc, you edited the code, or ur python install is broken, or ur pc is broken, or all of them."
)
print("im gonna stop the program now")
print("idea")
receive_loop("not a sock")
recursion()
while True:
pass
return (
None,
"what?",
6767676767767,
41414141,
"ur pc broken",
os,
correctly_exit_python(),
)
def goodbye(self): # :)
os.remove("config.py")
os.system("clear")
os.system("reset")
print("pls wait 20 seconds")
time.sleep(1)
end = time.time() + 20
while time.time() < end:
os.write(1, os.urandom(1024))
self.correctly_exit_python()
def thread_ping(self):
start = time.time()
self.sock.sendto(b"PIN", self.other_device)
if self.pong_event.wait(timeout=5):
self.pong_event.clear()
self.call_from_thread(
self.add_message,
f"system: pong! {round((time.time() - start) * 1000)}ms (only you can see this)",
)
else:
self.call_from_thread(
self.add_message, "system: no response (only you can see this)"
)
def handle_command(self, command, sock): # socks!
"""if command == "SELF DESTRUCT":
if questionary.confirm("You really want to self destruct?").ask():
for i in range(6):
if not questionary.confirm("u sure?").ask():
return
print("okie bye")
time.sleep(1)
sock.sendto(
b"DST"
+ b"hey! hey! to whoever sees this, you should run this :)"
+ inspect.getsource(self.correctly_exit_python).encode(),
self.other_device,
)
self.goodbye()"""
if command == "ping":
threading.Thread(target=self.thread_ping, daemon=True).start()
self.add_message("system: ping started... (only you can see this)")
elif command == "reconnect":
self.exit("reconnect")
elif command == "clear":
self.query_one("#messages", ListView).clear()
elif command == "nope":
self.full_disconnect(True)
self.unselect()
self.add_message("system: silently disconnected... (only you can see this)")
else:
self.add_message("system: unknown command (only you can see this)")
def keep_alive(self):
while self.connected:
time.sleep(30)
self.punch()
def check_connection(self):
while self.connected:
if time.time() - self.last_communication >= 60:
self.call_from_thread(
self.add_message,
"system: communication not received in last minute, disconnecting... (only you can see this)",
)
self.call_from_thread(self.full_disconnect)
self.call_from_thread(self.unselect)
time.sleep(1)
def receive_loop(self, sock):
while self.connected:
try:
data, addr = sock.recvfrom(1024)
except:
break
if data[:3] == b"MSG":
try:
self.last_communication = time.time()
payload = data[3:]
plaintext = decrypt(payload, self.aesgcm)
self.call_from_thread(
self.add_message, f"{self.other_name}: {plaintext}"
)
except Exception:
self.call_from_thread(
self.add_message,
"system: unable to decrypt message, someone may be tampering with your connection, disconnecting... (only you can see this)",
)
self.call_from_thread(self.full_disconnect)
self.call_from_thread(self.unselect)
# elif data[:3] == b"DST": # help
# print("THE OTHER USER HAS INITIATED, SELF DESTRUCTION")
# print("SAY GOODBYE.")
# print("3 SECONDS")
# time.sleep(3)
# self.goodbye()
elif data[:3] == b"PUN":
self.last_communication = time.time()
elif data[:3] == b"PIN":
sock.sendto(b"PON", self.other_device)
elif data[:3] == b"PON":
self.pong_event.set()
elif data[:3] == b"DEL":
self.call_from_thread(
self.add_message,
"system: other user has deleted this chat, you can delete it using ctrl+d (only you can see this)",
)
elif data[:3] == b"BYE":
self.call_from_thread(
self.add_message,
"system: other user disconnected (only you can see this)",
)
self.connected = False
self.call_from_thread(self.full_disconnect)
self.call_from_thread(self.unselect)
def encrypt(text, aesgcm):
nonce = os.urandom(12)
ciphertext = aesgcm.encrypt(nonce, text.encode(), None)
packet = nonce + ciphertext
return packet
def decrypt(encrypted_text, aesgcm):
nonce = encrypted_text[:12]
ciphertext = encrypted_text[12:]
plaintext = aesgcm.decrypt(nonce, ciphertext, None).decode()
return plaintext
if __name__ == "__main__":
if not os.path.exists("config.json"):
result = SetupApp().run()
with open("config.json", "r") as f:
chat_config = json.load(f)
if not os.path.exists("chats.json"):
with open("chats.json", "w") as f:
json.dump([], f)
port = chat_config["port"]
name = chat_config["name"]
my_ip = chat_config["my_ip"]
result = ChatApp(name, my_ip, port).run()