package external

import (
	"bufio"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"os/exec"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	log "github.com/sirupsen/logrus"

	. "git.deuxfleurs.fr/Deuxfleurs/easybridge/connector"
)

// Serialization protocol

// extMessage is the envelope of every message exchanged with the external
// program. Messages are JSON objects, written one per line.
type extMessage struct {
	// Header: message type and identifier
	MsgType string `json:"_type"`
	MsgId   uint64 `json:"_id"`

	// Message fields
	Key     string `json:"key"`
	Value   string `json:"value"`
	EventId string `json:"event_id"`
	Error   string `json:"error"`
	Room    RoomID `json:"room"`
	User    UserID `json:"user"`
}

// extMessageWithData is an extMessage carrying an additional "data" payload
// whose concrete type depends on MsgType (see UnmarshalJSON).
type extMessageWithData struct {
	extMessage
	Data interface{} `json:"data"`
}

// Possible values for MsgType
const (
	// ezbr -> external
	CONFIGURE     = "configure"
	GET_USER      = "get_user"
	SET_USER_INFO = "set_user_info"
	SET_ROOM_INFO = "set_room_info"
	JOIN          = "join"
	INVITE        = "invite"
	LEAVE         = "leave"
	SEND          = "send"
	CLOSE         = "close"

	// external -> ezbr
	JOINED            = "joined"
	LEFT              = "left"
	USER_INFO_UPDATED = "user_info_updated"
	ROOM_INFO_UPDATED = "room_info_updated"
	EVENT             = "event"
	CACHE_PUT         = "cache_put"
	CACHE_GET         = "cache_get"

	// reply messages
	// ezbr -> external: all must wait for a reply!
	// external -> ezbr: only CACHE_GET produces a reply
	REP_OK    = "rep_ok"
	REP_ERROR = "rep_error"
)

// ----

// External drives an external program (ext.command) and talks to it through
// its standard input and output using line-delimited JSON messages.
type External struct {
	handler Handler

	protocol string
	command  string
	debug    bool

	config Configuration

	recvPipe io.ReadCloser
	sendPipe io.WriteCloser
	sendJson *json.Encoder

	// generation is bumped by Configure and Close; background goroutines
	// started for an older generation stop when they notice the mismatch.
	generation int
	proc       *exec.Cmd

	handlerChan chan *extMessageWithData

	// counter produces message identifiers; inflightRequests maps the
	// identifier of each pending request to the channel its reply is
	// delivered on. The map is protected by lock.
	counter          uint64
	inflightRequests map[uint64]chan *extMessageWithData
	lock             sync.Mutex
}

// SetHandler sets the handler that receives notifications coming from the
// external program.
func (ext *External) SetHandler(h Handler) {
	ext.handler = h
}

// Protocol returns the protocol name stored in this connector.
func (ext *External) Protocol() string {
	return ext.protocol
}

// Configure (re)starts the external program and sends it a CONFIGURE message
// carrying c. If a process is already attached it is closed first. It
// returns an error if the process cannot be started or if the CONFIGURE
// request fails.
func (ext *External) Configure(c Configuration) error {
	if ext.proc != nil {
		ext.Close()
	}

	ext.inflightRequests = map[uint64]chan *extMessageWithData{}

	ext.generation += 1

	ext.handlerChan = make(chan *extMessageWithData, 1000)
	go ext.handlerLoop(ext.generation)

	if err := ext.setupProc(ext.generation); err != nil {
		return err
	}

	go ext.restartLoop(ext.generation)

	_, err := ext.cmd(extMessage{
		MsgType: CONFIGURE,
	}, c)
	return err
}

// ---- Process management and communication logic

// setupProc starts ext.command, wires its stdin/stdout to the JSON encoder
// and to a receive loop, and forwards its stderr to ours. In debug mode all
// traffic in both directions is also copied to os.Stderr.
func (ext *External) setupProc(generation int) error {
	var err error

	ext.proc = exec.Command(ext.command)

	ext.recvPipe, err = ext.proc.StdoutPipe()
	if err != nil {
		return err
	}
	ext.sendPipe, err = ext.proc.StdinPipe()
	if err != nil {
		return err
	}

	send := io.Writer(ext.sendPipe)
	recv := io.Reader(ext.recvPipe)
	if ext.debug {
		recv = io.TeeReader(recv, os.Stderr)
		send = io.MultiWriter(send, os.Stderr)
	}

	ext.sendJson = json.NewEncoder(send)
	ext.proc.Stderr = os.Stderr

	if err = ext.proc.Start(); err != nil {
		return err
	}

	go ext.recvLoop(recv, generation)
	return nil
}

// restartLoop waits for the external process to exit and starts it again,
// at most maxRestarts times. It stops silently as soon as the generation
// changes (Close or a new Configure happened) or when no process is
// attached.
func (ext *External) restartLoop(generation int) {
	const maxRestarts = 2

	for i := 0; i < maxRestarts; i++ {
		if ext.proc == nil {
			return
		}
		ext.proc.Wait()
		if ext.generation != generation {
			return
		}

		log.Printf("Process %s stopped, restarting.", ext.command)
		log.Printf("Generation %d vs %d", ext.generation, generation)

		if err := ext.setupProc(generation); err != nil {
			ext.proc = nil
			log.Warnf("Unable to restart %s: %s", ext.command, err)
			return
		}
	}

	// Only reached when the restart budget is exhausted; earlier exits are
	// normal shutdowns or have already been reported above.
	log.Warnf("Restart limit (%d) reached for %s; abandoning.", maxRestarts, ext.command)
}

// UnmarshalJSON decodes the message envelope, then decodes the "data" field
// into the concrete type implied by the message type: *UserInfo, *RoomInfo
// or *Event. Message types without a payload leave Data nil. Unknown message
// types are rejected with an error.
func (m *extMessageWithData) UnmarshalJSON(jj []byte) error {
	var c extMessage

	if err := json.Unmarshal(jj, &c); err != nil {
		return err
	}
	*m = extMessageWithData{extMessage: c}

	switch c.MsgType {
	case USER_INFO_UPDATED:
		var ui struct {
			Data UserInfo `json:"data"`
		}
		if err := json.Unmarshal(jj, &ui); err != nil {
			return err
		}
		m.Data = &ui.Data
		return nil
	case ROOM_INFO_UPDATED:
		var ri struct {
			Data RoomInfo `json:"data"`
		}
		if err := json.Unmarshal(jj, &ri); err != nil {
			return err
		}
		m.Data = &ri.Data
		return nil
	case EVENT:
		var ev struct {
			Data Event `json:"data"`
		}
		if err := json.Unmarshal(jj, &ev); err != nil {
			return err
		}
		m.Data = &ev.Data
		return nil
	case JOINED, LEFT, CACHE_PUT, CACHE_GET, REP_OK, REP_ERROR:
		return nil
	default:
		return fmt.Errorf("Invalid message type for message from external program: '%s'", c.MsgType)
	}
}

// recvLoop reads one JSON message per line from the external program.
// Replies (types starting with "rep_") are routed to the request waiting for
// them; every other message is queued for handlerLoop. Lines that fail to
// decode are logged and skipped.
func (ext *External) recvLoop(from io.Reader, generation int) {
	scanner := bufio.NewScanner(from)
	for scanner.Scan() {
		var msg extMessageWithData
		if err := json.Unmarshal(scanner.Bytes(), &msg); err != nil {
			log.Warnf("Failed to decode from %s: %s. Skipping line.", ext.command, err.Error())
			continue
		}

		log.Debugf("GOT MESSAGE: %#v %#v", msg, msg.Data)

		if strings.HasPrefix(msg.MsgType, "rep_") {
			ext.lock.Lock()
			if ch, ok := ext.inflightRequests[msg.MsgId]; ok {
				delete(ext.inflightRequests, msg.MsgId)
				// ch has capacity 1 and receives at most one reply, so this
				// send cannot block while the lock is held, even if the
				// requester already gave up waiting.
				ch <- &msg
			}
			ext.lock.Unlock()
		} else if ch := ext.handlerChan; ch != nil {
			// A nil channel means Close ran; sending would block forever.
			ch <- &msg
		}

		if ext.generation != generation {
			return
		}
	}

	// Scanner errors are only observable once Scan has returned false. Stay
	// quiet when the generation changed: the pipe was closed on purpose.
	if err := scanner.Err(); err != nil && ext.generation == generation {
		log.Warnf("Failed to read from %s: %s. Stopping here.", ext.command, err.Error())
	}
}

// handlerLoop dispatches queued notifications to handleCmd until the
// generation changes or the queue is dropped. The 10 second timeout makes
// the loop re-evaluate its exit condition when no message arrives.
func (ext *External) handlerLoop(generation int) {
	for ext.handlerChan != nil && ext.generation == generation {
		select {
		case msg := <-ext.handlerChan:
			ext.handleCmd(msg)
		case <-time.After(10 * time.Second):
		}
	}
}

// cmd sends a request to the external program and waits up to 30 seconds for
// the matching reply. It returns the reply on REP_OK, and an error if the
// process is not attached, the message cannot be written, the program
// answers REP_ERROR, or the wait times out.
func (ext *External) cmd(msg extMessage, data interface{}) (*extMessageWithData, error) {
	enc := ext.sendJson
	if enc == nil {
		return nil, fmt.Errorf("(%s) external process is not running", msg.MsgType)
	}

	msgID := atomic.AddUint64(&ext.counter, 1)
	msg.MsgId = msgID

	fullMsg := extMessageWithData{
		extMessage: msg,
		Data:       data,
	}

	// Capacity 1 lets recvLoop hand over the reply without blocking, which
	// would otherwise deadlock against the cleanup below after a timeout.
	ch := make(chan *extMessageWithData, 1)

	ext.lock.Lock()
	ext.inflightRequests[msgID] = ch
	ext.lock.Unlock()

	defer func() {
		ext.lock.Lock()
		defer ext.lock.Unlock()
		delete(ext.inflightRequests, msgID)
	}()

	if err := enc.Encode(&fullMsg); err != nil {
		return nil, err
	}

	select {
	case rep := <-ch:
		if rep.MsgType == REP_ERROR {
			return nil, fmt.Errorf("%s: %s", msg.MsgType, rep.Error)
		}
		return rep, nil
	case <-time.After(30 * time.Second):
		return nil, fmt.Errorf("(%s) timeout", msg.MsgType)
	}
}

// Close asks the external program to stop (CLOSE message, then an interrupt
// signal), closes both pipes and waits for the process to exit, killing it
// if it is still running after one second. It is a no-op for the parts that
// were never set up, so it is safe to call on a connector whose process
// failed to start.
func (ext *External) Close() {
	ext.generation += 1

	// Work on a local copy: ext.proc is cleared below and may be replaced
	// by a later Configure before the kill timer fires.
	proc := ext.proc
	running := proc != nil && proc.Process != nil

	if ext.sendJson != nil {
		// Best effort: the process may already be gone.
		if err := ext.sendJson.Encode(&extMessage{
			MsgType: CLOSE,
		}); err != nil {
			log.Debugf("Could not send close message to %s: %s", ext.command, err)
		}
	}
	if running {
		proc.Process.Signal(os.Interrupt)
	}

	if ext.recvPipe != nil {
		ext.recvPipe.Close()
	}
	if ext.sendPipe != nil {
		ext.sendPipe.Close()
	}

	if running {
		exited := make(chan struct{})
		go func() {
			select {
			case <-exited:
			case <-time.After(1 * time.Second):
				log.Info("Sending SIGKILL to external process (did not terminate within 1 second)")
				proc.Process.Kill()
			}
		}()
		proc.Wait()
		close(exited)
	}

	ext.proc = nil
	ext.recvPipe = nil
	ext.sendPipe = nil
	ext.sendJson = nil
	ext.handlerChan = nil
}

// ---- Actual message handling :)

// handleCmd forwards a notification from the external program to the
// handler. CACHE_GET is the only notification that is answered: the cached
// value is sent back in a REP_OK message carrying the same identifier.
func (ext *External) handleCmd(msg *extMessageWithData) {
	switch msg.MsgType {
	case JOINED:
		ext.handler.Joined(msg.Room)
	case LEFT:
		ext.handler.Left(msg.Room)
	case USER_INFO_UPDATED:
		ext.handler.UserInfoUpdated(msg.User, msg.Data.(*UserInfo))
	case ROOM_INFO_UPDATED:
		ext.handler.RoomInfoUpdated(msg.Room, msg.User, msg.Data.(*RoomInfo))
	case EVENT:
		ext.handler.Event(msg.Data.(*Event))
	case CACHE_PUT:
		ext.handler.CachePut(msg.Key, msg.Value)
	case CACHE_GET:
		value := ext.handler.CacheGet(msg.Key)
		enc := ext.sendJson
		if enc == nil {
			// The process was closed while this message was queued.
			log.Warnf("Unable to reply to %s: %s is not running", CACHE_GET, ext.command)
			return
		}
		if err := enc.Encode(&extMessage{
			MsgType: REP_OK,
			MsgId:   msg.MsgId,
			Key:     msg.Key,
			Value:   value,
		}); err != nil {
			log.Warnf("Unable to reply to %s: %s", CACHE_GET, err)
		}
	}
}

// User asks the external program for the current user. It returns the empty
// UserID (and logs a warning) if the request fails.
func (ext *External) User() UserID {
	rep, err := ext.cmd(extMessage{
		MsgType: GET_USER,
	}, nil)
	if err != nil {
		log.Warnf("Unable to get user! %s", err.Error())
		return ""
	}
	return rep.User
}

// SetUserInfo sends a SET_USER_INFO request carrying info.
func (ext *External) SetUserInfo(info *UserInfo) error {
	_, err := ext.cmd(extMessage{
		MsgType: SET_USER_INFO,
	}, info)
	return err
}

// SetRoomInfo sends a SET_ROOM_INFO request for room carrying info.
func (ext *External) SetRoomInfo(room RoomID, info *RoomInfo) error {
	_, err := ext.cmd(extMessage{
		MsgType: SET_ROOM_INFO,
		Room:    room,
	}, info)
	return err
}

// Join sends a JOIN request for room.
func (ext *External) Join(room RoomID) error {
	_, err := ext.cmd(extMessage{
		MsgType: JOIN,
		Room:    room,
	}, nil)
	return err
}

// Invite sends an INVITE request for user and room.
func (ext *External) Invite(user UserID, room RoomID) error {
	_, err := ext.cmd(extMessage{
		MsgType: INVITE,
		User:    user,
		Room:    room,
	}, nil)
	return err
}

// Leave sends a LEAVE request for room. Failures are logged, not returned.
func (ext *External) Leave(room RoomID) {
	_, err := ext.cmd(extMessage{
		MsgType: LEAVE,
		Room:    room,
	}, nil)
	if err != nil {
		log.Warnf("Could not leave %s: %s", room, err.Error())
	}
}

// Send sends a SEND request carrying event and returns the event identifier
// found in the reply.
func (ext *External) Send(event *Event) (string, error) {
	rep, err := ext.cmd(extMessage{
		MsgType: SEND,
	}, event)
	if err != nil {
		return "", err
	}
	return rep.EventId, nil
}