-
Notifications
You must be signed in to change notification settings - Fork 1
/
useGPRGPSModel.p
145 lines (116 loc) · 5.24 KB
/
useGPRGPSModel.p
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# Deploy model.
# This script applies pseudo throttle values from 1000 to 2000 us to create variations of a single input sample.
# GPS speed is predicted for each synthesized sample.
# The output is stored as gpsPredictionPDF (retrievable from kdb+ through the shared Python space).
import sys
import numpy as np
import pandas as pd
import sklearn.gaussian_process as gp
from joblib import load # model persistance library
# Script identification: a human-readable description, the default training
# set label, and this script's file name.
comments = 'Using GPR GPS model'
trainingSetName = 'trainingDataAbove100kph.csv'
fileName = 'useGPRGPSModel.p'

# Announce which model script is running.
print(comments)
def mse(pred, actual):
    """Return the mean squared error between predictions and actual values.

    Args:
        pred: Predicted values. May be a list, tuple, NumPy array or pandas
            object.
        actual: Reference values, element-wise compatible with ``pred``.

    Returns:
        The mean of the squared element-wise differences, as produced by the
        ``.mean()`` method of the difference object.
    """
    # Plain Python sequences do not support element-wise subtraction
    # (list - list raises TypeError), so convert them to arrays first.
    # Array-like inputs (ndarray, Series, DataFrame) are left untouched so
    # their original arithmetic semantics are preserved.
    if isinstance(pred, (list, tuple)):
        pred = np.asarray(pred)
    if isinstance(actual, (list, tuple)):
        actual = np.asarray(actual)
    return ((pred - actual) ** 2).mean()
def strFloat(floatVal):
    """Format a number as a string with exactly two decimal places.

    Args:
        floatVal: The numeric value to format.

    Returns:
        The value rounded to two decimals and rendered with two digits after
        the decimal point (e.g. ``"3.14"``).
    """
    roundedValue = round(floatVal, 2)
    return format(roundedValue, ".2f")
# Fallback CSV training data, read only when no `trainingDataPDF` has been
# placed in this script's globals before it runs (testing purposes).
csvTrainingData = 'trainingDataAbove100kph.csv'

# True when the caller did not provide `trainingDataPDF`.
trainingDataPDFNotFound = 'trainingDataPDF' not in globals()

# NOTE: two independent `if` statements are used on purpose; per the original
# author, using else or try/except causes bugs with embedPy.
if trainingDataPDFNotFound:
    trainingDataPDF = pd.read_csv(csvTrainingData)
    print("Testing using csv input!")
if not trainingDataPDFNotFound:
    trainingSetName = "KDB+ Input"
    print("Predicting using KDB+ input!")

# Defaults for values the caller may have pre-set in globals.
if 'synthesizedSampleIndex' not in globals():
    synthesizedSampleIndex = 0
if 'numSamplesToUse' not in globals():
    numSamplesToUse = 10
# Leading 70% of the training data. `trainingDataTrain` is what the script
# falls back to as `inputPDF` when no input is supplied by the caller.
trainPercentage = 0.7
trainRowCount = int(trainPercentage * len(trainingDataPDF))
trainingDataTrain = trainingDataPDF[:trainRowCount]

# Load the persisted GPS speed model from disk.
# NOTE(review): joblib.load unpickles the file, so the model file must come
# from a trusted source.
model = load('./models/gprGPSSpeedModel.model')

# Show every row when a DataFrame is printed.
pd.set_option('display.max_rows', None)
# When the caller has not supplied `inputPDF` (testing purposes), use a copy
# of the training split as the prediction input.
if 'inputPDF' not in globals():
    inputPDF = trainingDataTrain.copy()
    print("importing data from csv source")

# Optional manual sample selection, disabled (must stay disabled with KDB+):
#   numSamplesToUse = 1
#   inputPDF = inputPDF.tail(numSamplesToUse)

# Remove the GPSspeedkph column in place before the frame is passed to the
# model for prediction.
inputPDF.drop(columns=['GPSspeedkph'], inplace=True)
# Throttle sweep used to synthesize prediction inputs. Each bound may be
# pre-set in globals by the caller; otherwise the defaults below apply.
if 'lowThrottle' not in globals():
    lowThrottle = 1000
if 'highThrottle' not in globals():
    highThrottle = 2000
if 'throttleSteps' not in globals():
    throttleSteps = 10

# Integer spacing between consecutive throttle values.
steps = int((highThrottle - lowThrottle) / throttleSteps)

# range() excludes its stop value, so stop at highThrottle + 1 to include the
# upper bound. The +1 is applied inline rather than by incrementing
# `highThrottle`: mutating the global made the bound creep upward on every
# re-run in a shared Python space that did not reset it.
throttleInputRange = list(range(lowThrottle, highThrottle + 1, steps))

# Highest throttle first.
throttleInputRange.reverse()
# Prepare the frame that will hold every throttle variation: the input rows
# repeated (throttleSteps + 1) times, i.e. the original rows plus
# `throttleSteps` extra copies.
tempPDF = inputPDF.copy()

# DataFrame.append was removed in pandas 2.0 and re-copied the growing frame
# on every iteration. A single concat builds the same stacked frame, and
# ignore_index=True yields a fresh 0..n-1 index (replacing the former
# reset_index + drop of the 'index' column).
inputPDF = pd.concat([tempPDF] * (max(throttleSteps, 0) + 1),
                     ignore_index=True)

# Initialise the throttle column; per-row values are assigned afterwards.
inputPDF.loc[:, ['rcCommand3']] = 1000
# Assign each throttle value to its own run of `numSamplesToUse` consecutive
# rows: rows [x*numSamplesToUse, (x+1)*numSamplesToUse) receive the x-th value
# of throttleInputRange.
for x, throttleValue in enumerate(throttleInputRange):
    for y in range(numSamplesToUse):
        rowLabel = x * numSamplesToUse + y
        inputPDF.loc[[rowLabel], 'rcCommand3'] = throttleValue
# Run the model on the synthesized inputs; the covariance matrix is requested
# alongside the predictions.
gpsSpeedPrediction, covMatrix = model.predict(inputPDF, return_cov=True)
print("gpsSpeedPrediction")
print(gpsSpeedPrediction)

# Attach the predictions to the inputs as a 'GPSspeedkph' column by joining
# on the row index.
predictedSpeedSeries = pd.Series(data=gpsSpeedPrediction, name='GPSspeedkph')
speedMergeOptions = dict(
    how='inner',
    on=None,
    left_on=None,
    right_on=None,
    left_index=True,
    right_index=True,
    sort=False,
    suffixes=('_x', '_y'),
    copy=True,
    indicator=False,
    validate=None,
)
gpsPredictionPDF = pd.merge(inputPDF, predictedSpeedSeries,
                            **speedMergeOptions)
# Label every row with its throttle scenario (position of its throttle value
# in throttleInputRange) and with the caller-supplied synthesized sample index.
sequenceCount = len(throttleInputRange)

# Keep the incoming scalar, then rebind synthesizedSampleIndex to a per-row list.
synthesizedSampleIndexRef = synthesizedSampleIndex
throttleInputSequence = [
    sequenceNo
    for sequenceNo in range(sequenceCount)
    for _ in range(numSamplesToUse)
]
synthesizedSampleIndex = (
    [synthesizedSampleIndexRef] * len(throttleInputSequence)
)

# Both label columns are joined onto the predictions by row index.
labelMergeOptions = dict(
    how='inner',
    on=None,
    left_on=None,
    right_on=None,
    left_index=True,
    right_index=True,
    sort=False,
    suffixes=('_x', '_y'),
    copy=True,
    indicator=False,
    validate=None,
)
for labelName, labelValues in (
        ('throttleInputSequence', throttleInputSequence),
        ('synthesizedSampleIndex', synthesizedSampleIndex)):
    gpsPredictionPDF = pd.merge(
        gpsPredictionPDF,
        pd.Series(data=labelValues, name=labelName),
        **labelMergeOptions)

print('gpsPredictionPDF set')
print('prediction complete!')
# gpsPredictionPDF is now ready to be retrieved from KDB+.