-
Notifications
You must be signed in to change notification settings - Fork 2.7k
/
decision_tree.py
282 lines (229 loc) · 9.35 KB
/
decision_tree.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
"""
Author: Philip Andreadis
e-mail: [email protected]
Implementation of Decision Tree model from scratch.
Metric used to apply the split on the data is the Gini index which is calculated for each feature's single value
in order to find the best split on each step. This means there is room for improvement performance wise as this
process is O(n^2) and can be reduced to linear complexity.
Parameters of the model:
max_depth (int): Maximum depth of the decision tree
min_node_size (int): Minimum number of instances a node can have. A node containing this many instances or fewer becomes a terminal (leaf) node
Both are up to the user to set.
Input dataset to train() function must be a numpy array containing both feature and label values.
"""
from collections import Counter
import numpy as np
class DecisionTree:
    """Binary decision-tree classifier trained with Gini-index splits.

    Every value of every feature is evaluated as a candidate split point,
    so training is O(n^2) per node; this could be reduced by sorting each
    feature column once per node.

    Parameters:
        max_depth (int): Maximum depth of the decision tree.
        min_node_size (int): A node with this many instances or fewer
            becomes a terminal (leaf) node.
    """

    def __init__(self, max_depth, min_node_size):
        self.max_depth = max_depth
        self.min_node_size = min_node_size
        # Populated by train(); read by predict().
        self.final_tree = {}

    def calculate_gini(self, child_nodes):
        """Return the Gini index of a split.

        The Gini score of each child node is weighted by the fraction of
        parent instances it holds, and the weighted scores are summed.

        Parameters:
            child_nodes (list of np.array): The two groups of instances
                resulting from the split. Each row's last column is the
                class label.

        Returns:
            float: Gini index of the split (0.0 means a pure split).
        """
        total = sum(len(node) for node in child_nodes)
        gini = 0.0
        for node in child_nodes:
            size = len(node)
            # An empty child contributes nothing (also avoids 0-division).
            if size == 0:
                continue
            label_counts = Counter(row[-1] for row in node)
            node_gini = 1.0 - sum(
                (count / size) ** 2 for count in label_counts.values()
            )
            gini += (size / total) * node_gini
        return gini

    def apply_split(self, feature_index, threshold, data):
        """Split the dataset on one value of one feature.

        Parameters:
            feature_index (int): Index of the splitting feature.
            threshold: Feature value used as the split point.
            data (np.array): Instances to split (labels in last column).

        Returns:
            tuple of np.array: (left, right); left holds rows with
            feature < threshold, right holds the rest.
        """
        rows = data.tolist()
        left_child = [row for row in rows if row[feature_index] < threshold]
        right_child = [row for row in rows if row[feature_index] >= threshold]
        return np.array(left_child), np.array(right_child)

    def find_best_split(self, data):
        """Evaluate all candidate splits and return the best one.

        Every value of every feature is tried as a threshold; the split
        with the minimum Gini index wins (first encountered on ties).

        Parameters:
            data (np.array): Dataset (labels in last column).

        Returns:
            dict: {"feature": index, "value": threshold,
                   "children": (left, right)} for the chosen split.

        Raises:
            ValueError: If ``data`` is empty.
        """
        # Explicit guard: the original code would hit an IndexError on
        # data[0] and a NameError on child_nodes for empty input.
        if len(data) == 0:
            raise ValueError("cannot split an empty dataset")
        num_features = len(data[0]) - 1
        best_score = float("inf")  # replaces the magic sentinel 1000
        best_index = 0
        best_value = 0
        best_children = None
        for column in range(num_features):
            for row in data:
                value = row[column]
                children = self.apply_split(column, value, data)
                score = self.calculate_gini(children)
                if score < best_score:
                    best_score = score
                    best_index = column
                    best_value = value
                    best_children = children
        return {"feature": best_index, "value": best_value, "children": best_children}

    def calc_class(self, node):
        """Return the most frequent class label among ``node``'s rows."""
        labels = [row[-1] for row in node]
        return Counter(labels).most_common(1)[0][0]

    def recursive_split(self, node, depth):
        """Recursively grow the tree below ``node`` (modifies it in place).

        A child becomes terminal when: (i) either side of the split is
        empty, (ii) max depth is reached, or (iii) the child holds
        ``min_node_size`` instances or fewer.

        Parameters:
            node (dict): Split node produced by find_best_split().
            depth (int): Depth of ``node`` in the tree (root = 1).
        """
        left, right = node["children"]
        del node["children"]  # the raw instance arrays are no longer needed
        # If one side is empty, both branches predict the majority class
        # of the non-empty side (merges the two duplicated branches of
        # the original code).
        if left.size == 0 or right.size == 0:
            survivor = right if left.size == 0 else left
            leaf = {"class_value": self.calc_class(survivor), "depth": depth}
            node["left"] = node["right"] = leaf
            return
        # Depth limit reached: terminate both children.
        if depth >= self.max_depth:
            node["left"] = {"class_value": self.calc_class(left), "depth": depth}
            node["right"] = {"class_value": self.calc_class(right), "depth": depth}
            return
        # Process the left child.
        if len(left) <= self.min_node_size:
            node["left"] = {"class_value": self.calc_class(left), "depth": depth}
        else:
            node["left"] = self.find_best_split(left)
            self.recursive_split(node["left"], depth + 1)
        # Process the right child.
        if len(right) <= self.min_node_size:
            node["right"] = {"class_value": self.calc_class(right), "depth": depth}
        else:
            node["right"] = self.find_best_split(right)
            self.recursive_split(node["right"], depth + 1)

    def train(self, X):
        """Build the decision tree from training data.

        Parameters:
            X (np.array): Training data; the last column holds the labels.

        Returns:
            dict: The fitted tree (also stored in ``self.final_tree``).
        """
        tree = self.find_best_split(X)
        self.recursive_split(tree, 1)
        self.final_tree = tree
        return tree

    def print_dt(self, tree, depth=0):
        """Print the decision tree, one node per line.

        Parameters:
            tree (dict): Decision tree (or subtree) to print.
            depth (int): Depth of ``tree``'s root, used for the report.
        """
        if "feature" in tree:
            print(
                "\nSPLIT NODE: feature #{} < {} depth:{}\n".format(
                    tree["feature"], tree["value"], depth
                )
            )
            self.print_dt(tree["left"], depth + 1)
            self.print_dt(tree["right"], depth + 1)
        else:
            print(
                "TERMINAL NODE: class value:{} depth:{}".format(
                    tree["class_value"], tree["depth"]
                )
            )

    def predict_single(self, tree, instance):
        """Predict the class of one instance by walking the tree.

        Parameters:
            tree (dict): Decision tree.
            instance (np.array): Single instance of data.

        Returns:
            Predicted class value, or -1 if the tree is empty (untrained).
        """
        if not tree:
            print("ERROR: Please train the decision tree first")
            return -1
        if "feature" in tree:
            branch = "left" if instance[tree["feature"]] < tree["value"] else "right"
            return self.predict_single(tree[branch], instance)
        return tree["class_value"]

    def predict(self, X):
        """Predict the class value for each row of ``X``.

        Parameters:
            X (np.array): Dataset (extra label column, if present, is ignored).

        Returns:
            np.array: Predicted class values, one per row.
        """
        return np.array([self.predict_single(self.final_tree, row) for row in X])
# Demo driver: train on a file-based dataset and report training accuracy.
if __name__ == "__main__":
    # # test dataset
    # X = np.array([[1, 1,0], [3, 1, 0], [1, 4, 0], [2, 4, 1], [3, 3, 1], [5, 1, 1]])
    # y = np.array([0, 0, 0, 1, 1, 1])
    # Load comma-separated training data and the separate targets file.
    # NOTE(review): train() expects the class label in the last column of
    # train_data — presumably data.txt is laid out that way; verify.
    train_data = np.loadtxt("example_data/data.txt", delimiter=",")
    train_y = np.loadtxt("example_data/targets.txt")
    # Build tree
    dt = DecisionTree(5, 1)
    tree = dt.train(train_data)
    # Training-set accuracy: fraction of predictions matching the labels.
    y_pred = dt.predict(train_data)
    print(f"Accuracy: {sum(y_pred == train_y) / train_y.shape[0]}")
    # Print out the decision tree
    # dt.print_dt(tree)