import os
from copy import deepcopy
from multiprocessing.pool import ThreadPool
from pathlib import Path
from typing import Optional, Any, List, cast, Union
from urllib.parse import urlparse
import pandas as pd
import requests
from tqdm import tqdm
from cryptocompsdk.logger import logger
from cryptocompsdk.response import ResponseException, ResponseAPIBase
from cryptocompsdk.general.parse import from_int, from_none, from_union, from_float, from_str, to_float, from_bool, \
from_dict, to_class, is_type, from_int_or_str, from_na, from_str_number, from_list, from_stringified_bool, \
from_plain_dict
class SourceInfo:
    """
    Container for the ``name``, ``lang`` and ``img`` values found in the
    ``source_info`` entry of a news record. Every field may be ``None``.
    """

    name: Optional[str]
    lang: Optional[str]
    img: Optional[str]

    def __init__(self, name: Optional[str], lang: Optional[str], img: Optional[str]) -> None:
        """
        Store the three source attributes unchanged.

        :param name: value of the ``name`` key, or None
        :param lang: value of the ``lang`` key, or None
        :param img: value of the ``img`` key, or None
        """
        self.name, self.lang, self.img = name, lang, img

    @staticmethod
    def from_dict(obj: Any) -> 'SourceInfo':
        """
        Build a SourceInfo from a plain dictionary.

        :param obj: dictionary which may contain ``name``, ``lang`` and ``img``;
            keys that are absent are read as None
        :return: the populated SourceInfo
        """
        assert isinstance(obj, dict)
        # Parsed in the order name, lang, img; each value must be a string or None.
        name, lang, img = (
            from_union([from_str, from_none], obj.get(key))
            for key in ("name", "lang", "img")
        )
        return SourceInfo(name, lang, img)

    def to_dict(self) -> dict:
        """
        Serialise back to a dictionary with the keys ``name``, ``lang`` and ``img``.

        :return: dictionary of the three validated attribute values
        """
        return {
            field: from_union([from_str, from_none], getattr(self, field))
            for field in ("name", "lang", "img")
        }
class NewsRecord:
    """
    One news article entry from a news response.

    All attributes are optional. ``from_dict`` reads ``id``, ``upvotes`` and
    ``downvotes`` from string values and stores them as integers, and
    ``to_dict`` writes them back as strings.
    """

    id: Optional[int]
    guid: Optional[str]
    published_on: Optional[int]
    imageurl: Optional[str]
    title: Optional[str]
    url: Optional[str]
    source: Optional[str]
    body: Optional[str]
    tags: Optional[str]
    categories: Optional[str]
    upvotes: Optional[int]
    downvotes: Optional[int]
    lang: Optional[str]
    source_info: Optional['SourceInfo']

    def __init__(self, id: Optional[int], guid: Optional[str], published_on: Optional[int],
                 imageurl: Optional[str], title: Optional[str], url: Optional[str],
                 source: Optional[str], body: Optional[str], tags: Optional[str],
                 categories: Optional[str], upvotes: Optional[int], downvotes: Optional[int],
                 lang: Optional[str], source_info: Optional['SourceInfo']) -> None:
        """
        Store every argument on the attribute of the same name.
        """
        self.id = id
        self.guid = guid
        self.published_on = published_on
        self.imageurl = imageurl
        self.title = title
        self.url = url
        self.source = source
        self.body = body
        self.tags = tags
        self.categories = categories
        self.upvotes = upvotes
        self.downvotes = downvotes
        self.lang = lang
        self.source_info = source_info

    @staticmethod
    def from_dict(obj: Any) -> 'NewsRecord':
        """
        Build a NewsRecord from a plain dictionary.

        :param obj: dictionary for one article; missing keys are read as None
        :return: the populated NewsRecord
        """
        assert isinstance(obj, dict)

        def opt_str(key: str) -> Optional[str]:
            return from_union([from_str, from_none], obj.get(key))

        def opt_int_from_str(key: str) -> Optional[int]:
            # These values are parsed from a string and converted to int.
            return from_union([from_none, lambda x: int(from_str(x))], obj.get(key))

        id = opt_int_from_str("id")
        guid = opt_str("guid")
        published_on = from_union([from_int, from_none], obj.get("published_on"))
        imageurl = opt_str("imageurl")
        title = opt_str("title")
        url = opt_str("url")
        source = opt_str("source")
        body = opt_str("body")
        tags = opt_str("tags")
        categories = opt_str("categories")
        upvotes = opt_int_from_str("upvotes")
        downvotes = opt_int_from_str("downvotes")
        lang = opt_str("lang")
        source_info = from_union([SourceInfo.from_dict, from_none], obj.get("source_info"))
        return NewsRecord(id, guid, published_on, imageurl, title, url, source, body, tags,
                          categories, upvotes, downvotes, lang, source_info)

    def to_dict(self) -> dict:
        """
        Serialise to a dictionary using the same keys ``from_dict`` reads.

        :return: dictionary in which ``id``, ``upvotes`` and ``downvotes`` are
            strings (or None) and ``source_info`` is converted via ``to_class``
        """
        def opt_str(value: Any) -> Optional[str]:
            return from_union([from_str, from_none], value)

        def opt_int_as_str(value: Any) -> Optional[str]:
            # None stays None; an int is written back out as a string.
            return from_union([
                lambda x: from_none(is_type(type(None), x)),
                lambda x: from_str(str(is_type(int, x))),
            ], value)

        result: dict = {}
        result["id"] = opt_int_as_str(self.id)
        result["guid"] = opt_str(self.guid)
        result["published_on"] = from_union([from_int, from_none], self.published_on)
        result["imageurl"] = opt_str(self.imageurl)
        result["title"] = opt_str(self.title)
        result["url"] = opt_str(self.url)
        result["source"] = opt_str(self.source)
        result["body"] = opt_str(self.body)
        result["tags"] = opt_str(self.tags)
        result["categories"] = opt_str(self.categories)
        result["upvotes"] = opt_int_as_str(self.upvotes)
        result["downvotes"] = opt_int_as_str(self.downvotes)
        result["lang"] = opt_str(self.lang)
        result["source_info"] = from_union([lambda x: to_class(SourceInfo, x), from_none],
                                           self.source_info)
        return result

    @property
    def is_empty(self) -> bool:
        """
        Whether the record carries no information.

        A field counts as empty when it is ``None`` or equal to ``0``. ``None``
        has to be accepted because ``from_dict`` stores ``None`` for every key
        missing from the payload, so a check against ``0`` alone would report
        a record with no content at all as non-empty.

        :return: True only if every field is None or 0
        """
        is_empty_cols = (
            'id',
            'guid',
            'published_on',
            'imageurl',
            'title',
            'url',
            'source',
            'body',
            'tags',
            'categories',
            'upvotes',
            'downvotes',
            'lang',
            'source_info',
        )
        for col in is_empty_cols:
            value = getattr(self, col)
            if value is not None and value != 0:
                return False
        return True

    def download_article(self, use_alt_url: bool = False, timeout: Optional[float] = None) -> str:
        """
        Download and return the HTML of this news article

        :param use_alt_url: The default is to use the url given in the guid attribute,
            if True then use url attribute instead of guid
        :param timeout: seconds to wait for the server, passed to ``requests.get``.
            The default of None places no limit on the wait, as before
        :return: the response text
        :raises NoValidNewsURLException: if no candidate URL has both a scheme and a host
        :raises InvalidNewsResponseException: if the final request attempted does not
            return status code 200
        """
        if use_alt_url:
            url, alt_url = self.url, None
        else:
            # Try guid first and keep url available as a fallback
            url, alt_url = self.guid, self.url

        if pd.isnull(url):
            valid_url = False
        else:
            parsed = urlparse(url)
            valid_url = bool(parsed.scheme and parsed.netloc)

        if not valid_url:
            if alt_url is not None:
                return self.download_article(use_alt_url=True, timeout=timeout)
            raise NoValidNewsURLException(f'Url {url} is invalid')

        if url is None:
            # Should not happen, for typing purposes
            raise ValueError('must provide a url')

        headers = {
            'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.106 Safari/537.36'
        }
        resp = requests.get(url, headers=headers, timeout=timeout)
        status_code = resp.status_code
        text = resp.text

        if status_code == 200:
            return text
        if alt_url is not None:
            # guid request failed, retry once with the url attribute
            return self.download_article(use_alt_url=True, timeout=timeout)
        raise InvalidNewsResponseException(f'Got status code {status_code} for request to {url}. Response: {text}')
class RateLimit:
    """
    Representation of the ``RateLimit`` entry of a news response. It declares
    no fields, so instances carry no state.
    """

    def __init__(self) -> None:
        """Create the object; there is nothing to initialise."""

    @staticmethod
    def from_dict(obj: Any) -> 'RateLimit':
        """
        Build a RateLimit from a dictionary.

        :param obj: must be a dictionary; its contents are not read
        :return: a new RateLimit
        """
        assert isinstance(obj, dict)
        return RateLimit()

    def to_dict(self) -> dict:
        """
        Serialise to a dictionary.

        :return: always an empty dictionary
        """
        return {}
class NewsData(ResponseAPIBase):
    """
    A complete news response: status fields plus the list of
    :class:`NewsRecord` objects held in ``data``.

    Also provides conversion to a DataFrame, the properties and operators
    grouped under "Pagination methods" below, and bulk downloading of the
    article HTML.
    """

    type: Optional[int]
    message: Optional[str]
    promoted: Optional[List[Any]]
    data: Optional[List[NewsRecord]]
    rate_limit: Optional[RateLimit]
    has_warning: Optional[bool]

    def __init__(self, type: Optional[int], message: Optional[str], promoted: Optional[List[Any]],
                 data: Optional[List[NewsRecord]], rate_limit: Optional[RateLimit],
                 has_warning: Optional[bool]) -> None:
        """
        Store every argument on the attribute of the same name.
        """
        self.type = type
        self.message = message
        self.promoted = promoted
        self.data = data
        self.rate_limit = rate_limit
        self.has_warning = has_warning

    @staticmethod
    def from_dict(obj: Any) -> 'NewsData':
        """
        Build a NewsData from a plain dictionary.

        :param obj: dictionary with the keys ``Type``, ``Message``, ``Promoted``,
            ``Data``, ``RateLimit`` and ``HasWarning``; missing keys are read as None
        :return: the populated NewsData
        """
        assert isinstance(obj, dict)
        type_ = from_union([from_int, from_none], obj.get("Type"))
        message = from_union([from_str, from_none], obj.get("Message"))
        promoted = from_union([lambda x: from_list(lambda x: x, x), from_none], obj.get("Promoted"))
        data = from_union([lambda x: from_list(NewsRecord.from_dict, x), from_none], obj.get("Data"))
        rate_limit = from_union([RateLimit.from_dict, from_none], obj.get("RateLimit"))
        has_warning = from_union([from_bool, from_none], obj.get("HasWarning"))
        return NewsData(type_, message, promoted, data, rate_limit, has_warning)

    def to_dict(self) -> dict:
        """
        Serialise to a dictionary using the same keys ``from_dict`` reads.

        :return: dictionary representation of this response
        """
        result: dict = {}
        result["Type"] = from_union([from_int, from_none], self.type)
        result["Message"] = from_union([from_str, from_none], self.message)
        result["Promoted"] = from_union([lambda x: from_list(lambda x: x, x), from_none], self.promoted)
        result["Data"] = from_union([lambda x: from_list(lambda x: to_class(NewsRecord, x), x), from_none], self.data)
        result["RateLimit"] = from_union([lambda x: to_class(RateLimit, x), from_none], self.rate_limit)
        result["HasWarning"] = from_union([from_bool, from_none], self.has_warning)
        return result

    def to_df(self) -> pd.DataFrame:
        """
        Convert the records to a DataFrame with one row per record.

        ``published_on`` is converted with ``pd.Timestamp.fromtimestamp`` (missing
        values become ``NaT``). The ``source_info`` column is replaced by those of
        its fields that are not already columns of the record itself.

        :return: the DataFrame, empty if there are no records
        """
        if not self.data:
            return pd.DataFrame()

        df = pd.DataFrame(self.to_dict()['Data'])
        df['published_on'] = df['published_on'].apply(
            lambda ts: pd.NaT if pd.isnull(ts) else pd.Timestamp.fromtimestamp(ts)
        )

        # One row per record, including records without source_info, so the source
        # columns stay aligned with df and an all-missing response does not fail.
        source_rows = [
            record.source_info.to_dict() if record.source_info is not None else {}
            for record in self.data
        ]
        source_df = pd.DataFrame(source_rows, index=df.index)

        df = df.drop('source_info', axis=1)
        new_cols = [col for col in source_df.columns if col not in df.columns]
        return pd.concat([df, source_df[new_cols]], axis=1)

    @property
    def has_error(self) -> bool:
        """
        :return: True unless ``message`` equals the success message
        """
        # No response object in this API
        return self.message != 'News list successfully returned'

    # Pagination methods

    @property
    def is_empty(self) -> bool:
        """
        :return: True if ``data`` is None, has no records, or only has empty records
        """
        if self.data is None:
            return True
        return all(record.is_empty for record in self.data)

    @staticmethod
    def _combine_data(first: Optional[list], second: Optional[list]) -> Optional[list]:
        """Concatenate two record lists, treating None as no records (None only if both are)."""
        if first is None and second is None:
            return None
        return list(first or []) + list(second or [])

    def __add__(self, other):
        """
        Return a deep copy of this object whose data is followed by ``other``'s data.
        """
        out_obj = deepcopy(self)
        out_obj.data = self._combine_data(out_obj.data, other.data)
        return out_obj

    def __radd__(self, other):
        """
        Return a deep copy of ``other`` whose data is followed by this object's data.
        """
        if isinstance(other, int) and other == 0:
            # sum() starts from the integer 0, which ends up here as `0 + self`
            return deepcopy(self)
        out_obj = deepcopy(other)
        out_obj.data = self._combine_data(out_obj.data, self.data)
        return out_obj

    @property
    def time_from(self) -> int:
        """
        :return: the smallest ``published_on`` value among the records
        :raises ValueError: if ``data`` is None or no record has ``published_on``
        """
        if self.data is None:
            raise ValueError('cannot determine time from as there is no data')
        times = [record.published_on for record in self.data if record.published_on is not None]
        if not times:
            raise ValueError('could not calculate time from as there is no data')
        min_times = min(times)
        min_times = cast(int, min_times)  # for mypy
        return min_times

    def delete_record_matching_time(self, time: int):
        """
        Intentionally does nothing.

        :param time: unused
        """
        # not a problem with this API, no overlapping time
        pass

    def trim_empty_records_at_beginning(self):
        """
        Remove empty records from the end of ``data`` in place, stopping at the
        first non-empty record. Does nothing when ``data`` is None or empty.
        """
        if not self.data:
            return
        # Earliest records are at the end of data
        # Delete, starting from end, oldest record
        while self.data and self.data[-1].is_empty:
            self.data.pop()

    def download_articles(self, out_folder: Union[str, Path] = 'articles', num_threads: int = 20,
                          restart: bool = False):
        """
        Download the HTML of all news articles in this collection and save to files in a folder

        :param out_folder: Where to save the articles
        :param num_threads: How many concurrent requests to execute
        :param restart: False to not download where an article already exists, True to re-download in that case
        :return:
        :raises ValueError: if ``data`` is None
        """
        if self.data is None:
            raise ValueError('Cannot download articles as the data attribute is None')

        out_folder = Path(out_folder)
        # exist_ok avoids the check-then-create race of a separate existence test
        out_folder.mkdir(parents=True, exist_ok=True)

        with ThreadPool(num_threads) as pool:
            results = [
                pool.apply_async(_download_and_save_article, (article, out_folder, restart))
                for article in self.data
            ]
            # Wait inside the with block: leaving it terminates the pool
            for result in tqdm(results):
                result.get()
def _download_and_save_article(article: 'NewsRecord', out_folder: Path, restart: bool = False):
    """
    Download one article and write its HTML to ``<out_folder>/<article.id>.html``.

    If the download fails, the error text is written to
    ``<out_folder>/Error Responses/<article.id>.txt`` and the function returns
    normally, so a single failing article does not stop a batch.

    :param article: the record whose article should be downloaded
    :param out_folder: existing folder in which to store the result
    :param restart: False to skip the download when the HTML file already exists,
        True to download again and overwrite it
    :return:
    """
    out_path = out_folder / f'{article.id}.html'
    error_dir = out_folder / 'Error Responses'
    error_path = error_dir / f'{article.id}.txt'

    if not restart and out_path.exists():
        logger.info(f'Found existing text for article {article.id} and restart=False, skipping download')
        return

    try:
        text = article.download_article()
    except (
        # Parent of ConnectionError and TooManyRedirects; also covers the other
        # request failures (timeouts, invalid schemas, broken transfers)
        requests.RequestException,
        NoValidNewsURLException,
        InvalidNewsResponseException
    ) as e:
        logger.error(f'Got error while downloading {article.id} from urls: {article.guid} and {article.url}. See {error_path}')
        # exist_ok makes this safe when another thread creates the folder first
        error_dir.mkdir(parents=True, exist_ok=True)
        error_path.write_text(str(e), encoding='utf-8')
        return

    # Got valid result
    logger.debug(f'Downloaded text for article {article.id}')
    # Explicit encoding: the platform default may be unable to encode the page text
    out_path.write_text(text, encoding='utf-8')
def news_from_dict(s: Any) -> NewsData:
    """
    Convert a dictionary into a :class:`NewsData` object.

    :param s: dictionary to convert, handed unchanged to ``NewsData.from_dict``
    :return: the resulting NewsData
    """
    news = NewsData.from_dict(s)
    return news
def news_to_dict(x: NewsData) -> Any:
    """
    Convert a :class:`NewsData` object back into plain data.

    :param x: the object to convert
    :return: whatever ``to_class(NewsData, x)`` produces
    """
    as_plain = to_class(NewsData, x)
    return as_plain
class CouldNotGetNewsException(ResponseException):
    """
    ResponseException subclass for news requests. It adds no behaviour of its
    own and is not raised by the code visible in this module; presumably it is
    raised by the code that requests news data (verify against callers).
    """
class NoValidNewsURLException(Exception):
    """
    Raised by ``NewsRecord.download_article`` when no URL of the record has
    both a scheme and a network location.
    """
class InvalidNewsResponseException(ResponseException):
    """
    Raised by ``NewsRecord.download_article`` when the article request returns
    a status code other than 200 and there is no alternative URL left to try.
    """