Source code for projectreport.analyzer.ts.github

import warnings
from copy import deepcopy
from datetime import datetime, timedelta
from typing import Callable, Dict, List, Optional, Union

import dateutil
import pandas as pd
from dateutil import parser as dateparser
from github.Commit import Commit
from github.CommitStats import CommitStats
from github.GitAuthor import GitAuthor
from github.GitCommit import GitCommit
from github.GitRelease import GitRelease
from github.Issue import Issue
from github.NamedUser import NamedUser
from github.Repository import Repository
from github.Stargazer import Stargazer

from projectreport.analyzer.ts.base import TimeSeriesAnalysis
from projectreport.analyzer.ts.types import DictList
from projectreport.tools.monkey_patch_github import (
    NoMorePagesAllowedException,
    monkey_patch_github_obj_for_throttling,
)
from projectreport.version import (
    add_major_minor_patch_changed_to_df,
    add_major_minor_patch_to_df,
)


class GithubAnalysis(TimeSeriesAnalysis):
    """
    Time-series analysis of a Github repository.

    For each supported series name ("commits", "issues", "stars",
    "releases") this class exposes the module-level function that collects
    the raw events and the one that turns those events into counts.
    """

    analysis_attrs = ["repo"]

    def __init__(self, repo: Repository, auto_throttle: bool = True):
        """
        :param repo: repository to analyze; a deep copy of it is stored on
            the instance, and any patching is applied to that copy
        :param auto_throttle: when true, the stored copy is passed through
            ``monkey_patch_github_obj_for_throttling``
        """
        self.repo = deepcopy(repo)
        self.auto_throttle = auto_throttle
        if self.auto_throttle:
            monkey_patch_github_obj_for_throttling(self.repo)

    @property
    def event_functions(self) -> Dict[str, Callable[[Repository], DictList]]:
        """
        Map each series name to the function that collects its raw event
        records from a repository.
        """
        collectors: Dict[str, Callable[[Repository], DictList]] = {
            "commits": commit_stats_from_repo,
            "issues": issue_stats_from_repo,
            "stars": stars_from_repo,
            "releases": releases_from_repo,
        }
        return collectors

    @property
    def count_functions(self) -> Dict[str, Callable[[DictList, str], DictList]]:
        """
        Map each series name to the function that aggregates its event
        records into per-period counts for a given frequency string.
        """
        aggregators: Dict[str, Callable[[DictList, str], DictList]] = {
            "commits": commit_loc_counts_from_commit_events,
            "issues": issue_counts_from_issue_events,
            "stars": star_counts_from_star_events,
            "releases": release_counts_from_release_events,
        }
        return aggregators
def commit_stats_from_repo(repo: Repository, author_stats: bool = True) -> DictList:
    """
    Collect one record per commit yielded by ``repo.get_commits()``.

    Every record holds ``sha``, ``last_modified`` (parsed with dateutil, or
    ``None`` when the commit has no such value), ``additions``,
    ``deletions`` and ``url``. When ``author_stats`` is true, the
    author/committer fields produced by
    ``_get_data_from_named_user_or_git_author`` are merged in as well.

    :param repo: repository whose commits are read
    :param author_stats: whether to add author and committer fields
    :return: list of commit records; if ``NoMorePagesAllowedException`` is
        raised while iterating, a warning is issued and the records
        gathered up to that point are returned
    """
    records = []
    commit: Commit
    try:
        for commit in repo.get_commits():
            stats: CommitStats = commit.stats
            author: Optional[Union[NamedUser, GitAuthor]] = _get_author_from_commit(
                commit
            )
            committer: Optional[
                Union[NamedUser, GitAuthor]
            ] = _get_committer_from_commit(commit)
            record = {
                "sha": commit.sha,
                "last_modified": (
                    None
                    if commit.last_modified is None
                    else dateparser.parse(commit.last_modified)
                ),
                "additions": stats.additions,
                "deletions": stats.deletions,
                "url": commit.html_url,
            }
            if author_stats:
                # Author fields first, then committer fields.
                for person, as_committer in ((author, False), (committer, True)):
                    if person is None:
                        continue
                    record.update(
                        _get_data_from_named_user_or_git_author(
                            person, is_committer=as_committer
                        )
                    )
            # Append per commit so partial history survives a paging cut-off.
            records.append(record)
    except NoMorePagesAllowedException:
        warnings.warn(
            f"Could not collect full history for {repo.name} commits as Github "
            f"limits the amount of history than can be pulled"
        )
    return records  # type: ignore
def commit_loc_counts_from_commit_events(
    commits: DictList, freq: str = "d"
) -> DictList:
    """
    Aggregate commit records into cumulative counts per period.

    Dates run from the first period boundary at or after the earliest
    ``last_modified`` (see ``_get_end_of_period``) up to the latest
    ``last_modified``, stepping by ``freq``. For each date, only commits
    with ``last_modified`` strictly before that date are counted.

    :param commits: commit records carrying ``last_modified``,
        ``additions`` and ``deletions``
    :param freq: pandas frequency string used for the date range
    :return: one dict per date with ``date``, ``commits`` (number of
        commits), ``loc`` (sum of additions minus deletions) and
        ``loc_changed`` (sum of additions plus deletions); an empty list
        when there are no commit records
    """
    event_df = pd.DataFrame(commits)
    if event_df.empty:
        # No records means no columns either, so the lookups below would
        # raise KeyError; there is simply nothing to count.
        return []

    event_df["net"] = event_df["additions"] - event_df["deletions"]
    event_df["change"] = event_df["additions"] + event_df["deletions"]
    start = _get_end_of_period(event_df["last_modified"].min(), freq)
    end = event_df["last_modified"].max()
    dates = pd.date_range(start=start, end=end, freq=freq)
    count_data = []
    for date in dates:
        until_time_df = event_df[event_df["last_modified"] < date]
        count_data.append(
            dict(
                date=date,
                commits=len(until_time_df),
                loc=until_time_df["net"].sum(),
                loc_changed=until_time_df["change"].sum(),
            )
        )
    return count_data
def issue_stats_from_repo(repo: Repository) -> DictList:
    """
    Collect one record per issue yielded by ``repo.get_issues(state="all")``.

    :param repo: repository whose issues are read
    :return: list of records with ``number``, ``created_at``,
        ``updated_at``, ``closed_at``, ``comments_count``, ``state`` and
        ``is_pull_issue`` (true when the issue's ``pull_request`` is not
        ``None``); if ``NoMorePagesAllowedException`` is raised while
        iterating, a warning is issued and the records gathered up to that
        point are returned
    """
    records = []
    issue: Issue
    try:
        for issue in repo.get_issues(state="all"):
            # Append per issue so partial history survives a paging cut-off.
            records.append(
                {
                    "number": issue.number,
                    "created_at": issue.created_at,
                    "updated_at": issue.updated_at,
                    "closed_at": issue.closed_at,
                    "comments_count": issue.comments,
                    "state": issue.state,
                    "is_pull_issue": issue.pull_request is not None,
                }
            )
    except NoMorePagesAllowedException:
        warnings.warn(
            f"Could not collect full history for {repo.name} issues as Github "
            f"limits the amount of history than can be pulled"
        )
    return records  # type: ignore
def issue_counts_from_issue_events(issues: DictList, freq: str = "d") -> DictList:
    """
    Aggregate issue records into open/closed counts per period.

    Dates run from the first period boundary at or after the earliest
    ``created_at`` (see ``_get_end_of_period``) up to the latest
    ``updated_at``, stepping by ``freq``. For each date, issues created at
    or before that date are considered, and an issue whose ``closed_at``
    is later than the date is counted as open for that date.

    :param issues: issue records carrying ``created_at``, ``updated_at``,
        ``closed_at``, ``state`` and ``is_pull_issue``
    :param freq: pandas frequency string used for the date range
    :return: one dict per date with ``date``, ``all_issues``,
        ``closed_issues``, ``closed_pull_issues``, ``open_issues`` and
        ``open_pull_issues``; an empty list when there are no issue records
    """
    event_df = pd.DataFrame(issues)
    if event_df.empty:
        # No records means no columns either, so the lookups below would
        # raise KeyError; there is simply nothing to count.
        return []

    start = _get_end_of_period(event_df["created_at"].min(), freq)
    end = event_df["updated_at"].max()
    dates = pd.date_range(start=start, end=end, freq=freq)
    count_data = []
    for date in dates:
        # Explicit copy: the state column is rewritten below, and that must
        # neither touch event_df nor trigger SettingWithCopyWarning.
        until_time_df = event_df.loc[event_df["created_at"] <= date].copy()
        # Mark issues which are closed now as open if they were not closed by this time
        until_time_df.loc[until_time_df["closed_at"] > date, "state"] = "open"
        pull_df = until_time_df[until_time_df["is_pull_issue"]]
        issue_df = until_time_df[~until_time_df["is_pull_issue"]]
        count_data.append(
            dict(
                date=date,
                all_issues=len(until_time_df),
                closed_issues=len(issue_df[issue_df["state"] == "closed"]),
                closed_pull_issues=len(pull_df[pull_df["state"] == "closed"]),
                open_issues=len(issue_df[issue_df["state"] == "open"]),
                open_pull_issues=len(pull_df[pull_df["state"] == "open"]),
            )
        )
    return count_data
def stars_from_repo(repo: Repository) -> DictList:
    """
    Collect one record per stargazer yielded by
    ``repo.get_stargazers_with_dates()``.

    :param repo: repository whose stargazers are read
    :return: list of records with ``date`` (the ``starred_at`` value),
        ``user_name`` and ``user_login``; if
        ``NoMorePagesAllowedException`` is raised while iterating, a
        warning is issued and the records gathered up to that point are
        returned
    """
    records = []
    star: Stargazer
    try:
        for star in repo.get_stargazers_with_dates():
            stargazer: NamedUser = star.user
            # Append per star so partial history survives a paging cut-off.
            records.append(
                {
                    "date": star.starred_at,
                    "user_name": stargazer.name,
                    "user_login": stargazer.login,
                }
            )
    except NoMorePagesAllowedException:
        warnings.warn(
            f"Could not collect full history for {repo.name} stars as Github "
            f"limits the amount of history than can be pulled"
        )
    return records  # type: ignore
def star_counts_from_star_events(stars: DictList, freq: str = "d") -> DictList:
    """
    Aggregate star records into cumulative star counts per period.

    Dates run from the first period boundary at or after the earliest
    ``date`` (see ``_get_end_of_period``) up to the latest ``date``,
    stepping by ``freq``. For each date, only stars with ``date`` strictly
    before it are counted.

    :param stars: star records carrying ``date``
    :param freq: pandas frequency string used for the date range
    :return: one dict per date with ``date`` and ``stars``; an empty list
        when there are no star records
    """
    event_df = pd.DataFrame(stars)
    if event_df.empty:
        # A repository without stars yields a frame without a "date"
        # column; return early instead of raising KeyError.
        return []

    start = _get_end_of_period(event_df["date"].min(), freq)
    end = event_df["date"].max()
    dates = pd.date_range(start=start, end=end, freq=freq)
    return [
        dict(date=date, stars=len(event_df[event_df["date"] < date]))
        for date in dates
    ]
def releases_from_repo(repo: Repository) -> DictList:
    """
    Collect one record per release yielded by ``repo.get_releases()``.

    :param repo: repository whose releases are read
    :return: list of records with ``created_at``, ``published_at``,
        ``url``, ``id``, ``tag_name``, ``draft``, ``prerelease``, ``body``,
        ``author_name`` and ``author_login`` (both ``None`` when the
        release has no author); if ``NoMorePagesAllowedException`` is
        raised while iterating, a warning is issued and the records
        gathered up to that point are returned
    """
    records = []
    release: GitRelease
    try:
        for release in repo.get_releases():
            author = release.author
            has_author = author is not None
            author_name: Optional[str] = author.name if has_author else None
            author_login: Optional[str] = author.login if has_author else None
            # Append per release so partial history survives a paging cut-off.
            records.append(
                {
                    "created_at": release.created_at,
                    "published_at": release.published_at,
                    "url": release.html_url,
                    "id": release.id,
                    "tag_name": release.tag_name,
                    "draft": release.draft,
                    "prerelease": release.prerelease,
                    "body": release.body,
                    "author_name": author_name,
                    "author_login": author_login,
                }
            )
    except NoMorePagesAllowedException:
        warnings.warn(
            f"Could not collect full history for {repo.name} releases as Github "
            f"limits the amount of history than can be pulled"
        )
    return records  # type: ignore
def release_counts_from_release_events(
    releases: DictList,
    freq: str = "d",
    date_var: str = "published_at",
) -> DictList:
    """
    Aggregate release records into cumulative release counts per period.

    Dates run from the first period boundary at or after the earliest
    ``date_var`` value (see ``_get_end_of_period``) up to the latest one,
    stepping by ``freq``. For each date, only releases whose ``date_var``
    is strictly before it are counted. Major/minor/patch counts only
    consider releases whose ``Version`` reports ``is_semver``.

    :param releases: release records carrying at least ``date_var``
    :param freq: pandas frequency string used for the date range
    :param date_var: name of the record field used as the release date
    :return: one dict per date with ``date``, ``releases``,
        ``major_releases``, ``minor_releases`` and ``patch_releases``; an
        empty list when there are no release records
    """
    event_df = pd.DataFrame(releases)
    if event_df.empty:
        # A repository without releases yields a frame without columns, so
        # sorting by date_var would raise KeyError.
        return []

    event_df.sort_values(date_var, inplace=True)
    start = _get_end_of_period(event_df[date_var].min(), freq)
    end = event_df[date_var].max()
    # Called for its side effect on event_df; the "Version" column read on
    # the next line is expected to come from it.
    add_major_minor_patch_to_df(event_df)
    # Explicit copy: the helper below is called for its side effect on the
    # frame, which must not be an ambiguous slice of event_df.
    semver_df = event_df.loc[
        event_df["Version"].apply(lambda ver: ver.is_semver)
    ].copy()
    if not semver_df.empty:
        add_major_minor_patch_changed_to_df(semver_df)
    dates = pd.date_range(start=start, end=end, freq=freq)
    count_data = []
    for date in dates:
        release_count = len(event_df[event_df[date_var] < date])
        semver_until_df = semver_df[semver_df[date_var] < date]
        if semver_until_df.empty:
            major_count, minor_count, patch_count = 0, 0, 0
        else:
            major_count = len(semver_until_df[semver_until_df["Major Changed"]])
            minor_count = len(semver_until_df[semver_until_df["Minor Changed"]])
            patch_count = len(semver_until_df[semver_until_df["Patch Changed"]])
        count_data.append(
            dict(
                date=date,
                releases=release_count,
                major_releases=major_count,
                minor_releases=minor_count,
                patch_releases=patch_count,
            )
        )
    return count_data
def _get_data_from_named_user_or_git_author(
    user: Union[NamedUser, GitAuthor], is_committer: bool = False
) -> Dict[str, str]:
    """
    Build the name/email (and, for a ``NamedUser``, login) fields for a user.

    :param user: the author or committer object to read from
    :param is_committer: selects the key prefix: ``committer`` when true,
        ``author`` otherwise
    :return: dict with ``<prefix>_name`` and ``<prefix>_email``, plus
        ``<prefix>_login`` when ``user`` is a ``NamedUser``
    """
    key_base = "committer" if is_committer else "author"
    data: Dict[str, str] = {
        f"{key_base}_name": user.name,
        f"{key_base}_email": user.email,
    }
    if isinstance(user, NamedUser):
        data[f"{key_base}_login"] = user.login
    return data


def _get_author_from_commit(commit: Commit) -> Optional[Union[NamedUser, GitAuthor]]:
    """
    Return ``commit.author`` when it is set, otherwise the author of the
    underlying ``commit.commit``.
    """
    if commit.author is not None:
        # NamedUser
        return commit.author
    git_commit: GitCommit = commit.commit
    # GitAuthor
    return git_commit.author


def _get_committer_from_commit(commit: Commit) -> Optional[Union[NamedUser, GitAuthor]]:
    """
    Return ``commit.committer`` when it is set, otherwise the committer of
    the underlying ``commit.commit``.
    """
    if commit.committer is not None:
        # NamedUser
        return commit.committer
    git_commit: GitCommit = commit.commit
    # GitAuthor
    return git_commit.committer


def _get_end_of_period(date: pd.Timestamp, freq: str) -> pd.Timestamp:
    """
    Return the period boundary for ``date`` at frequency ``freq``.

    Fixed frequencies are handled by ``Timestamp.ceil``. When pandas
    rejects ``freq`` as non-fixed, the timestamp is converted to a period
    and back. In both cases the result keeps the timezone of ``date``
    (naive stays naive), so it can be combined with other timestamps taken
    from the same data.

    :param date: timestamp to move to a period boundary
    :param freq: pandas frequency string
    :raises ValueError: if ``Timestamp.ceil`` fails for any reason other
        than a non-fixed frequency
    """
    # TODO [#16]: get _get_end_of_period working correctly for all frequencies
    #
    # Works correctly for month, day, hour, and weeks starting on a different day.
    # Currently gets beginning of period for weeks starting with the same day.
    try:
        return date.ceil(freq)
    except ValueError as e:
        if "is a non-fixed frequency" not in str(e):
            raise
        # The period round-trip yields a naive timestamp; re-apply the
        # timezone of the input rather than assuming UTC, matching what the
        # ceil branch does.
        boundary = date.to_period(freq).to_timestamp(freq)
        return boundary.tz_localize(date.tz)