import warnings
from copy import deepcopy
from typing import Callable, Dict, Optional, Union
import pandas as pd
from dateutil import parser as dateparser
from github.Commit import Commit
from github.CommitStats import CommitStats
from github.GitAuthor import GitAuthor
from github.GitCommit import GitCommit
from github.GitRelease import GitRelease
from github.Issue import Issue
from github.NamedUser import NamedUser
from github.Repository import Repository
from github.Stargazer import Stargazer
from projectreport.analyzer.ts.base import TimeSeriesAnalysis
from projectreport.analyzer.ts.types import DictList
from projectreport.tools.monkey_patch_github import (
    NoMorePagesAllowedException,
    monkey_patch_github_obj_for_throttling,
)
from projectreport.version import (
    add_major_minor_patch_changed_to_df,
    add_major_minor_patch_to_df,
)


class GithubAnalysis(TimeSeriesAnalysis):
    analysis_attrs = ["repo"]

    def __init__(self, repo: Repository, auto_throttle: bool = True):
        self.repo = deepcopy(repo)
        self.auto_throttle = auto_throttle
        if self.auto_throttle:
            monkey_patch_github_obj_for_throttling(self.repo)

    @property
    def event_functions(self) -> Dict[str, Callable[[Repository], DictList]]:
        funcs: Dict[str, Callable[[Repository], DictList]] = dict(
            commits=commit_stats_from_repo,
            issues=issue_stats_from_repo,
            stars=stars_from_repo,
            releases=releases_from_repo,
        )
        return funcs

    @property
    def count_functions(self) -> Dict[str, Callable[[DictList, str], DictList]]:
        funcs: Dict[str, Callable[[DictList, str], DictList]] = dict(
            commits=commit_loc_counts_from_commit_events,
            issues=issue_counts_from_issue_events,
            stars=star_counts_from_star_events,
            releases=release_counts_from_release_events,
        )
        return funcs 
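

# A minimal usage sketch (not part of the original module). It assumes a
# PyGithub access token and an "owner/name" repository string, and it wires
# the event and count functions together directly rather than going through
# whatever interface the TimeSeriesAnalysis base class exposes, which is not
# shown here.
def _example_daily_commit_counts(token: str, full_name: str) -> DictList:
    from github import Github

    repo = Github(token).get_repo(full_name)
    analysis = GithubAnalysis(repo)
    # Pull the raw commit events, then roll them up into daily counts
    events = analysis.event_functions["commits"](analysis.repo)
    return analysis.count_functions["commits"](events, "d")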


def commit_stats_from_repo(repo: Repository, author_stats: bool = True) -> DictList:
    all_data = []
    commit: Commit
    try:
        for commit in repo.get_commits():
            stats: CommitStats = commit.stats
            author: Optional[Union[NamedUser, GitAuthor]] = _get_author_from_commit(
                commit
            )
            committer: Optional[
                Union[NamedUser, GitAuthor]
            ] = _get_committer_from_commit(commit)
            data_dict = dict(
                sha=commit.sha,
                last_modified=dateparser.parse(commit.last_modified)
                if commit.last_modified is not None
                else None,
                additions=stats.additions,
                deletions=stats.deletions,
                url=commit.html_url,
            )
            if author_stats:
                if author is not None:
                    data_dict.update(_get_data_from_named_user_or_git_author(author))
                if committer is not None:
                    data_dict.update(
                        _get_data_from_named_user_or_git_author(
                            committer, is_committer=True
                        )
                    )
            all_data.append(data_dict)
    except NoMorePagesAllowedException:
        warnings.warn(
            f"Could not collect full history for {repo.name} commits as Github "
            f"limits the amount of history than can be pulled"
        )
    return all_data  # type: ignore 


def commit_loc_counts_from_commit_events(
    commits: DictList, freq: str = "d"
) -> DictList:
    event_df = pd.DataFrame(commits)
    event_df["net"] = event_df["additions"] - event_df["deletions"]
    event_df["change"] = event_df["additions"] + event_df["deletions"]
    start = _get_end_of_period(event_df["last_modified"].min(), freq)
    end = event_df["last_modified"].max()
    dates = pd.date_range(start=start, end=end, freq=freq)
    count_data = []
    for date in dates:
        until_time_df = event_df[event_df["last_modified"] < date]
        commit_counts = len(until_time_df)
        loc = until_time_df["net"].sum()
        loc_changed = until_time_df["change"].sum()
        count_data.append(
            dict(date=date, commits=commit_counts, loc=loc, loc_changed=loc_changed)
        )
    return count_data 
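

# A sketch (not part of the original module) of the cumulative counting above,
# using synthetic commit events; the timestamps, additions, and deletions are
# invented. Each output row counts only events strictly before its date, so
# the first period can legitimately report zero commits.
def _example_commit_loc_counts() -> DictList:
    events = [
        dict(
            last_modified=pd.Timestamp("2023-01-01", tz="UTC"),
            additions=10,
            deletions=2,
        ),
        dict(
            last_modified=pd.Timestamp("2023-01-03", tz="UTC"),
            additions=5,
            deletions=5,
        ),
    ]
    return commit_loc_counts_from_commit_events(events, freq="d")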


def issue_stats_from_repo(repo: Repository) -> DictList:
    all_data = []
    issue: Issue
    try:
        for issue in repo.get_issues(state="all"):
            data_dict = dict(
                number=issue.number,
                created_at=issue.created_at,
                updated_at=issue.updated_at,
                closed_at=issue.closed_at,
                comments_count=issue.comments,
                state=issue.state,
                is_pull_issue=issue.pull_request is not None,
            )
            all_data.append(data_dict)
    except NoMorePagesAllowedException:
        warnings.warn(
            f"Could not collect full history for {repo.name} issues as Github "
            f"limits the amount of history than can be pulled"
        )
    return all_data  # type: ignore 


def issue_counts_from_issue_events(issues: DictList, freq: str = "d") -> DictList:
    event_df = pd.DataFrame(issues)
    start = _get_end_of_period(event_df["created_at"].min(), freq)
    end = event_df["updated_at"].max()
    dates = pd.date_range(start=start, end=end, freq=freq)
    count_data = []
    for date in dates:
        # Copy so the relabelling below does not write into a slice of event_df
        until_time_df = event_df.loc[event_df["created_at"] <= date].copy()
        # Mark issues which are closed now as open if they were not closed by this time
        until_time_df.loc[until_time_df["closed_at"] > date, "state"] = "open"
        pull_df = until_time_df[until_time_df["is_pull_issue"]]
        issue_df = until_time_df[~until_time_df["is_pull_issue"]]
        all_issues = len(until_time_df)
        closed_issues = len(issue_df[issue_df["state"] == "closed"])
        closed_pull_issues = len(pull_df[pull_df["state"] == "closed"])
        open_issues = len(issue_df[issue_df["state"] == "open"])
        open_pull_issues = len(pull_df[pull_df["state"] == "open"])
        count_data.append(
            dict(
                date=date,
                all_issues=all_issues,
                closed_issues=closed_issues,
                closed_pull_issues=closed_pull_issues,
                open_issues=open_issues,
                open_pull_issues=open_pull_issues,
            )
        )
    return count_data 
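

# A sketch (not part of the original module) of the issue counting above,
# using two synthetic issue events with invented values. The second issue is
# still open, so its closed_at is pd.NaT; the first one only counts as closed
# from 2023-01-04 onwards because of the relabelling inside the loop.
def _example_issue_counts() -> DictList:
    events = [
        dict(
            created_at=pd.Timestamp("2023-01-01", tz="UTC"),
            updated_at=pd.Timestamp("2023-01-05", tz="UTC"),
            closed_at=pd.Timestamp("2023-01-04", tz="UTC"),
            state="closed",
            is_pull_issue=False,
        ),
        dict(
            created_at=pd.Timestamp("2023-01-02", tz="UTC"),
            updated_at=pd.Timestamp("2023-01-02", tz="UTC"),
            closed_at=pd.NaT,
            state="open",
            is_pull_issue=True,
        ),
    ]
    return issue_counts_from_issue_events(events, freq="d")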


def stars_from_repo(repo: Repository) -> DictList:
    all_data = []
    star: Stargazer
    try:
        for star in repo.get_stargazers_with_dates():
            user: NamedUser = star.user
            data_dict = dict(
                date=star.starred_at,
                user_name=user.name,
                user_login=user.login,
            )
            all_data.append(data_dict)
    except NoMorePagesAllowedException:
        warnings.warn(
            f"Could not collect full history for {repo.name} stars as Github "
            f"limits the amount of history than can be pulled"
        )
    return all_data  # type: ignore 


def star_counts_from_star_events(stars: DictList, freq: str = "d") -> DictList:
    event_df = pd.DataFrame(stars)
    start = _get_end_of_period(event_df["date"].min(), freq)
    end = event_df["date"].max()
    dates = pd.date_range(start=start, end=end, freq=freq)
    count_data = []
    for date in dates:
        until_time_df = event_df[event_df["date"] < date]
        star_count = len(until_time_df)
        count_data.append(dict(date=date, stars=star_count))
    return count_data 


def releases_from_repo(repo: Repository) -> DictList:
    all_data = []
    release: GitRelease
    try:
        for release in repo.get_releases():
            author = release.author
            author_name: Optional[str] = None
            author_login: Optional[str] = None
            if author is not None:
                author_name = author.name
                author_login = author.login
            data_dict = dict(
                created_at=release.created_at,
                published_at=release.published_at,
                url=release.html_url,
                id=release.id,
                tag_name=release.tag_name,
                draft=release.draft,
                prerelease=release.prerelease,
                body=release.body,
                author_name=author_name,
                author_login=author_login,
            )
            all_data.append(data_dict)
    except NoMorePagesAllowedException:
        warnings.warn(
            f"Could not collect full history for {repo.name} releases as Github "
            f"limits the amount of history than can be pulled"
        )
    return all_data  # type: ignore 


def release_counts_from_release_events(
    releases: DictList,
    freq: str = "d",
    date_var: str = "published_at",
) -> DictList:
    event_df = pd.DataFrame(releases)
    event_df.sort_values(date_var, inplace=True)
    start = _get_end_of_period(event_df[date_var].min(), freq)
    end = event_df[date_var].max()
    add_major_minor_patch_to_df(event_df)
    # Copy so the in-place column additions below do not target a slice of event_df
    semver_df = event_df.loc[
        event_df["Version"].apply(lambda ver: ver.is_semver)
    ].copy()
    if not semver_df.empty:
        add_major_minor_patch_changed_to_df(semver_df)
    dates = pd.date_range(start=start, end=end, freq=freq)
    count_data = []
    for date in dates:
        until_time_df = event_df[event_df[date_var] < date]
        release_count = len(until_time_df)
        until_semver_df = semver_df[semver_df[date_var] < date]
        if not until_semver_df.empty:
            major_df = until_semver_df[until_semver_df["Major Changed"]]
            major_count = len(major_df)
            minor_df = until_semver_df[until_semver_df["Minor Changed"]]
            minor_count = len(minor_df)
            patch_df = until_semver_df[until_semver_df["Patch Changed"]]
            patch_count = len(patch_df)
        else:
            major_count, minor_count, patch_count = 0, 0, 0
        count_data.append(
            dict(
                date=date,
                releases=release_count,
                major_releases=major_count,
                minor_releases=minor_count,
                patch_releases=patch_count,
            )
        )
    return count_data 


def _get_data_from_named_user_or_git_author(
    user: Union[NamedUser, GitAuthor], is_committer: bool = False
) -> Dict[str, str]:
    if is_committer:
        key_base = "committer"
    else:
        key_base = "author"
    data: Dict[str, str] = {
        f"{key_base}_name": user.name,
        f"{key_base}_email": user.email,
    }
    if isinstance(user, NamedUser):
        data.update(
            {
                f"{key_base}_login": user.login,
            }
        )
    return data


def _get_author_from_commit(commit: Commit) -> Optional[Union[NamedUser, GitAuthor]]:
    if commit.author is not None:
        # NamedUser
        return commit.author
    git_commit: GitCommit = commit.commit
    # GitAuthor
    return git_commit.author


def _get_committer_from_commit(commit: Commit) -> Optional[Union[NamedUser, GitAuthor]]:
    if commit.committer is not None:
        # NamedUser
        return commit.committer
    git_commit: GitCommit = commit.commit
    # GitAuthor
    return git_commit.committer


def _get_end_of_period(date: pd.Timestamp, freq: str) -> pd.Timestamp:
    # TODO [#16]: get _get_end_of_period working correctly for all frequencies
    #
    # Works correctly for month, day, hour, and weeks starting on a different day.
    # Currently gets beginning of period for weeks starting with the same day.
    try:
        return date.ceil(freq)
    except ValueError as e:
        if "is a non-fixed frequency" in str(e):
            return date.to_period(freq).to_timestamp(freq).tz_localize("UTC")
        else:
            raise e
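

# A small illustration (not part of the original module) of how
# _get_end_of_period behaves: a fixed frequency such as "d" is handled by
# Timestamp.ceil, while a non-fixed frequency such as "M" falls through to the
# period-based branch. The fallback keys off the exact pandas error message,
# which may differ between pandas versions.
def _example_end_of_period() -> None:
    ts = pd.Timestamp("2023-01-15 10:30", tz="UTC")
    print(_get_end_of_period(ts, "d"))  # rounds up to the next midnight (UTC)
    print(_get_end_of_period(ts, "M"))  # monthly frequency uses the fallback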