check for links in embeds, updated partalert link checker

This commit is contained in:
Giulio De Pasquale 2021-06-21 02:35:27 +01:00
parent 67bec0508e
commit 6be34d5305

62
app.py
View File

@ -10,7 +10,8 @@ from os import getenv
# All patterns are raw strings so that regex escapes such as `\.`, `\s` and
# `\d` reach the `re` module untouched instead of being parsed (and warned
# about) as string-literal escapes.

# A currency marker (GBP, EUR, £ or €) immediately followed by an amount with
# a mandatory 1-2 digit decimal part, e.g. "£499.99" or "GBP100.5".
# The currency alternatives live directly in the named group; a character
# class (`[GBP|EUR|£|€]`) would match any ONE of those characters instead.
PRICE_REGEX = re.compile(
    r"(?:(?P<currency>GBP|EUR|£|€)(?P<price>[0-9]+(?:\.[0-9]{1,2})))")

# "RTX" in any letter case, an optional space, then a 3060/3070/3080/3090
# model number with an optional " Ti" suffix (captured in `model`).
MODEL_REGEX = re.compile(r"[Rr][Tt][Xx] ?(?P<model>30[6789]0( [Tt][Ii])?).?")

# URL matcher used on message text and embeds.
# NOTE(review): the second alternative of the prefix used to be spelled `\b`
# inside a non-raw string, which Python turns into a literal backspace
# (0x08), so only URLs with an explicit http/https/ftp scheme have ever
# matched. That effective behaviour is kept here as `\x08`. Switching to a
# real word boundary would also match scheme-less tokens such as "499.99" -
# tighten the host pattern before enabling it.
# NOTE(review): `\(?:` in the trailing balanced-parentheses group looks like
# a typo for `\(` (compare the same sub-pattern earlier) - confirm.
URL_REGEX = re.compile(
    r"(?:(?:https?|ftp):\/\/|\x08(?:[a-z\d]+\.))(?:(?:[^\s()<>]+|\((?:[^\s()<>]+|(?:\([^\s()<>]+\)))?\))+(?:\((?:[^\s()<>]+|(?:\(?:[^\s()<>]+\)))?\)|[^\s`!()\[\]{};:'.,<>?«»“”‘’]))?")

# Query-string fragments of the Amazon link found on partalert pages:
# `asin=<up to 10 alphanumerics>` and `tld=<.it|.es|.de|.fr|.co.uk>`.
PARTALERT_ASIN = re.compile(r"asin=(?P<asin>[0-9a-zA-Z]{1,10})")
PARTALERT_TLD = re.compile(r"tld=(?P<tld>\.(?:it|es|de|fr|co\.uk))")
@ -39,28 +40,47 @@ bot = discum.Client(token=token, log={"console": False, "file": False})
######################################## ########################################
def get_soup(url: str, timeout: float = 10.0):
    """Download *url* and return the response body parsed by BeautifulSoup.

    Args:
        url: Address of the page to fetch.
        timeout: Seconds passed to ``requests.get``. Without a timeout the
            request can wait on an unresponsive server indefinitely.

    Returns:
        A ``BeautifulSoup`` tree built from the response text.
    """
    r = requests.get(url, timeout=timeout)
    # Name the parser explicitly: otherwise BeautifulSoup picks whichever
    # parser happens to be installed (and warns about the guess).
    return BeautifulSoup(r.text, "html.parser")


def get_stockinformer_url(url: str) -> str:
    """Return the "view at" link found on a stockinformer page.

    Scans every ``<a>`` element of the page at *url* and uses the first one
    whose text contains "view at" (case-insensitive) and that carries an
    ``href``.

    Returns:
        ``https://stockinformer.co.uk/<href>`` for that anchor, or ``None``
        when no such anchor exists.
    """
    bs = get_soup(url)

    for a in bs.find_all("a"):
        href = a.get("href")
        # Skip anchors without an href; formatting None into the URL would
        # produce the bogus ".../None".
        if href and "view at" in a.text.lower():
            return f"https://stockinformer.co.uk/{href}"

    return None
def get_partalert_url(url: str) -> str:
    """Turn a partalert page into a direct Amazon product URL.

    Fetches the page at *url* via ``get_soup`` and inspects every ``<a>``
    element whose text contains "amazon" (case-insensitive). The anchor's
    ``href`` is expected to carry ``asin=...`` and ``tld=...`` parameters,
    from which ``https://amazon<tld>/dp/<asin>`` is built.

    Returns:
        The rebuilt Amazon URL, or ``None`` when no anchor yields both an
        ASIN and a TLD. If several anchors qualify, the last one wins.
    """
    ret_url = None
    bs = get_soup(url)

    for a in bs.find_all("a"):
        if "amazon" not in a.text.lower():
            continue

        amazon_url = a.get("href")
        if not amazon_url:
            print("Amazon link on partalert page has no href, skipping")
            continue

        # Check the matches explicitly rather than catching the
        # AttributeError raised by calling .group() on a failed search.
        asin = PARTALERT_ASIN.search(amazon_url)
        tld = PARTALERT_TLD.search(amazon_url)
        if asin is None or tld is None:
            print(f"Could not find asin/tld in partalert link: {amazon_url}")
            continue

        ret_url = f"https://amazon{tld.group('tld')}/dp/{asin.group('asin')}"

    return ret_url
def check_urls(urls: List[str]): def check_urls(urls: List[str]):
for url in urls: for url in urls:
print(f"Received {url}") print(f"Received {url}")
if "partalert" in url: if "partalert" in url:
try: url = get_partalert_url(url)
asin = PARTALERT_ASIN.search(url).group('asin')
tld = PARTALERT_TLD.search(url).group('tld')
url = f"https://amazon{tld}/dp/{asin}" if not url:
except Exception as e:
print(f"Exception: {e}")
continue continue
elif "stockinformer" in url: elif "stockinformer" in url:
url = get_stockinformer_url(url) url = get_stockinformer_url(url)
@ -108,25 +128,29 @@ def check_price(message: str) -> bool:
def _collect_message_urls(m) -> list:
    """Return the URLs found in a parsed message's text and embeds.

    Looks at the message ``content`` and, for every embed, at its ``url``,
    ``description``, a top-level ``value`` (what this code read originally)
    and the ``value`` of each entry in ``fields``. Missing or empty parts are
    skipped. The result is a flat list of strings, de-duplicated while
    keeping first-seen order, because the same link commonly appears in more
    than one of those places.
    """
    # search for urls in message text
    texts = [m.get('content')]

    # search for urls in embeds
    for embed in m.get('embeds') or []:
        texts.append(embed.get('url'))
        texts.append(embed.get('description'))
        texts.append(embed.get('value'))
        for field in embed.get('fields') or []:
            texts.append(field.get('value'))

    urls = []
    for text in texts:
        if text:
            # extend, not append: findall returns a list, and appending it
            # would hand check_urls a list of lists.
            urls.extend(URL_REGEX.findall(text))

    return list(dict.fromkeys(urls))


@bot.gateway.command
def on_message(resp):
    """Gateway event handler that forwards URLs from monitored channels.

    On the ``ready_supplemental`` event it subscribes to guild events. On a
    message event it parses the message and, if the channel id is listed in
    ``MONITORED_CHANNELS``, passes every URL found in the message text and
    its embeds to ``check_urls``.
    """
    if resp.event.ready_supplemental:
        bot.gateway.subscribeToGuildEvents(wait=1)

    if not resp.event.message:
        return

    m = resp.parsed.auto()
    channel_id = int(m['channel_id'])
    if channel_id not in MONITORED_CHANNELS:
        return

    urls = _collect_message_urls(m)
    if urls:
        check_urls(urls)
# Report that setup has finished, then start the gateway connection with
# automatic reconnection enabled.
print("Initialized.")
bot.gateway.run(auto_reconnect=True)