Added way to login via API and removed usage of cgi

This commit is contained in:
cryptoguard 2025-04-01 20:29:32 -04:00
parent 6311eb6ba3
commit 05e01d44b7

View file

@ -6,7 +6,6 @@
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
import os
import cgi
import json
import shlex
import secrets
@ -326,25 +325,33 @@ class HttpHandler(BaseHTTPRequestHandler):
template = env.get_template("login.html")
err_messages = []
extra_headers = []
is_json_request = "application/json" in self.headers.get("Content-Type", "")
security_warning = None
if self.server.host_name not in ("127.0.0.1", "localhost"):
security_warning = "WARNING: Server is accessible on the network. Sending password over plain HTTP is insecure. Use HTTPS (e.g., via reverse proxy) for non-local access."
if not is_json_request:
err_messages.append(security_warning)
if post_string:
from io import BytesIO
fs = cgi.FieldStorage(
fp=BytesIO(post_string),
headers=self.headers,
environ={
"REQUEST_METHOD": "POST",
"CONTENT_TYPE": self.headers["Content-Type"],
},
)
password = fs.getvalue("password")
password = None
if is_json_request:
try:
json_data = json.loads(post_string.decode("utf-8"))
password = json_data.get("password")
except Exception as e:
swap_client.log.error(f"Error parsing JSON login data: {e}")
else:
try:
form_data = parse.parse_qs(post_string.decode("utf-8"))
password = form_data.get("password", [None])[0]
except Exception as e:
swap_client.log.error(f"Error parsing form login data: {e}")
client_auth_hash = swap_client.settings.get("client_auth_hash")
if (
client_auth_hash
and password
and password is not None
and verify_rfc2440_password(client_auth_hash, password)
):
session_id = secrets.token_urlsafe(32)
@ -354,17 +361,34 @@ class HttpHandler(BaseHTTPRequestHandler):
self.server.active_sessions[session_id] = {"expires": expires}
cookie_header = self._set_session_cookie(session_id)
self.send_response(302)
self.send_header("Location", "/offers")
self.send_header(cookie_header[0], cookie_header[1])
self.end_headers()
return b""
if is_json_request:
response_data = {"success": True, "session_id": session_id}
if security_warning:
response_data["warning"] = security_warning
self.putHeaders(
200, "application/json", extra_headers=[cookie_header]
)
return json.dumps(response_data).encode("utf-8")
else:
self.send_response(302)
self.send_header("Location", "/offers")
self.send_header(cookie_header[0], cookie_header[1])
self.end_headers()
return b""
else:
err_messages.append("Invalid password.")
clear_cookie_header = self._clear_session_cookie()
extra_headers.append(clear_cookie_header)
if is_json_request:
self.putHeaders(401, "application/json")
return json.dumps({"error": "Invalid password"}).encode("utf-8")
else:
err_messages.append("Invalid password.")
clear_cookie_header = self._clear_session_cookie()
extra_headers.append(clear_cookie_header)
if swap_client.settings.get("client_auth_hash") and self.is_authenticated():
if (
not is_json_request
and swap_client.settings.get("client_auth_hash")
and self.is_authenticated()
):
self.send_response(302)
self.send_header("Location", "/offers")
self.end_headers()
@ -379,7 +403,7 @@ class HttpHandler(BaseHTTPRequestHandler):
"encrypted": False,
"locked": False,
},
status_code=401 if post_string and err_messages else 200,
status_code=401 if post_string and not is_json_request else 200,
extra_headers=extra_headers,
)
@ -663,13 +687,13 @@ class HttpHandler(BaseHTTPRequestHandler):
if page == "json":
try:
self.putHeaders(status_code, "text/plain")
self.putHeaders(status_code, "json")
func = js_url_to_function(url_split)
return func(self, url_split, post_string, is_json)
except Exception as ex:
if swap_client.debug is True:
swap_client.log.error(traceback.format_exc())
self.putHeaders(500, "text/plain")
self.putHeaders(500, "json")
return js_error(self, str(ex))
if page == "static":
@ -856,7 +880,12 @@ class HttpThread(threading.Thread, HTTPServer):
self.swap_client.log.info("HTTP server stopped.")
def run(self):
self.swap_client.log.info(
f"Starting HTTP server on {self.host_name}:{self.port_no}"
log_msg = f"Starting HTTP server on {self.host_name}:{self.port_no}"
if self.host_name not in ("127.0.0.1", "localhost"):
log_msg += " - WARNING: Server is accessible on the network. Ensure HTTPS is configured (e.g., via reverse proxy) if handling sensitive data like passwords over non-local connections."
(
self.swap_client.log.warning(log_msg)
if self.host_name not in ("127.0.0.1", "localhost")
else self.swap_client.log.info(log_msg)
)
self.serve_forever()