From 72a19a996b92dd8312777e53e31b7e421db1a931 Mon Sep 17 00:00:00 2001 From: Dennis Adjei-Baah Date: Sat, 24 Feb 2018 13:11:59 -0800 Subject: [PATCH] initial commit --- .gitignore | 1 + car/main.go | 205 +++++++++ car/routes.json | 19 + gen/master-station.pb.go | 409 ++++++++++++++++++ master-station/aggregator/aggregator.go | 87 ++++ master-station/main.go | 79 ++++ .../public-api/master-station-api.go | 78 ++++ proto/master-station.proto | 33 ++ 8 files changed, 911 insertions(+) create mode 100644 .gitignore create mode 100644 car/main.go create mode 100644 car/routes.json create mode 100644 gen/master-station.pb.go create mode 100644 master-station/aggregator/aggregator.go create mode 100644 master-station/main.go create mode 100644 master-station/public-api/master-station-api.go create mode 100644 proto/master-station.proto diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..485dee6 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.idea diff --git a/car/main.go b/car/main.go new file mode 100644 index 0000000..41c03f3 --- /dev/null +++ b/car/main.go @@ -0,0 +1,205 @@ +package main + +import ( + "log" + "google.golang.org/grpc" + pb "github.com/vds/gen" + "context" + "os" + "time" + "flag" + "io/ioutil" + "encoding/json" + "fmt" + "errors" + "math/rand" +) + +var trainId string +var line string +var direction bool +var inService bool + +type Stop struct { + Name string + NextStop int +} +type Line struct { + LineName string + Stops []Stop +} + +type Data struct { + Routes []Line +} + +type Train struct { + status string + trainId string + line Line + direction bool + stopNum int + holdCount int + speed float64 + seconds float64 + distanceToNextStop int + minutesToNextStop int32 +} + +func (t *Train) printRoute() string { + return fmt.Sprintf("Line %s. Heading %s. %s => %s station", + t.line.LineName, + determineDirection(t.direction), + t.status, + t.line.Stops[t.stopNum].Name) +} +func (t *Train) ping() *pb.TrainPing { + + return &pb.TrainPing{ + TrainId: t.trainId, + Route: t.printRoute(), + Status: t.status, + Speed: t.speed, + NextStop: float64(t.distanceToNextStop), + MinutesToNextStop: t.minutesToNextStop, + Line: t.line.LineName, + Direction: determineDirection(t.direction), + NextStation: t.line.Stops[t.stopNum].Name, + Timestamp: 0, + } +} + +func (t *Train) accelerate() { + max := 12.0 + a := 2.0 + distanceTraveled := (t.speed * t.seconds) + (0.5 * a * (t.seconds + 1*t.seconds + 1)) + t.distanceToNextStop -= int(distanceTraveled) + t.speed += distanceTraveled / float64(1.0) + if t.speed > max { + t.speed = max + } + t.minutesToNextStop = int32(t.distanceToNextStop / int(t.speed)) + t.seconds += 1 + +} + +func (t *Train) moveTrain() { + if t.status == "NOT_IN_SERVICE" { + t.status = "IN_TRANSIT" + t.distanceToNextStop = t.line.Stops[t.stopNum].NextStop + } else if t.status == "IN_TRANSIT" { + t.accelerate() + if t.distanceToNextStop <= 0 { + t.status = "HOLDING" + t.distanceToNextStop = 0 + t.speed = 0 + t.seconds = 0 + t.minutesToNextStop = 0 + t.holdCount = rand.Intn(3) + } + } else if t.status == "HOLDING" { + if t.holdCount != 0 { + t.holdCount -= 1 + } else { + t.status = "DISEMBARK" + } + + } else if t.status == "DISEMBARK" { + t.status = "IN_TRANSIT" + if t.stopNum < len(t.line.Stops)-1 && t.direction { + t.stopNum += 1 + } else if t.stopNum > 0 && !t.direction { + t.stopNum -= 1 + } else { + if t.direction { + t.stopNum = len(t.line.Stops) - 1 + } else { + t.stopNum = 0 + } + t.direction = !t.direction + time.Sleep(10 * time.Second) + } + t.distanceToNextStop = t.line.Stops[t.stopNum].NextStop + } + time.Sleep(5 * time.Second) +} + +func determineDirection(direction bool) string { + if direction { + return "INBOUND" + } else { + return "OUTBOUND" + } +} + +func findLine(routes []Line, lineName string) (Line, error) { + for i, r := range routes { + if r.LineName == lineName { + return routes[i], nil + } + } + return Line{}, errors.New("line not found") +} + +func runTrain(client pb.MasterServiceClient) error { + var data Data + dat, err := ioutil.ReadFile("./car/routes.json") + if err != nil { + log.Fatalf("Error reading file %+v", err) + return err + } + err = json.Unmarshal(dat, &data) + if err != nil { + log.Fatalf("Error parsing json %+v", err) + } + selectedLine, err := findLine(data.Routes, line) + if err != nil { + log.Fatalf("Unable to find route information") + return err + } + + train := &Train{ + trainId: trainId, + line: selectedLine, + status: "NOT_IN_SERVICE", + direction: direction, + } + + stream, err := client.ReceiveTrainPing(context.Background()) + if err != nil { + log.Printf("failed with %+v", err) + return err + } + for { + train.moveTrain() + ping := train.ping() + log.Println("Ping sent: ", ping.TrainId, ping.Route, ping.Speed, ping.NextStop, ping.MinutesToNextStop) + err := stream.Send(ping) + if err != nil { + log.Fatalf("failed with %+v", err) + stream.CloseSend() + break + } + } + return err +} + +func main() { + flag.StringVar(&trainId, "trainId", "1593", "The Id for the train") + flag.StringVar(&line, "line", "K", "The line the train is on") + flag.BoolVar(&direction, "direction", true, "Direction true for inbound false for outbound") + flag.BoolVar(&inService, "inService", false, "Train is in or out of service") + var opts []grpc.DialOption + + opts = append(opts, grpc.WithInsecure()) + conn, err := grpc.Dial("localhost:8881", opts...) + if err != nil { + log.Fatalf("fail to dial: %v", err) + } + defer conn.Close() + client := pb.NewMasterServiceClient(conn) + err = runTrain(client) + if err != nil { + os.Exit(1) + } +} diff --git a/car/routes.json b/car/routes.json new file mode 100644 index 0000000..6f2358f --- /dev/null +++ b/car/routes.json @@ -0,0 +1,19 @@ +{ + "routes": [ + { + "lineName": "K", + "stops": [ + {"name": "Overlook", "nextStop":300}, + {"name":"Fordham", "nextStop":200}, + {"name":"Devonshire","nextStop":500}, + {"name":"Oklahoma","nextStop":300}, + {"name":"Litteton","nextStop":100}, + {"name":"Oak Valley","nextStop":600}, + {"name":"Woodsman","nextStop":200}, + {"name":"Brookings", "nextStop":300}, + {"name":"Glenwood", "nextStop":1000}, + {"name":"South Poplar", "nextStop":200} + ] + } + ] +} \ No newline at end of file diff --git a/gen/master-station.pb.go b/gen/master-station.pb.go new file mode 100644 index 0000000..8ee6538 --- /dev/null +++ b/gen/master-station.pb.go @@ -0,0 +1,409 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: master-station.proto + +/* +Package master_station is a generated protocol buffer package. + +It is generated from these files: + master-station.proto + +It has these top-level messages: + LineRequest + LineInfo + TrainPing + Empty +*/ +package master_station + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +import ( + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type LineRequest struct { + Line string `protobuf:"bytes,1,opt,name=line" json:"line,omitempty"` + TrainType string `protobuf:"bytes,2,opt,name=trainType" json:"trainType,omitempty"` +} + +func (m *LineRequest) Reset() { *m = LineRequest{} } +func (m *LineRequest) String() string { return proto.CompactTextString(m) } +func (*LineRequest) ProtoMessage() {} +func (*LineRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +func (m *LineRequest) GetLine() string { + if m != nil { + return m.Line + } + return "" +} + +func (m *LineRequest) GetTrainType() string { + if m != nil { + return m.TrainType + } + return "" +} + +type LineInfo struct { + TrainType string `protobuf:"bytes,1,opt,name=trainType" json:"trainType,omitempty"` +} + +func (m *LineInfo) Reset() { *m = LineInfo{} } +func (m *LineInfo) String() string { return proto.CompactTextString(m) } +func (*LineInfo) ProtoMessage() {} +func (*LineInfo) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + +func (m *LineInfo) GetTrainType() string { + if m != nil { + return m.TrainType + } + return "" +} + +type TrainPing struct { + TrainId string `protobuf:"bytes,1,opt,name=trainId" json:"trainId,omitempty"` + Route string `protobuf:"bytes,2,opt,name=route" json:"route,omitempty"` + Status string `protobuf:"bytes,3,opt,name=status" json:"status,omitempty"` + Speed float64 `protobuf:"fixed64,4,opt,name=speed" json:"speed,omitempty"` + NextStop float64 `protobuf:"fixed64,5,opt,name=nextStop" json:"nextStop,omitempty"` + MinutesToNextStop int32 `protobuf:"varint,6,opt,name=minutesToNextStop" json:"minutesToNextStop,omitempty"` + Line string `protobuf:"bytes,7,opt,name=line" json:"line,omitempty"` + Direction string `protobuf:"bytes,8,opt,name=direction" json:"direction,omitempty"` + NextStation string `protobuf:"bytes,9,opt,name=nextStation" json:"nextStation,omitempty"` + Timestamp int64 `protobuf:"varint,10,opt,name=timestamp" json:"timestamp,omitempty"` +} + +func (m *TrainPing) Reset() { *m = TrainPing{} } +func (m *TrainPing) String() string { return proto.CompactTextString(m) } +func (*TrainPing) ProtoMessage() {} +func (*TrainPing) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } + +func (m *TrainPing) GetTrainId() string { + if m != nil { + return m.TrainId + } + return "" +} + +func (m *TrainPing) GetRoute() string { + if m != nil { + return m.Route + } + return "" +} + +func (m *TrainPing) GetStatus() string { + if m != nil { + return m.Status + } + return "" +} + +func (m *TrainPing) GetSpeed() float64 { + if m != nil { + return m.Speed + } + return 0 +} + +func (m *TrainPing) GetNextStop() float64 { + if m != nil { + return m.NextStop + } + return 0 +} + +func (m *TrainPing) GetMinutesToNextStop() int32 { + if m != nil { + return m.MinutesToNextStop + } + return 0 +} + +func (m *TrainPing) GetLine() string { + if m != nil { + return m.Line + } + return "" +} + +func (m *TrainPing) GetDirection() string { + if m != nil { + return m.Direction + } + return "" +} + +func (m *TrainPing) GetNextStation() string { + if m != nil { + return m.NextStation + } + return "" +} + +func (m *TrainPing) GetTimestamp() int64 { + if m != nil { + return m.Timestamp + } + return 0 +} + +type Empty struct { +} + +func (m *Empty) Reset() { *m = Empty{} } +func (m *Empty) String() string { return proto.CompactTextString(m) } +func (*Empty) ProtoMessage() {} +func (*Empty) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } + +func init() { + proto.RegisterType((*LineRequest)(nil), "LineRequest") + proto.RegisterType((*LineInfo)(nil), "LineInfo") + proto.RegisterType((*TrainPing)(nil), "TrainPing") + proto.RegisterType((*Empty)(nil), "Empty") +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// Client API for MasterService service + +type MasterServiceClient interface { + ReceiveTrainPing(ctx context.Context, opts ...grpc.CallOption) (MasterService_ReceiveTrainPingClient, error) +} + +type masterServiceClient struct { + cc *grpc.ClientConn +} + +func NewMasterServiceClient(cc *grpc.ClientConn) MasterServiceClient { + return &masterServiceClient{cc} +} + +func (c *masterServiceClient) ReceiveTrainPing(ctx context.Context, opts ...grpc.CallOption) (MasterService_ReceiveTrainPingClient, error) { + stream, err := grpc.NewClientStream(ctx, &_MasterService_serviceDesc.Streams[0], c.cc, "/MasterService/ReceiveTrainPing", opts...) + if err != nil { + return nil, err + } + x := &masterServiceReceiveTrainPingClient{stream} + return x, nil +} + +type MasterService_ReceiveTrainPingClient interface { + Send(*TrainPing) error + CloseAndRecv() (*Empty, error) + grpc.ClientStream +} + +type masterServiceReceiveTrainPingClient struct { + grpc.ClientStream +} + +func (x *masterServiceReceiveTrainPingClient) Send(m *TrainPing) error { + return x.ClientStream.SendMsg(m) +} + +func (x *masterServiceReceiveTrainPingClient) CloseAndRecv() (*Empty, error) { + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + m := new(Empty) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// Server API for MasterService service + +type MasterServiceServer interface { + ReceiveTrainPing(MasterService_ReceiveTrainPingServer) error +} + +func RegisterMasterServiceServer(s *grpc.Server, srv MasterServiceServer) { + s.RegisterService(&_MasterService_serviceDesc, srv) +} + +func _MasterService_ReceiveTrainPing_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(MasterServiceServer).ReceiveTrainPing(&masterServiceReceiveTrainPingServer{stream}) +} + +type MasterService_ReceiveTrainPingServer interface { + SendAndClose(*Empty) error + Recv() (*TrainPing, error) + grpc.ServerStream +} + +type masterServiceReceiveTrainPingServer struct { + grpc.ServerStream +} + +func (x *masterServiceReceiveTrainPingServer) SendAndClose(m *Empty) error { + return x.ServerStream.SendMsg(m) +} + +func (x *masterServiceReceiveTrainPingServer) Recv() (*TrainPing, error) { + m := new(TrainPing) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +var _MasterService_serviceDesc = grpc.ServiceDesc{ + ServiceName: "MasterService", + HandlerType: (*MasterServiceServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "ReceiveTrainPing", + Handler: _MasterService_ReceiveTrainPing_Handler, + ClientStreams: true, + }, + }, + Metadata: "master-station.proto", +} + +// Client API for CarService service + +type CarServiceClient interface { + SendTrainPing(ctx context.Context, opts ...grpc.CallOption) (CarService_SendTrainPingClient, error) +} + +type carServiceClient struct { + cc *grpc.ClientConn +} + +func NewCarServiceClient(cc *grpc.ClientConn) CarServiceClient { + return &carServiceClient{cc} +} + +func (c *carServiceClient) SendTrainPing(ctx context.Context, opts ...grpc.CallOption) (CarService_SendTrainPingClient, error) { + stream, err := grpc.NewClientStream(ctx, &_CarService_serviceDesc.Streams[0], c.cc, "/CarService/SendTrainPing", opts...) + if err != nil { + return nil, err + } + x := &carServiceSendTrainPingClient{stream} + return x, nil +} + +type CarService_SendTrainPingClient interface { + Send(*TrainPing) error + CloseAndRecv() (*Empty, error) + grpc.ClientStream +} + +type carServiceSendTrainPingClient struct { + grpc.ClientStream +} + +func (x *carServiceSendTrainPingClient) Send(m *TrainPing) error { + return x.ClientStream.SendMsg(m) +} + +func (x *carServiceSendTrainPingClient) CloseAndRecv() (*Empty, error) { + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + m := new(Empty) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// Server API for CarService service + +type CarServiceServer interface { + SendTrainPing(CarService_SendTrainPingServer) error +} + +func RegisterCarServiceServer(s *grpc.Server, srv CarServiceServer) { + s.RegisterService(&_CarService_serviceDesc, srv) +} + +func _CarService_SendTrainPing_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(CarServiceServer).SendTrainPing(&carServiceSendTrainPingServer{stream}) +} + +type CarService_SendTrainPingServer interface { + SendAndClose(*Empty) error + Recv() (*TrainPing, error) + grpc.ServerStream +} + +type carServiceSendTrainPingServer struct { + grpc.ServerStream +} + +func (x *carServiceSendTrainPingServer) SendAndClose(m *Empty) error { + return x.ServerStream.SendMsg(m) +} + +func (x *carServiceSendTrainPingServer) Recv() (*TrainPing, error) { + m := new(TrainPing) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +var _CarService_serviceDesc = grpc.ServiceDesc{ + ServiceName: "CarService", + HandlerType: (*CarServiceServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "SendTrainPing", + Handler: _CarService_SendTrainPing_Handler, + ClientStreams: true, + }, + }, + Metadata: "master-station.proto", +} + +func init() { proto.RegisterFile("master-station.proto", fileDescriptor0) } + +var fileDescriptor0 = []byte{ + // 318 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x52, 0xcf, 0x4b, 0xfb, 0x30, + 0x14, 0x27, 0xdb, 0xda, 0xad, 0x6f, 0x0c, 0xbe, 0xdf, 0x30, 0x24, 0x0c, 0x0f, 0x65, 0x20, 0xe4, + 0xa0, 0x3b, 0x6c, 0x27, 0x4f, 0x1e, 0xc4, 0xc3, 0x40, 0x45, 0xba, 0xfd, 0x03, 0x75, 0x7d, 0x4a, + 0xc0, 0x26, 0x31, 0x79, 0x1d, 0xee, 0x0f, 0xf2, 0xff, 0x94, 0xa6, 0x5b, 0xe7, 0xf4, 0xe0, 0xed, + 0x7d, 0x7e, 0xf5, 0x35, 0x9f, 0x04, 0xc6, 0x65, 0xee, 0x09, 0xdd, 0x95, 0xa7, 0x9c, 0x94, 0xd1, + 0x33, 0xeb, 0x0c, 0x99, 0xe9, 0x0d, 0x0c, 0xef, 0x95, 0xc6, 0x0c, 0xdf, 0x2b, 0xf4, 0xc4, 0x39, + 0xf4, 0xde, 0x94, 0x46, 0xc1, 0x52, 0x26, 0x93, 0x2c, 0xcc, 0xfc, 0x1c, 0x12, 0x72, 0xb9, 0xd2, + 0xeb, 0x9d, 0x45, 0xd1, 0x09, 0xc2, 0x91, 0x98, 0x4a, 0x18, 0xd4, 0x1f, 0x58, 0xea, 0x17, 0x73, + 0xea, 0x64, 0x3f, 0x9d, 0x9f, 0x1d, 0x48, 0xd6, 0x35, 0x7a, 0x52, 0xfa, 0x95, 0x0b, 0xe8, 0x07, + 0x69, 0x59, 0xec, 0x9d, 0x07, 0xc8, 0xc7, 0x10, 0x39, 0x53, 0xd1, 0x61, 0x57, 0x03, 0xf8, 0x19, + 0xc4, 0xf5, 0x9f, 0x57, 0x5e, 0x74, 0x03, 0xbd, 0x47, 0xb5, 0xdb, 0x5b, 0xc4, 0x42, 0xf4, 0x52, + 0x26, 0x59, 0xd6, 0x00, 0x3e, 0x81, 0x81, 0xc6, 0x0f, 0x5a, 0x91, 0xb1, 0x22, 0x0a, 0x42, 0x8b, + 0xf9, 0x25, 0xfc, 0x2f, 0x95, 0xae, 0x08, 0xfd, 0xda, 0x3c, 0x1e, 0x4c, 0x71, 0xca, 0x64, 0x94, + 0xfd, 0x16, 0xda, 0x46, 0xfa, 0xa7, 0x8d, 0x14, 0xca, 0xe1, 0xa6, 0xee, 0x51, 0x0c, 0x9a, 0x73, + 0xb6, 0x04, 0x4f, 0x61, 0xd8, 0xec, 0x0a, 0x3d, 0x8b, 0x24, 0xe8, 0xdf, 0xa9, 0xd0, 0x93, 0x2a, + 0xd1, 0x53, 0x5e, 0x5a, 0x01, 0x29, 0x93, 0xdd, 0xec, 0x48, 0x4c, 0xfb, 0x10, 0xdd, 0x95, 0x96, + 0x76, 0xf3, 0x6b, 0x18, 0x3d, 0x84, 0x3b, 0x5b, 0xa1, 0xdb, 0xaa, 0x0d, 0x72, 0x09, 0xff, 0x32, + 0xdc, 0xa0, 0xda, 0xe2, 0xb1, 0x47, 0x98, 0xb5, 0xf3, 0x24, 0x9e, 0x85, 0xa0, 0x64, 0xf3, 0x05, + 0xc0, 0x6d, 0xde, 0xe6, 0x2e, 0x60, 0xb4, 0x42, 0x5d, 0xfc, 0x11, 0x7a, 0x8e, 0xc3, 0x93, 0x58, + 0x7c, 0x05, 0x00, 0x00, 0xff, 0xff, 0xd1, 0x0e, 0x05, 0x93, 0x2a, 0x02, 0x00, 0x00, +} diff --git a/master-station/aggregator/aggregator.go b/master-station/aggregator/aggregator.go new file mode 100644 index 0000000..79b941c --- /dev/null +++ b/master-station/aggregator/aggregator.go @@ -0,0 +1,87 @@ +package aggregator + +import ( + "gopkg.in/mgo.v2" + "time" + "fmt" + pb "github.com/vds/gen" + "gopkg.in/mgo.v2/bson" + "log" +) + +type TrainSchedule struct { + trainId string + minutesOut int32 +} + +type StationSchedule struct { + StationName string + Inbound []TrainSchedule + Outbound []TrainSchedule +} + +type Key struct { + Line string `bson:"line"` + Direction string `bson:"direction"` + Trainid string `bson:"trainId"` + Nextstation string `bson:"nextstation"` +} + +type GroupResult struct { + Id Key `bson:"_id"` + Pings []pb.TrainPing `bson:"pings"` +} + +//func createSchedule(trains []GroupResult, schedule *map[string]*StationSchedule) { +// for _, train := range trains { +// if val, ok := schedule[train.Id.Nextstation]; ok { +// if train.Id.Direction == "INBOUND" { +// if len(train.Pings) > 0 { +// val.Inbound = append(val.Inbound, TrainSchedule{train.Id.Trainid, train.Pings[0].MinutesToNextStop}) +// } +// } +// if train.Id.Direction == "OUTBOUND"{ +// if len(train.Pings) > 0 { +// val.Outbound = append(val.Outbound, TrainSchedule{train.Id.Trainid, train.Pings[0].MinutesToNextStop}) +// } +// } +// } else { +// stationSched := &StationSchedule{StationName:train.Id.Nextstation} +// //schedule[train.Id.Nextstation] = +// if train.Id.Direction == "INBOUND" { +// if len(train.Pings) > 0 { +// stationSched.Inbound = append(stationSched.Inbound, TrainSchedule{train.Id.Trainid, train.Pings[0].MinutesToNextStop}) +// } +// } +// if train.Id.Direction == "OUTBOUND"{ +// if len(train.Pings) > 0 { +// stationSched.Outbound = append(stationSched.Outbound, TrainSchedule{train.Id.Trainid, train.Pings[0].MinutesToNextStop}) +// } +// } +// schedule[train.Id.Nextstation] = stationSched +// } +// } +// fmt.Println(schedule) +// +//} + +func Aggregate(s *mgo.Session) { + ticker := time.NewTicker(10 * time.Second) + go func() { + defer s.Close() + for t := range ticker.C { + fmt.Println(t) + trains := make([]GroupResult, 0) + trainCollection := s.DB("test").C("trains") + err := trainCollection.Pipe([]bson.M{{"$sort": bson.M{"timestamp": -1}}, + {"$group": bson.M{"_id": bson.M{"line": "$line", "direction": "$direction", "trainid": "$trainid", "nextstation": "$nextstation"}, "pings": bson.M{"$push": "$$ROOT"}}}}). + All(&trains) + if err != nil { + log.Fatalln("Error: ", err) + } + //var schedule = make(map[string]*StationSchedule) + //createSchedule(trains, &schedule) + fmt.Printf("Result: %+v", trains) + } + }() +} diff --git a/master-station/main.go b/master-station/main.go new file mode 100644 index 0000000..b3193f2 --- /dev/null +++ b/master-station/main.go @@ -0,0 +1,79 @@ +package main + +import ( + pb "github.com/vds/gen" + "io" + "fmt" + "net" + "log" + "google.golang.org/grpc" + "gopkg.in/mgo.v2" + "time" + "github.com/vds/master-station/public-api" + "github.com/vds/master-station/aggregator" + "gopkg.in/mgo.v2/bson" +) + +type masterServiceServer struct { + session *mgo.Session + trains *mgo.Collection +} + +func (m *masterServiceServer) ReceiveTrainPing(stream pb.MasterService_ReceiveTrainPingServer) error { + for { + ping, err := stream.Recv() + if err == io.EOF { + stream.SendAndClose(&pb.Empty{}) + log.Println("Lost connection to train") + } + if err != nil { + log.Printf("stream closed %+v", err) + return err + } + log.Printf("Pinged! %+v", ping) + go func() { + ping.Timestamp = time.Now().UTC().Unix() + err = m.trains.Insert(ping) + if err != nil { + log.Printf("DB operation failed %+v", err) + } + if ping.Status == "DISEMBARK"{ + info, err := m.trains.RemoveAll(bson.M{ + "trainid": ping.TrainId, + "timestamp": + bson.M{ + "$lte": ping.Timestamp, + }}) + if err != nil { + log.Printf("DB operation failed %+v", err) + } + log.Printf("DB Info: %+v", info) + } + }() + } +} + +func main() { + lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", 8881)) + if err != nil { + log.Fatalf("failed to listen: %v", err) + } + session, err := mgo.Dial("localhost:27017") + + if err != nil { + log.Fatalf("failed to listen: %v", err) + } + server := &masterServiceServer{ + session: session, + trains: session.DB("test").C("trains"), + } + go func() { + public_api.ServeHTTP() + }() + go func(){ + aggregator.Aggregate(session.Copy()) + }() + grpcServer := grpc.NewServer() + pb.RegisterMasterServiceServer(grpcServer, server) + grpcServer.Serve(lis) +} diff --git a/master-station/public-api/master-station-api.go b/master-station/public-api/master-station-api.go new file mode 100644 index 0000000..0540a0c --- /dev/null +++ b/master-station/public-api/master-station-api.go @@ -0,0 +1,78 @@ +package public_api + +import ( + "github.com/gorilla/mux" + "log" + "net/http" + "gopkg.in/mgo.v2" + "gopkg.in/mgo.v2/bson" + "encoding/json" + "github.com/vds/gen" +) + +var trainCollection *mgo.Collection +var routeCollection *mgo.Collection + +type JsonError struct { + error string + details string +} +type Stop struct { + Name string + NextStop int32 `bson:"nextStop"` +} +type Line struct { + LineName string `json:"lineName" bson:"lineName"` + Stops []Stop +} + +func GetStationSchedule(w http.ResponseWriter, r *http.Request) { + params := mux.Vars(r) + w.Header().Set("Access-Control-Allow-Origin", "*") + query := bson.M{ + "nextstation": params["station"], + "line": params["line"], + "direction": params["direction"], + } + result := master_station.TrainPing{} + err := trainCollection.Find(query).Sort("-timestamp").Limit(1).One(&result) + if err != nil { + w.WriteHeader(http.StatusNotFound) + } + dat, jsonErr := json.Marshal(result) + if jsonErr != nil { + w.WriteHeader(http.StatusInternalServerError) + } + w.Write(dat) +} + +func HandleRoutes(w http.ResponseWriter, r *http.Request) { + result := []Line{} + w.Header().Set("Access-Control-Allow-Origin", "*") + err := routeCollection.Find(bson.M{}).All(&result) + if err != nil { + w.WriteHeader(http.StatusNotFound) + w.Write(make([]byte, 0)) + } + dat, jsonErr := json.Marshal(result) + if jsonErr != nil { + w.WriteHeader(http.StatusInternalServerError) + } + w.Write(dat) +} + + + +func ServeHTTP() { + session, err := mgo.Dial("localhost:27017") + if err != nil { + log.Fatalf("Unable to start API due to DB Error: %+v", err) + } + trainCollection = session.DB("test").C("trains") + routeCollection = session.DB("test").C("routes") + router := mux.NewRouter() + router.HandleFunc("/station", GetStationSchedule).Methods("GET"). + Queries("station", "{station}", "line", "{line}", "direction", "{direction}") + router.HandleFunc("/routes", HandleRoutes).Methods("GET") + log.Println(http.ListenAndServe(":8888", router)) +} diff --git a/proto/master-station.proto b/proto/master-station.proto new file mode 100644 index 0000000..93a7197 --- /dev/null +++ b/proto/master-station.proto @@ -0,0 +1,33 @@ +syntax = 'proto3'; + +message LineRequest { + string line = 1; + string trainType = 2; +} + +message LineInfo { + string trainType = 1; +} + +message TrainPing { + string trainId = 1; + string route = 2; + string status = 3; + double speed = 4; + double nextStop = 5; + int32 minutesToNextStop = 6; + string line = 7; + string direction = 8; + string nextStation = 9; + int64 timestamp = 10; +} + +message Empty {} + +service MasterService { + rpc ReceiveTrainPing (stream TrainPing) returns (Empty); +} + +service CarService { + rpc SendTrainPing (stream TrainPing) returns (Empty); +} \ No newline at end of file