155 lines
4.1 KiB
Python
155 lines
4.1 KiB
Python
import re
import webbrowser
from os import getenv
from typing import List, Optional

import discum
import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv
|
|
|
|
# Price tag such as "£499.99" or "GBP499.99": a currency marker immediately
# followed by an amount whose 1-2 digit fractional part is mandatory.
# The currency is an alternation group; a character class would capture only
# a single character (e.g. "P" out of "GBP") and never a full currency code.
PRICE_REGEX = re.compile(
    r"(?:(?P<currency>GBP|EUR|£|€)(?P<price>[0-9]+(?:\.[0-9]{1,2})))")

# "RTX" (any case), an optional space, then a 3060/3070/3080/3090 model
# number optionally followed by " Ti"; the captured group is named "model".
MODEL_REGEX = re.compile("[Rr][Tt][Xx] ?(?P<model>30[6789]0( [Tt][Ii])?).?")

# NOTE: this pattern is intentionally kept as a non-raw string literal.
# In a non-raw literal "\b" is a backspace character, not a regex word
# boundary, so only the scheme-prefixed alternative (http/https/ftp) matches
# ordinary text. Converting the literal to a raw string would activate the
# bare-domain alternative, and because the trailing group is optional that
# alternative would match any lowercase word followed by a dot.
URL_REGEX = re.compile(
    "(?:(?:https?|ftp):\/\/|\b(?:[a-z\d]+\.))(?:(?:[^\s()<>]+|\((?:[^\s()<>]+|(?:\([^\s()<>]+\)))?\))+(?:\((?:[^\s()<>]+|(?:\(?:[^\s()<>]+\)))?\)|[^\s`!()\[\]{};:'.,<>?«»“”‘’]))?")

# Query-string fragments: "asin=<up to 10 alphanumerics>" and "tld=<suffix>"
# where the suffix is one of .it, .es, .de, .fr or .co.uk.
PARTALERT_ASIN = re.compile("asin=(?P<asin>[0-9a-zA-Z]{1,10})")
PARTALERT_TLD = re.compile(r"tld=(?P<tld>\.(?:it|es|de|fr|co\.uk))")
|
|
|
|
# IDs of the channels whose messages are scanned for URLs; messages from any
# other channel are ignored by the message handler.
MONITORED_CHANNELS = [
    802674527850725377,
    802674601519611925,
    755728127069519913,
    802674384120446996,
    755727814912901170,
    783425011116539964,
    783682409635250187,
    761002102804709448,
    802674800786145311,
    758224323843850250,
    760577787332788315,
    802674552541806662,
    706910639959834738,
    802674584473567303,
    839507735531749446,
    833700313819119676,
    713321461124694056,
    721009876893040682,
]
|
|
|
|
# Load env vars from the .env file before reading the token.
load_dotenv()

token = getenv("TOKEN")

if not token:
    print("Could not load environmental variables. Make sure TOKEN is set in .env")
    # Abort with a non-zero status: a zero status would tell the calling
    # shell or supervisor that start-up succeeded.
    raise SystemExit(1)

# Client shared by the gateway callback and the start-up code below.
bot = discum.Client(token=token, log=False)
|
|
|
|
########################################
|
|
# Callbacks
|
|
########################################
|
|
|
|
|
|
def get_soup(url: str, timeout: float = 10.0):
    """Download *url* and return its body parsed as a BeautifulSoup tree.

    Args:
        url: Address of the page to fetch.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled server would block the caller indefinitely.

    Returns:
        A ``BeautifulSoup`` document built from the response text with the
        ``lxml`` parser.

    Raises:
        requests.RequestException: If the request fails or times out.
    """
    response = requests.get(url, timeout=timeout)
    return BeautifulSoup(response.text, features="lxml")
|
|
|
|
|
|
def get_stockinformer_url(url: str) -> Optional[str]:
    """Resolve a stockinformer page to the link behind its "view at" anchor.

    Args:
        url: Address of the stockinformer page to scan.

    Returns:
        ``https://stockinformer.co.uk/`` followed by the ``href`` of the
        first anchor whose text contains "view at" (case-insensitive) and
        that actually carries an ``href``; ``None`` when no such anchor
        exists.
    """
    soup = get_soup(url)

    for anchor in soup.find_all("a"):
        if "view at" not in anchor.text.lower():
            continue

        href = anchor.get("href")
        # An anchor without a target would otherwise produce ".../None".
        if href:
            return f"https://stockinformer.co.uk/{href}"

    return None
|
|
|
|
|
|
def get_partalert_url(url: str) -> Optional[str]:
    """Resolve a partalert page to the Amazon link it points at.

    Args:
        url: Address of the partalert page to scan.

    Returns:
        The ``href`` of the last anchor whose text contains "amazon"
        (case-insensitive), cut at the first "?" so that tags and referral
        parameters are dropped; ``None`` when no such anchor has an ``href``.
    """
    amazon_url = None
    soup = get_soup(url)

    for anchor in soup.find_all("a"):
        href = anchor.get("href")
        # Anchors without a target are skipped; calling split() on a missing
        # href would raise AttributeError.
        if href and "amazon" in anchor.text.lower():
            # remove tags and referrals
            amazon_url = href.split("?")[0]

    return amazon_url
|
|
|
|
|
|
def check_urls(urls: List[str]):
    """Open every URL in *urls* in the web browser.

    URLs containing "partalert" or "stockinformer" are first resolved through
    the matching helper; when that helper returns nothing the URL is skipped.
    All other URLs are opened unchanged.
    """
    for candidate in urls:
        print(f"Received {candidate}")

        via_partalert = "partalert" in candidate
        if via_partalert or "stockinformer" in candidate:
            # "partalert" takes precedence when both markers are present.
            resolve = get_partalert_url if via_partalert else get_stockinformer_url
            candidate = resolve(candidate)
            if not candidate:
                continue

        print(f'Opening {candidate}')
        webbrowser.open(candidate)
|
|
|
|
|
|
def check_price(message: str) -> bool:
    """Tell whether *message* advertises a card at or below its price limit.

    The first price and the first RTX model found in the text are compared
    against a fixed table of maximum prices per model and currency.

    Args:
        message: Text to inspect.

    Returns:
        ``True`` only when a price and a model are both found and the price
        does not exceed the limit for that model and currency. ``False`` when
        either is missing, when the model is a plain "3060", or when the
        model/currency pair has no entry in the table (e.g. "3070 ti").
    """
    limits = {
        "3060 ti": {"gbp": 500, "eur": 600},
        "3070": {"gbp": 680, "eur": 850},
        "3080": {"gbp": 880, "eur": 1000},
        "3090": {"gbp": 1450, "eur": 1650},
    }

    # Search each pattern once and bail out explicitly instead of relying on
    # a blanket exception handler, which would also hide unrelated errors.
    price_match = PRICE_REGEX.search(message)
    model_match = MODEL_REGEX.search(message)
    if price_match is None or model_match is None:
        return False

    price = float(price_match.group("price"))
    symbol = price_match.group("currency")
    currency = {"£": "gbp", "€": "eur"}.get(symbol, symbol.lower())
    model = model_match.group("model").lower()

    if model == "3060":
        return False

    print(f"Model: {model}, Price: {currency}{price:0.2f}")

    limit = limits.get(model, {}).get(currency)
    return limit is not None and price <= limit
|
|
|
|
|
|
@bot.gateway.command
def on_message(resp):
    """Gateway callback: collect URLs from monitored channels and open them.

    On the ready-supplemental event the client subscribes to guild events.
    On a message event from a channel listed in MONITORED_CHANNELS, URLs are
    extracted from the message text and from the values of its embed fields
    and handed to check_urls(); a monitored message without any URL is
    printed instead.
    """
    if resp.event.ready_supplemental:
        bot.gateway.subscribeToGuildEvents(wait=1)

    if not resp.event.message:
        return

    m = resp.parsed.auto()

    if int(m['channel_id']) not in MONITORED_CHANNELS:
        return

    # search for urls in message text
    urls = list(URL_REGEX.findall(m['content']))

    # search for urls in embeds; an embed is not required to carry a
    # 'fields' list, so a missing key is treated as "no fields"
    for embed in m['embeds']:
        for field in embed.get('fields', []):
            urls.extend(URL_REGEX.findall(field['value']))

    if urls:
        check_urls(urls)
    else:
        print(m)
|
|
|
|
|
|
# Announce that set-up is complete, then start the gateway with automatic
# reconnection enabled.
print("Initialized.")
bot.gateway.run(auto_reconnect=True)
|