import datetime
import secrets
import sqlite3
from enum import Enum, IntEnum
from os import getenv
from typing import Annotated

import fastapi
import onesignal
import yaml
from apscheduler.schedulers.background import BackgroundScheduler
from dotenv import load_dotenv
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import PlainTextResponse
from onesignal.api import default_api
from onesignal.model.create_notification_success_response import CreateNotificationSuccessResponse
from onesignal.model.generic_error import GenericError
from onesignal.model.notification import Notification
from onesignal.model.rate_limit_error import RateLimitError
from pydantic import BaseModel


# ## API, db, and scheduler initialisation

app = fastapi.FastAPI(
    title="Victoria Hall LaundryWeb",
    description="LaundryWeb Backend API",
    version="0.1",
)

# check_same_thread=False lets this one connection be used from threads other
# than the one that created it (the scheduler below is a BackgroundScheduler).
conn = sqlite3.connect("db.db", check_same_thread=False)
cursor = conn.cursor()

scheduler = BackgroundScheduler()
scheduler.start()

# origins that are allowed to call the API with credentials (cookies)
origins = [
    "http://localhost",
    "http://localhost:998",
    "http://localhost:5173",
    "http://127.0.0.1",
    "http://127.0.0.1:998",
    "http://127.0.0.1:5173",
    "https://laundryweb.altafcreator.com",
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

cursor.execute("""
    CREATE TABLE IF NOT EXISTS timers (
        timer_id INTEGER PRIMARY KEY,
        user_id VARCHAR(64) NOT NULL,
        start_time TEXT NOT NULL,
        end_time TEXT NOT NULL,
        block INT NOT NULL,
        machine INT NOT NULL,
        status TEXT NOT NULL CHECK(status IN ('RUNNING', 'FINISHED')),
        subscription_id TEXT NOT NULL
    );""")
# block is either 1 or 2; machine is 1-4, odd is a dryer, even is a washer.
# column positions of a row returned by `SELECT * FROM timers`
class RowIndices(IntEnum):
    TIMER_ID = 0
    USER_ID = 1
    START_TIME = 2
    END_TIME = 3
    BLOCK = 4
    MACHINE = 5
    STATUS = 6
    SUBSCRIPTION_ID = 7


# ## yaml configuration initialisation

qr_uri = {}

# NOTE(review): yaml.Loader can construct arbitrary Python objects. That is
# acceptable only while config.yaml is a trusted, locally-maintained file;
# consider yaml.safe_load if that ever stops being true.
with open("config.yaml", 'r') as stream:
    yaml_dict = yaml.load(stream, yaml.Loader)

# inverting the key-value pair for the dict to act as a map:
# easy matching from the users' scanned obscure uri to the "<hall>-<name>" key
for hall in ("h1", "h2"):
    for k, v in yaml_dict["qr_uri"][hall].items():
        qr_uri[v] = f"{hall}-{k}"

print("config.yaml loaded, qr_uri:")
for k, v in qr_uri.items():
    print(k, v)


# ## onesignal initialisation

load_dotenv()

onesignal_configuration = onesignal.Configuration(
    rest_api_key=getenv("REST_API_KEY"),
    organization_api_key=getenv("ORGANIZATION_API_KEY"),
)

api_client = onesignal.ApiClient(onesignal_configuration)
api_instance = default_api.DefaultApi(api_client)

ONESIGNAL_APP_ID = "83901cc7-d964-475a-90ec-9f854df4ba52"


# ## class / data struct definitions

# body of POST /start; the timer runs for `duration * 30` minutes
class RequestData(BaseModel):
    duration: int
    machine_id: str
    onesignal_subscription_id: str


# body of POST /info; machine_id is the scanned uri (may be empty)
class InformationRequestData(BaseModel):
    machine_id: str


class BlockRequestData(BaseModel):
    block: int


# body of POST /finish; id is the timer_id to finish
class FinishRequestData(BaseModel):
    id: int


# machine states; only the member *names* are stored in machine_status
class Status(Enum):
    EMPTY = 0
    FINISHED = 1
    RUNNING = 2
    OUTOFSERVICE = 3


# "<hall>-<name>" key -> [block, machine]; machine is None for status pages
URI_TO_MACHINES = {
    "h1-status": [1, None],
    "h1-dryer1": [1, 1],
    "h1-washer1": [1, 2],
    "h1-dryer2": [1, 3],
    "h1-washer2": [1, 4],
    "h2-status": [2, None],
    "h2-dryer1": [2, 1],
    "h2-washer1": [2, 2],
    "h2-dryer2": [2, 3],
    "h2-washer2": [2, 4],
}


# ## global vars for user-end
# all three are indexed [block - 1][machine - 1]

machine_status = [[Status.EMPTY.name] * 4 for _ in range(2)]
machine_times = [[None] * 4 for _ in range(2)]
machine_endings = [[None] * 4 for _ in range(2)]
# ## some non-api endpoint method definitions


def _fetch_timer(timer_id):
    """Return the ``timers`` row with the given id, or None if there is none.

    Uses ``conn.execute`` so the query gets its own cursor instead of going
    through the module-level ``cursor`` that the request handlers also use;
    the scheduler callbacks below are run by a BackgroundScheduler.
    """
    rows = conn.execute(
        "SELECT * FROM timers WHERE timer_id = ?", (int(timer_id),)
    ).fetchall()
    return rows[0] if rows else None


def _send_notification(subscription_id, heading, content):
    """Push a OneSignal notification to one subscription; errors are only printed."""
    notification = Notification(
        app_id=ONESIGNAL_APP_ID,
        include_subscription_ids=[subscription_id],
        contents={'en': content},
        headings={'en': heading},
        priority=10,
    )
    try:
        api_instance.create_notification(notification)
    except Exception as e:
        # best effort: a failed push must not break the timer bookkeeping
        print(e)


def restart_terminated_schedules():
    """Re-create scheduler jobs and in-memory state from the ``timers`` table.

    Checks for any stored entry and restarts the previously-terminated
    schedules; useful if you're restarting the server. Every row is marked
    RUNNING in the in-memory arrays; rows that are already past their end
    time get a ``final_timer_finished`` job one second from now.
    """
    out = conn.execute("SELECT * FROM timers;").fetchall()
    print("unfinished timers: " + str(len(out)))

    five_minutes = datetime.timedelta(minutes=5)

    for row in out:
        print(row)
        end_date = datetime.datetime.fromisoformat(row[RowIndices.END_TIME])
        now = datetime.datetime.now()
        timer_id = row[RowIndices.TIMER_ID]

        if now > end_date:
            print("unfinished timer was long gone", timer_id)
            scheduler.add_job(final_timer_finished, 'date',
                              run_date=(now + datetime.timedelta(seconds=1)),
                              id=str(timer_id), args=[timer_id])
        elif now + five_minutes > end_date:
            # too late for the five-minute reminder, go straight to the end
            print("unfinished timer ends in less than five mins", timer_id)
            scheduler.add_job(final_timer_finished, 'date',
                              run_date=end_date,
                              id=str(timer_id), args=[timer_id])
        else:
            print("unfinished timer scheduler started", timer_id)
            scheduler.add_job(reminder_timer_finished, 'date',
                              run_date=end_date - five_minutes,
                              id=str(timer_id), args=[timer_id])

        print("setting internal array information")
        block_idx = row[RowIndices.BLOCK] - 1
        machine_idx = row[RowIndices.MACHINE] - 1
        machine_status[block_idx][machine_idx] = Status.RUNNING.name
        machine_times[block_idx][machine_idx] = row[RowIndices.START_TIME]
        machine_endings[block_idx][machine_idx] = row[RowIndices.END_TIME]

    print(machine_status, machine_times, machine_endings)


def reminder_timer_finished(timer_id):
    """Scheduler job: schedule the final job for the timer and send the reminder push.

    Does nothing (besides logging) when the timer row no longer exists.
    """
    print("timer almost finished", timer_id)

    row = _fetch_timer(timer_id)
    if row is None:
        print("timer no longer exists, skipping reminder", timer_id)
        return

    # run_date is the ISO string stored in the row
    scheduler.add_job(final_timer_finished, 'date',
                      run_date=row[RowIndices.END_TIME],
                      id=str(timer_id), args=[timer_id])

    _send_notification(row[RowIndices.SUBSCRIPTION_ID],
                       'Laundry Reminder',
                       'get ready to get your bloody laundry')


def final_timer_finished(timer_id):
    """Scheduler job: send the "finished" push and mark the machine FINISHED.

    Does nothing (besides logging) when the timer row no longer exists.
    """
    print("timer finished!1", timer_id)

    row = _fetch_timer(timer_id)
    if row is None:
        print("timer no longer exists, nothing to finish", timer_id)
        return

    _send_notification(row[RowIndices.SUBSCRIPTION_ID],
                       'Laundry Finished',
                       'get your bloody laundry')

    # NOTE(review): only the in-memory status changes here; the row's status
    # column is not updated by this function - confirm that is intended.
    machine_status[row[RowIndices.BLOCK] - 1][row[RowIndices.MACHINE] - 1] = Status.FINISHED.name


#                sec  min  hrs  days
COOKIE_MAX_AGE = 60 * 60 * 24 * 30  # 30 days


def create_session(response: fastapi.Response):
    """Generate a random session key, set it as the ``session_key`` cookie and return it."""
    cookie = secrets.token_hex(32)
    response.set_cookie(key="session_key", value=cookie, secure=True, max_age=COOKIE_MAX_AGE)
    return cookie


def authenticate_block(response: fastapi.Response, machine_id: str | None = None, block: int | None = None):
    """Set the ``auth_block`` cookie and return the block number.

    The block is looked up from ``machine_id`` when one is given (raises
    KeyError for an unknown id), otherwise ``block`` is used as-is.
    Returns the string "FAIL" and sets no cookie when neither is provided.
    """
    if machine_id:
        blk = URI_TO_MACHINES[qr_uri[machine_id]][0]
    elif block:
        blk = block
    else:
        return "FAIL"

    # cookie values are text; convert explicitly instead of relying on the
    # cookie encoder to stringify an int
    response.set_cookie(key="auth_block", value=str(blk), secure=True, max_age=COOKIE_MAX_AGE)
    return blk


# ## beginning

print("Hello, world!")

restart_terminated_schedules()
# ## api endpoints

# --- starting new timer
# Validates the scanned uri and the block cookie, stores the timer, schedules
# the five-minute reminder and marks the machine RUNNING.
# TODO: still too complex (the linter complains as well); refactor perhaps.
# NOTE(review): data.duration is not range-checked and a machine that is
# already RUNNING can be started again - confirm whether that is intended.
@app.post("/start", response_class=PlainTextResponse)
def start_new_timer(data: RequestData,
                    response: fastapi.Response,
                    session_key: Annotated[str | None, fastapi.Cookie()] = None,
                    auth_block: Annotated[str | None, fastapi.Cookie()] = None):
    now = datetime.datetime.now()

    try:
        if not session_key:
            print("no session key, creating.")
            session_key = create_session(response)
    except Exception as exception:
        print("err @ key creation //", exception)
        response.status_code = fastapi.status.HTTP_500_INTERNAL_SERVER_ERROR
        return "something went wrong during session key creation"

    try:
        block, machine = URI_TO_MACHINES[qr_uri[data.machine_id]]
    except KeyError:
        response.status_code = fastapi.status.HTTP_401_UNAUTHORIZED
        return "invalid uri; you are unauthorised"

    if auth_block:
        if str(auth_block) != str(block):
            response.status_code = fastapi.status.HTTP_403_FORBIDDEN
            return "mismatch in block authentication cookie and requested block machine. forbidden."
    else:
        authenticate_block(response, machine_id=data.machine_id)

    # status-page uris map to [block, None]; there is no machine to start
    if machine is None:
        response.status_code = fastapi.status.HTTP_400_BAD_REQUEST
        return "uri does not refer to a machine"

    try:
        print("session key valid", session_key)
        end_date = now + datetime.timedelta(minutes=(data.duration * 30))
        # bound parameters: session_key (cookie) and the subscription id
        # (request body) are client-controlled and must never be spliced
        # into the SQL text
        inserted = conn.execute(
            """
            INSERT INTO timers (user_id, start_time, end_time, block, machine, status, subscription_id)
            VALUES (?, ?, ?, ?, ?, 'RUNNING', ?)
            """,
            (session_key, now.isoformat(), end_date.isoformat(),
             block, machine, data.onesignal_subscription_id),
        )
        conn.commit()
        # timer_id is the INTEGER PRIMARY KEY, i.e. the rowid of this insert
        timer_id = str(inserted.lastrowid)
        print("timer id", timer_id)
    except Exception as exception:
        print("err @ sql stuff //", exception)
        response.status_code = fastapi.status.HTTP_500_INTERNAL_SERVER_ERROR
        return "something went wrong during sql stuff"

    try:
        scheduler.add_job(reminder_timer_finished, 'date',
                          run_date=end_date - datetime.timedelta(minutes=5),
                          id=timer_id, args=[timer_id])
    except Exception as exception:
        print("err @ scheduler //", exception)
        response.status_code = fastapi.status.HTTP_500_INTERNAL_SERVER_ERROR
        return "something went wrong during scheduler stuff"

    try:
        machine_status[block - 1][machine - 1] = Status.RUNNING.name
        machine_times[block - 1][machine - 1] = now.isoformat()
        # stored as an ISO string, the same form restart_terminated_schedules
        # restores from the database
        machine_endings[block - 1][machine - 1] = end_date.isoformat()
    except Exception as exception:
        print("err @ machine_status //", exception)
        response.status_code = fastapi.status.HTTP_500_INTERNAL_SERVER_ERROR
        return "something went wrong during machine_status setting somehow"

    # HTTP 200
    return "all good bro timer started"


# --- check whether user has laundry or not
# Creates a session cookie when the caller has none.
@app.post("/check", response_class=PlainTextResponse)
def check_status(response: fastapi.Response,
                 session_key: Annotated[str | None, fastapi.Cookie()] = None):
    if not session_key:
        print("no session key, creating.")
        session_key = create_session(response)

    out = conn.execute(
        "SELECT * FROM timers WHERE user_id = ?", (session_key,)
    ).fetchall()

    for row in out:
        print(row)

    return "you got laundry" if out else "no got laundry"


# --- fetch machine status for block
# Returns [statuses, start times, end times] for the block in the auth cookie.
@app.post("/status")
def get_machine_status(response: fastapi.Response,
                       auth_block: Annotated[str | None, fastapi.Cookie()] = None):
    if not auth_block:
        response.status_code = fastapi.status.HTTP_401_UNAUTHORIZED
        return "block cookie needed. unauthorised"

    # the cookie is client-controlled: reject anything that is not a known
    # block number ("0" would otherwise index [-1], i.e. the last block)
    try:
        block = int(auth_block) - 1
    except ValueError:
        block = -1
    if not 0 <= block < len(machine_status):
        response.status_code = fastapi.status.HTTP_400_BAD_REQUEST
        return "invalid block cookie"

    return [machine_status[block], machine_times[block], machine_endings[block]]


# --- get laundr(y/ies) information of user
# Returns one dict per timer row owned by the session key.
@app.post("/laundry")
def get_laundry_info(response: fastapi.Response,
                     session_key: Annotated[str | None, fastapi.Cookie()] = None,
                     auth_block: Annotated[str | None, fastapi.Cookie()] = None):
    if not session_key:
        response.status_code = fastapi.status.HTTP_401_UNAUTHORIZED
        return "you got no session key cookie how am i supposed to identify you"

    out = conn.execute(
        "SELECT * FROM timers WHERE user_id = ?", (session_key,)
    ).fetchall()

    return [
        {
            "end_time": row[RowIndices.END_TIME],
            "start_time": row[RowIndices.START_TIME],
            "machine": row[RowIndices.MACHINE],
            "status": row[RowIndices.STATUS],
            "id": row[RowIndices.TIMER_ID],
        }
        for row in out
    ]


# --- finish one's laundry
# Frees the machine and deletes the timer row. Only the session that started
# the timer may finish it, and only once its end time has passed.
@app.post("/finish", response_class=PlainTextResponse)
def finish_laundry(data: FinishRequestData,
                   response: fastapi.Response,
                   session_key: Annotated[str | None, fastapi.Cookie()] = None):
    if not session_key:
        response.status_code = fastapi.status.HTTP_401_UNAUTHORIZED
        return "you got no session key, cannot"

    rows = conn.execute(
        "SELECT * FROM timers WHERE timer_id = ?", (data.id,)
    ).fetchall()
    if not rows:
        response.status_code = fastapi.status.HTTP_404_NOT_FOUND
        return "no such timer"
    row = rows[0]

    # ownership must be verified before anything is modified
    if session_key != row[RowIndices.USER_ID]:
        response.status_code = fastapi.status.HTTP_403_FORBIDDEN
        return "session key mismatch with timer id, dubious!"

    if datetime.datetime.now() < datetime.datetime.fromisoformat(row[RowIndices.END_TIME]):
        response.status_code = fastapi.status.HTTP_400_BAD_REQUEST
        return "timer has not finished yet"

    block_idx = row[RowIndices.BLOCK] - 1
    machine_idx = row[RowIndices.MACHINE] - 1
    machine_status[block_idx][machine_idx] = Status.EMPTY.name
    machine_times[block_idx][machine_idx] = None
    machine_endings[block_idx][machine_idx] = None

    conn.execute("DELETE FROM timers WHERE timer_id = ?", (row[RowIndices.TIMER_ID],))
    conn.commit()

    print(f"timer of id {data.id} has been finished by {session_key}")

    return "laundry finished"


# --- get information from uri search query
# Resolves a scanned uri to {"block", "machine"}. Without a uri, the block is
# taken from the auth cookie (as the cookie's string value); without a cookie,
# a valid uri sets one.
@app.post("/info")
def uri_to_information(data: InformationRequestData,
                       response: fastapi.Response,
                       auth_block: Annotated[str | None, fastapi.Cookie()] = None):
    info = None
    try:
        if data.machine_id:
            info = URI_TO_MACHINES[qr_uri[data.machine_id]]
    except KeyError:
        response.status_code = fastapi.status.HTTP_404_NOT_FOUND
        return "INVALID machine ID/URI"

    print(auth_block)

    if auth_block:
        if info:
            if str(auth_block) != str(info[0]):
                response.status_code = fastapi.status.HTTP_403_FORBIDDEN
                return "UNAUTHORISED to view information of block " + str(info[0])
        else:
            info = [auth_block, None]
    elif info:
        authenticate_block(response, block=info[0])
    else:
        response.status_code = fastapi.status.HTTP_401_UNAUTHORIZED
        return "NO INFORMATION PROVIDED. NO AUTH COOKIE."

    return {"block": info[0], "machine": info[1]}