from __future__ import annotations
from typing import Union, Tuple, List
import warnings
import numpy as np
class Question:
"""Question is a thershold/matching concept for splitting the node of the Decision Tree
Args:
column_index (int): Column index to be chosen from the array passed at the matching time.
value (Union[int, str, float, np.int64, np.float64]): Threshold value/ matching value.
header (str): column/header name.
"""
def __init__(
self,
column_index: int,
value: Union[int, str, float, np.int64, np.float64],
header: str,
):
"""Constructor"""
self.column_index = column_index
self.value = value
self.header = header
def match(self, example: Union[list, np.ndarray]) -> bool:
"""Matching function to decide based on example whether result is true or false.
Args:
example (Union[list, np.ndarray]): Example to compare with question parameters.
Returns:
bool: if the example is in threshold or value matches then results true or false.
"""
if isinstance(example, list):
example = np.array(example, dtype="O")
val = example[self.column_index]
# adding numpy int and float data types as well
if isinstance(val, (int, float, np.int64, np.float64)):
# a condition for question to return True or False for numeric value
return float(val) >= float(self.value)
else:
return str(val) == str(self.value) # categorical data comparison
def __repr__(self):
condition = "=="
if isinstance(self.value, (int, float, np.int64, np.float64)):
condition = ">="
return f"Is {self.header} {condition} {self.value} ?"
class Node:
"""A Tree node either Decision Node or Leaf Node
Args:
question (Question, optional): question object. Defaults to None.
true_branch (Node, optional): connection to node at true side of the branch. Defaults to None.
false_branch (Node, optional): connection to node at false side of the branch. Defaults to None.
uncertainty (float, optional): Uncertainty value like gini,entropy,variance etc. Defaults to None.
leaf_value (Union[dict,int,float], optional): Leaf node/final node's value. Defaults to None.
"""
def __init__(
self,
question: Question = None,
true_branch: Node = None,
false_branch: Node = None,
uncertainty: float = None,
*,
leaf_value: Union[dict, int, float] = None,
):
"""Constructor"""
self.question = question
self.true_branch = true_branch
self.false_branch = false_branch
self.uncertainty = uncertainty
self.leaf_value = leaf_value
@property
def _is_leaf_node(self) -> bool:
"""Check if this node is leaf node or not.
Returns:
bool: True if leaf node else false.
"""
return self.leaf_value is not None
[docs]
class DecisionTreeClassifier:
"""Decision Tree Based Classification Model
Args:
max_depth (int, optional): max depth of the tree. Defaults to 100.
min_samples_split (int, optional): min size of the sample at the time of split. Defaults to 2.
criteria (str, optional): what criteria to use for information. Defaults to 'gini'. available 'gini','entropy'.
"""
def __init__(
self, max_depth: int = 100, min_samples_split: int = 2, criteria: str = "gini"
):
"""Constructor"""
self._X = None
self._y = None
self._feature_names = None
self._target_name = None
self._tree = None
self.max_depth = max_depth
self.min_samples_split = min_samples_split
self.criteria = criteria
def _count_dict(self, a: np.ndarray) -> dict:
"""Count class frequecies and get a dictionary from it
Args:
a (np.ndarray): input array. shape should be (m,1) for m samples.
Returns:
dict: categories/classes freq dictionary.
"""
unique_values = np.unique(a, return_counts=True)
zipped = zip(*unique_values)
dict_obj = dict(zipped)
return dict_obj
def _gini_impurity(self, arr: np.ndarray) -> float:
"""Calculate Gini Impurity
Args:
arr (np.ndarray): input array.
Returns:
float: gini impurity value.
"""
classes, counts = np.unique(arr, return_counts=True)
gini_score = 1 - np.square(counts / arr.shape[0]).sum(axis=0)
return gini_score
def _entropy(self, arr: np.ndarray) -> float:
"""Calculate Entropy
Args:
arr (np.ndarray): input array.
Returns:
float: entropy result.
"""
classes, counts = np.unique(arr, return_counts=True)
p = counts / arr.shape[0]
entropy_score = (-p * np.log2(p)).sum(axis=0)
return entropy_score
def _uncertainty(self, a: np.ndarray) -> float:
"""calcualte uncertainty
Args:
a (np.ndarray): input array
Returns:
float: uncertainty value
"""
if self.criteria == "entropy":
value = self._entropy(a)
elif self.criteria == "gini":
value = self._gini_impurity(a)
else:
warnings.warn(f"{self.criteria} is not coded yet. returning to gini.")
value = self._gini_impurity(a)
return value
def _partition(
self, rows: np.ndarray, question: Union[Question, None]
) -> Tuple[list, list]:
"""partition the rows based on the question
Args:
rows (np.ndarray): input array to split.
question (Question): question object containing spltting concept.
Returns:
Tuple[list,list]: true idxs and false idxs.
"""
true_idx, false_idx = [], []
for idx, row in enumerate(rows):
if question.match(row):
true_idx.append(idx)
else:
false_idx.append(idx)
return true_idx, false_idx
def _info_gain(
self, left: np.ndarray, right: np.ndarray, parent_uncertainty: float
) -> float:
"""Calculate information gain after splitting
Args:
left (np.ndarray): left side array.
right (np.ndarray): right side array.
parent_uncertainty (float): parent node Uncertainity.
Returns:
flaot: information gain value.
"""
# calculating portion/ partition/ weightage
pr = left.shape[0] / (left.shape[0] + right.shape[0])
# calcualte child uncertainity
child_uncertainty = pr * self._uncertainty(left) - (1 - pr) * self._uncertainty(
right
)
# calculate information gain
info_gain_value = parent_uncertainty - child_uncertainty
return info_gain_value
def _find_best_split(
self, X: np.ndarray, y: np.ndarray
) -> Tuple[float, Union[Question, None], float]:
"""method to find best split possible for the sample
Args:
X (np.ndarray): Feature matrix.
y (np.ndarray): target matrix.
Returns:
Tuple[float,Union[Question,None],float]: maximum gain from the split, best question of it, and parent node uncertainty.
"""
max_gain = -1
best_split_question = None
parent_uncertainty = self._uncertainty(y)
m_samples, n_labels = X.shape
for col_index in range(n_labels): # iterate over feature columns
# get unique values from the feature
unique_values = np.unique(X[:, col_index])
for (
val
) in unique_values: # check for every value and find maximum info gain
ques = Question(
column_index=col_index,
value=val,
header=self._feature_names[col_index],
)
t_idx, f_idx = self._partition(X, ques)
# if it does not split the data
# skip it
if len(t_idx) == 0 or len(f_idx) == 0:
continue
true_y = y[t_idx, :]
false_y = y[f_idx, :]
# get information gain
gain = self._info_gain(true_y, false_y, parent_uncertainty)
if gain > max_gain:
max_gain, best_split_question = gain, ques
return max_gain, best_split_question, parent_uncertainty
def _build_tree(self, X: np.ndarray, y: np.ndarray, depth: int = 0) -> Node:
"""Recursive funtion to build tree.
Args:
X (np.ndarray): input features matrix.
y (np.ndarray): target matrix.
depth (int, optional): depth count of the recursion. Defaults to 0.
Returns:
Node: either leaf node or decision node
"""
m_samples, n_labels = X.shape
# if depth is greater than max depth defined or labels/features are left to 1
# or number of samples are less than the minimum size of samples to split then
# stop recursion and return a node
if (
depth > self.max_depth
or n_labels == 1
or m_samples < self.min_samples_split
):
return Node(leaf_value=self._count_dict(y))
gain, ques, uncertainty = self._find_best_split(X, y)
# if gain is zero
# then no point grinding further here
if gain < 0:
return Node(leaf_value=self._count_dict(y))
t_idx, f_idx = self._partition(X, ques) # get partition idxs
true_branch = self._build_tree(
X[t_idx, :], y[t_idx, :], depth + 1
) # recog true branch samples
false_branch = self._build_tree(
X[f_idx, :], y[f_idx, :], depth + 1
) # recog false branch samples
return Node(
question=ques,
true_branch=true_branch,
false_branch=false_branch,
uncertainty=uncertainty,
)
[docs]
def train(
self,
X: Union[np.ndarray, list],
y: Union[np.ndarray, list],
feature_name: list = None,
target_name: list = None,
) -> None:
"""Train the model
Args:
X (Union[np.ndarray,list]): feature matrix.
y (Union[np.ndarray,list]): target matrix.
feature_name (list, optional): feature names list. Defaults to None.
target_name (list, optional): target name list. Defaults to None.
"""
X = (
np.array(X, dtype="O") if not isinstance(X, (np.ndarray)) else X
) # converting to numpy array
y = (
np.array(y, dtype="O") if not isinstance(y, (np.ndarray)) else y
) # converting to numpy array
# reshaping to vectors
self._X = X.reshape(-1, 1) if len(X.shape) == 1 else X
self._y = y.reshape(-1, 1) if len(y.shape) == 1 else y
# creating feature names if not mentioned
self._feature_names = feature_name or [
f"C_{i}" for i in range(self._X.shape[1])
]
# creating target name if not mentioned
self._target_name = target_name or ["target"]
# BOOOM
# building the tree
self._tree = self._build_tree(X=self._X, y=self._y)
[docs]
def print_tree(self, node: Union[Node, None] = None, spacing: str = "|-") -> None:
"""print the tree
Args:
node (Union[Node,None], optional): starting node. Defaults to None. then it will go to the root node of the tree.
spacing (str, optional): printing separater. Defaults to "|-".
"""
node = node or self._tree
if node._is_leaf_node:
print(spacing, " Predict :", node.leaf_value)
return
# Print the question at this node
print(
spacing
+ str(node.question)
+ " | "
+ self.criteria
+ " :"
+ str(node.uncertainty)
)
# Call this function recursively on the true branch
print(spacing + "--> True:")
self.print_tree(node.true_branch, " " + spacing + "-")
# Call this function recursively on the false branch
print(spacing + "--> False:")
self.print_tree(node.false_branch, " " + spacing + "-")
def _classification(self, row: np.ndarray, node: Union[Node, None]) -> Union[dict]:
"""Classification recursive function
Args:
row (np.ndarray): input matrix.
node (Union[Node,None]): node to start with. mostly root node. rest will be handled by recursion.
Returns:
Union[dict]: leaf value. classification result.
"""
if node._is_leaf_node:
return node.leaf_value
if node.question.match(row):
return self._classification(row, node.true_branch)
else:
return self._classification(row, node.false_branch)
def _leaf_probabilities(self, results: dict) -> dict:
"""get probabilties
Args:
results (dict): results from _classification.
Returns:
dict: dictionary with categorical probabilities.
"""
total = sum(results.values())
probs = {}
for key in results:
probs[key] = (results[key] / total) * 100
return probs
[docs]
def predict(self, X: Union[np.ndarray, list]) -> np.ndarray:
"""predict classification results
Args:
X (Union[np.ndarray,list]): testing matrix.
Raises:
ValueError: input X can only be a list or numpy array.
Returns:
np.ndarray: results of classification.
"""
if isinstance(X, (np.ndarray, list)):
X = np.array(X, dtype="O") if not isinstance(X, (np.ndarray)) else X
if len(X.shape) == 1:
max_result = 0
result_dict = self._classification(row=X, node=self._tree)
result = None
for key in result_dict:
if result_dict[key] > max_result:
result = key
return np.array([[result]], dtype="O")
else:
leaf_value = []
# get maximum caterigorical value from all catergories
for row in X:
max_result = 0
result_dict = self._classification(row=row, node=self._tree)
result = None
for key in result_dict:
if result_dict[key] > max_result:
result = key
leaf_value.append([result])
return np.array(leaf_value, dtype="O")
else:
raise ValueError("X should be list or numpy array")
[docs]
def predict_probability(
self, X: Union[np.ndarray, list]
) -> Union[np.ndarray, dict]:
"""predict classfication probabilities
Args:
X (Union[np.ndarray,list]): testing matrix.
Raises:
ValueError: input X can only be a list or numpy array.
Returns:
Union[np.ndarray, dict]: probabity results of classification.
"""
if isinstance(X, (np.ndarray, list)):
X = np.array(X, dtype="O") if not isinstance(X, (np.ndarray)) else X
if len(X.shape) == 1:
return self._leaf_probabilities(
self._classification(row=X, node=self._tree)
)
else:
leaf_value = []
for row in X:
leaf_value.append(
[
self._leaf_probabilities(
self._classification(row=row, node=self._tree)
)
]
)
return np.array(leaf_value, dtype="O")
else:
raise ValueError("X should be list or numpy array")
[docs]
class DecisionTreeRegressor:
"""Decision Tree Based Regression Model
Args:
max_depth (int, optional): maximum depth of the tree. Defaults to 10.
min_samples_split (int, optional): minimum number of samples while splitting. Defaults to 3.
criteria (str, optional): criteria for best info gain. Defaults to 'variance'.
"""
def __init__(
self,
max_depth: int = 10,
min_samples_split: int = 3,
criteria: str = "variance",
):
"""constructor"""
self._X = None
self._y = None
self._feature_names = None
self._target_name = None
self._tree = None
self.max_depth = max_depth
self.min_samples_split = min_samples_split
self.criteria = criteria
def _mean_leaf_value(self, a: np.ndarray) -> float:
"""leaf values mean
Args:
a (np.ndarray): input array.
Returns:
float: mean value
"""
return float(np.mean(a))
def _partition(
self, rows: np.ndarray, question: Union[Question, None]
) -> Tuple[list, list]:
"""partition the rows based on the question
Args:
rows (np.ndarray): input array to split.
question (Question): question object containing spltting concept.
Returns:
Tuple[list,list]: true idxs and false idxs.
"""
true_idx, false_idx = [], []
for idx, row in enumerate(rows):
if question.match(row):
true_idx.append(idx)
else:
false_idx.append(idx)
return true_idx, false_idx
def _uncertainty(self, a: np.ndarray) -> float:
"""calcualte uncertainty
Args:
a (np.ndarray): input array
Returns:
float: uncertainty value
"""
if self.criteria == "variance":
value = np.var(a)
else:
warnings.warn(f"{self.criteria} is not coded yet. returning to variance.")
value = np.var(a)
return float(value)
def _info_gain(
self, left: np.ndarray, right: np.ndarray, parent_uncertainty: float
) -> float:
"""Calculate information gain after splitting
Args:
left (np.ndarray): left side array.
right (np.ndarray): right side array.
parent_uncertainty (float): parent node Uncertainity.
Returns:
flaot: information gain value.
"""
pr = left.shape[0] / (left.shape[0] + right.shape[0])
child_uncertainty = pr * self._uncertainty(left) - (1 - pr) * self._uncertainty(
right
)
info_gain_value = parent_uncertainty - child_uncertainty
return info_gain_value
def _find_best_split(
self, X: np.ndarray, y: np.ndarray
) -> Tuple[float, Union[Question, None], float]:
"""method to find best split possible for the sample
Args:
X (np.ndarray): Feature matrix.
y (np.ndarray): target matrix.
Returns:
Tuple[float,Union[Question,None],float]: maximum gain from the split, best question of it, and parent node uncertainty
"""
max_gain = -1
best_split_question = None
parent_uncertainty = self._uncertainty(y)
m_samples, n_labels = X.shape
for col_index in range(n_labels): # iterate over feature columns
# get unique values from the feature
unique_values = np.unique(X[:, col_index])
for (
val
) in unique_values: # check for every value and find maximum info gain
ques = Question(
column_index=col_index,
value=val,
header=self._feature_names[col_index],
)
t_idx, f_idx = self._partition(X, ques)
# if it does not split the data
# skip it
if len(t_idx) == 0 or len(f_idx) == 0:
continue
true_y = y[t_idx, :]
false_y = y[f_idx, :]
gain = self._info_gain(true_y, false_y, parent_uncertainty)
if gain > max_gain:
max_gain, best_split_question = gain, ques
return max_gain, best_split_question, parent_uncertainty
def _build_tree(self, X: np.ndarray, y: np.ndarray, depth: int = 0) -> Node:
"""Recursive funtion to build tree
Args:
X (np.ndarray): input features matrix.
y (np.ndarray): target matrix.
depth (int, optional): depth count of the recursion. Defaults to 0.
Returns:
Node: either leaf node or decision node
"""
m_samples, n_labels = X.shape
# if depth is greater than max depth defined or labels/features are left to 1
# or number of samples are less than the minimum size of samples to split then
# stop recursion and return a node
if (
depth > self.max_depth
or n_labels == 1
or m_samples < self.min_samples_split
):
return Node(leaf_value=y)
gain, ques, uncertainty = self._find_best_split(X, y)
# if gain is zero no point in going further
if gain < 0:
return Node(leaf_value=y)
t_idx, f_idx = self._partition(X, ques)
true_branch = self._build_tree(
X[t_idx, :], y[t_idx, :], depth + 1
) # get true samples
false_branch = self._build_tree(
X[f_idx, :], y[f_idx, :], depth + 1
) # get false samples
return Node(
question=ques,
true_branch=true_branch,
false_branch=false_branch,
uncertainty=uncertainty,
)
[docs]
def train(
self,
X: Union[np.ndarray, list],
y: Union[np.ndarray, list],
feature_name: list = None,
target_name: list = None,
) -> None:
"""Train the model
Args:
X (Union[np.ndarray,list]): feature matrix.
y (Union[np.ndarray,list]): target matrix.
feature_name (list, optional): feature names list. Defaults to None.
target_name (list, optional): target name list. Defaults to None.
"""
X = (
np.array(X, dtype="O") if not isinstance(X, (np.ndarray)) else X
) # converting to numpy array
y = (
np.array(y, dtype="O") if not isinstance(y, (np.ndarray)) else y
) # converting to numpy array
# reshaping to vectors
self._X = X.reshape(-1, 1) if len(X.shape) == 1 else X
self._y = y.reshape(-1, 1) if len(y.shape) == 1 else y
# creating feature names if not mentioned
self._feature_names = feature_name or [
f"C_{i}" for i in range(self._X.shape[1])
]
# creating target name if not mentioned
self._target_name = target_name or ["target"]
# BOOOM
# building the tree
self._tree = self._build_tree(X=self._X, y=self._y)
[docs]
def print_tree(
self,
node: Union[Node, None] = None,
spacing: str = "|-",
mean_preds: bool = True,
) -> None:
"""print the tree
Args:
node (Union[Node,None], optional): starting node. Defaults to None. then it will go to the root node of the tree.
spacing (str, optional): printing separater. Defaults to "|-".
mean_preds (bool): do the mean of prediction values. Defaults to True.
"""
node = node or self._tree
if node._is_leaf_node:
if mean_preds:
print(spacing, " Predict :", self._mean_leaf_value(node.leaf_value))
else:
print(spacing, " Predict :", node.leaf_value[..., -1])
return
# Print the question at this node
print(
spacing
+ str(node.question)
+ " | "
+ self.criteria
+ " :"
+ str(node.uncertainty)
)
# Call this function recursively on the true branch
print(spacing + "--> True:")
self.print_tree(node.true_branch, " " + spacing + "-", mean_preds)
# Call this function recursively on the false branch
print(spacing + "--> False:")
self.print_tree(node.false_branch, " " + spacing + "-", mean_preds)
def _regression(
self, row: np.ndarray, node: Union[Node, None], mean_preds: bool
) -> float:
"""regression recursive method
Args:
row (np.ndarray): input matrix.
node (Union[Node,None]): node to start with. mostly root node. rest will be handled by recursion.
mean_preds (bool): do the mean of prediction values.
Returns:
float: regression result.
"""
if node._is_leaf_node:
if mean_preds:
return self._mean_leaf_value(node.leaf_value)
else:
return node.leaf_value[..., -1]
if node.question.match(row):
return self._regression(row, node.true_branch, mean_preds)
else:
return self._regression(row, node.false_branch, mean_preds)
[docs]
def predict(self, X: np.ndarray, mean_preds: bool = True) -> np.ndarray:
"""predict regresssion
Args:
X (np.ndarray): testing matrix.
mean_preds (bool): do the mean of prediction values. Defaults to True.
Raises:
ValueError: X should be list or numpy array
Returns:
np.ndarray: regression prediction.
"""
if isinstance(X, (np.ndarray, list)):
X = np.array(X, dtype="O") if not isinstance(X, (np.ndarray)) else X
if len(X.shape) == 1:
result = self._regression(row=X, node=self._tree, mean_preds=mean_preds)
return np.array([[result]], dtype="O")
else:
leaf_value = []
for row in X:
result = self._regression(
row=row, node=self._tree, mean_preds=mean_preds
)
leaf_value.append([result])
return np.array(leaf_value, dtype="O")
else:
raise ValueError("X should be list or numpy array")