// Package external implements a connector that delegates its work to a child
// process, exchanging JSON messages with it over the child's stdin and stdout.
package external

import (
	"encoding/json"
	"fmt"
	"io"
	"os"
	"os/exec"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	log "github.com/sirupsen/logrus"

	. "git.deuxfleurs.fr/Deuxfleurs/easybridge/connector"
)

// Serialization protocol

// extMessage is the envelope of every message exchanged with the external
// process, in both directions.
type extMessage struct {
	// Header: message type and identifier
	MsgType string `json:"_type"`
	MsgId   uint64 `json:"_id"`

	// Message fields
	Key     string `json:"key"`
	Value   string `json:"value"`
	EventId string `json:"event_id"`
	Error   string `json:"error"`
	Room    RoomID `json:"room"`
	User    UserID `json:"user"`
}

// extMessageWithData is an extMessage carrying an additional payload. When
// encoded, the payload is emitted under the "data" key. When decoded (see
// DecodeJSON), Data holds a *UserInfo, *RoomInfo or *Event depending on
// MsgType, and is nil for every other message type.
type extMessageWithData struct {
	extMessage
	Data interface{} `json:"data"`
}

// Possible values for MsgType
const (
	// ezbr -> external
	CONFIGURE     = "configure"
	GET_USER      = "get_user"
	SET_USER_INFO = "set_user_info"
	SET_ROOM_INFO = "set_room_info"
	JOIN          = "join"
	INVITE        = "invite"
	LEAVE         = "leave"
	SEND          = "send"
	CLOSE         = "close"

	// external -> ezbr
	JOINED            = "joined"
	LEFT              = "left"
	USER_INFO_UPDATED = "user_info_updated"
	ROOM_INFO_UPDATED = "room_info_updated"
	EVENT             = "event"
	CACHE_PUT         = "cache_put"
	CACHE_GET         = "cache_get"

	// reply messages
	// ezbr -> external: all must wait for a reply!
	// external -> ezbr: only CACHE_GET produces a reply
	REP_OK    = "rep_ok"
	REP_ERROR = "rep_error"
)

// ----

// External is a connector backed by an external command. Requests are written
// as JSON to the command's stdin and replies/notifications are read as JSON
// from its stdout.
type External struct {
	handler Handler

	protocol string
	command  string
	debug    bool

	config Configuration

	recv     io.Reader
	send     io.Writer
	sendJson *json.Encoder

	// generation is bumped by Configure and Close; a restartLoop started for
	// an older generation stops instead of restarting the process.
	generation int
	proc       *exec.Cmd

	// counter is the source of message identifiers (incremented atomically).
	counter uint64
	// inflightRequests maps the identifier of each pending request to the
	// channel on which its reply is delivered. Guarded by lock.
	inflightRequests map[uint64]chan *extMessageWithData
	lock             sync.Mutex
}

// SetHandler sets the handler that receives notifications coming from the
// external process.
func (ext *External) SetHandler(h Handler) {
	ext.handler = h
}

// Protocol returns the protocol name this connector was set up with.
func (ext *External) Protocol() string {
	return ext.protocol
}

// Configure (re)starts the external process and sends it a CONFIGURE message
// carrying c. If a process is already attached it is closed first. It returns
// an error if the process cannot be started or if the CONFIGURE request fails.
func (ext *External) Configure(c Configuration) error {
	if ext.proc != nil {
		ext.Close()
	}

	ext.inflightRequests = map[uint64]chan *extMessageWithData{}

	ext.generation++

	if err := ext.setupProc(); err != nil {
		return err
	}

	go ext.restartLoop(ext.generation)

	_, err := ext.cmd(extMessage{
		MsgType: CONFIGURE,
	}, c)
	return err
}

// ---- Process management and communication logic

// setupProc creates the child process for ext.command, wires its stdin and
// stdout to ext.send / ext.recv (mirrored to stderr in debug mode), starts it
// and launches the receive loop.
func (ext *External) setupProc() error {
	var err error

	ext.proc = exec.Command(ext.command)

	ext.recv, err = ext.proc.StdoutPipe()
	if err != nil {
		return err
	}

	ext.send, err = ext.proc.StdinPipe()
	if err != nil {
		return err
	}

	if ext.debug {
		ext.recv = io.TeeReader(ext.recv, os.Stderr)
		ext.send = io.MultiWriter(ext.send, os.Stderr)
	}

	ext.sendJson = json.NewEncoder(ext.send)

	ext.proc.Stderr = os.Stderr

	err = ext.proc.Start()
	if err != nil {
		return err
	}

	go ext.recvLoop()

	return nil
}

// restartLoop waits for the current process to exit and starts a new one, for
// as long as ext.generation still equals generation. It stops when the
// connector has been closed or reconfigured, or when a restart fails.
func (ext *External) restartLoop(generation int) {
	for {
		// Take a local copy: Close may set ext.proc to nil concurrently,
		// which must not turn the Wait below into a nil dereference.
		proc := ext.proc
		if proc == nil {
			break
		}

		// The exit status is irrelevant here; we only need to know that the
		// process has terminated.
		_ = proc.Wait()

		if ext.generation != generation {
			break
		}

		log.Printf("Process %s stopped, restarting.", ext.command)

		err := ext.setupProc()
		if err != nil {
			ext.proc = nil
			log.Warnf("Unable to restart %s: %s", ext.command, err)
			break
		}
	}
}

// DecodeJSON parses jj into m. The envelope fields are always decoded; for
// USER_INFO_UPDATED, ROOM_INFO_UPDATED and EVENT messages, Data is additionally
// set to a *UserInfo, *RoomInfo or *Event decoded from the same JSON object.
//
// NOTE(review): the payload is read from the top-level object rather than from
// a nested "data" member, whereas outgoing messages nest it under "data" —
// confirm against the external side of the protocol.
func (m *extMessageWithData) DecodeJSON(jj []byte) error {
	var c extMessage

	err := json.Unmarshal(jj, &c)
	if err != nil {
		return err
	}

	*m = extMessageWithData{extMessage: c}

	switch c.MsgType {
	case USER_INFO_UPDATED:
		var ui UserInfo
		err := json.Unmarshal(jj, &ui)
		if err != nil {
			return err
		}
		m.Data = &ui
	case ROOM_INFO_UPDATED:
		var ri RoomInfo
		err := json.Unmarshal(jj, &ri)
		if err != nil {
			return err
		}
		m.Data = &ri
	case EVENT:
		var ev Event
		err := json.Unmarshal(jj, &ev)
		if err != nil {
			return err
		}
		m.Data = &ev
	}
	return nil
}

// UnmarshalJSON implements json.Unmarshaler by delegating to DecodeJSON.
// encoding/json only looks for a method with this exact name; without it the
// decoder in recvLoop would fill Data with a generic value and handleCmd would
// never see the typed payloads it expects.
func (m *extMessageWithData) UnmarshalJSON(jj []byte) error {
	return m.DecodeJSON(jj)
}

// recvLoop reads JSON messages from the external process until decoding fails.
// Messages whose type starts with "rep_" are routed to the pending request
// with the same identifier (and dropped if there is none); every other message
// is passed to handleCmd.
func (ext *External) recvLoop() {
	reader := json.NewDecoder(ext.recv)
	for {
		var msg extMessageWithData
		err := reader.Decode(&msg)
		if err != nil {
			log.Warnf("Failed to decode from %s: %s. Stopping reading.", ext.command, err)
			break
		}

		if !strings.HasPrefix(msg.MsgType, "rep_") {
			ext.handleCmd(&msg)
			continue
		}

		ext.lock.Lock()
		ch, ok := ext.inflightRequests[msg.MsgId]
		if ok {
			delete(ext.inflightRequests, msg.MsgId)
		}
		ext.lock.Unlock()

		if ok {
			// The channel has a buffer of one and is removed from the map
			// above, so this send never blocks even if the requester has
			// already given up.
			ch <- &msg
		}
	}
}

// cmd sends msg (with data as its payload) to the external process under a
// fresh message identifier and waits up to five seconds for the reply.
//
// It returns the reply on REP_OK. It returns an error if no process is
// attached, if the message cannot be written, if the reply is REP_ERROR, or if
// no reply arrives in time.
func (ext *External) cmd(msg extMessage, data interface{}) (*extMessageWithData, error) {
	enc := ext.sendJson
	if enc == nil {
		return nil, fmt.Errorf("(%s) external process is not running", msg.MsgType)
	}

	msg_id := atomic.AddUint64(&ext.counter, 1)

	msg.MsgId = msg_id

	fullMsg := extMessageWithData{
		extMessage: msg,
		Data:       data,
	}

	// Buffered so that recvLoop can always hand over a reply without
	// blocking, even when it races with the timeout below.
	ch := make(chan *extMessageWithData, 1)

	func() {
		ext.lock.Lock()
		defer ext.lock.Unlock()
		ext.inflightRequests[msg_id] = ch
	}()

	defer func() {
		ext.lock.Lock()
		defer ext.lock.Unlock()
		delete(ext.inflightRequests, msg_id)
	}()

	err := enc.Encode(&fullMsg)
	if err != nil {
		return nil, err
	}

	select {
	case rep := <-ch:
		if rep.MsgType == REP_ERROR {
			return nil, fmt.Errorf("%s: %s", msg.MsgType, rep.Error)
		}
		return rep, nil
	case <-time.After(5 * time.Second):
		return nil, fmt.Errorf("(%s) timeout", msg.MsgType)
	}
}

// Close asks the external process to stop by sending it a CLOSE message,
// detaches it from the connector, and kills it ten seconds later. Calling
// Close when no process is attached is a no-op apart from bumping the
// generation.
func (ext *External) Close() {
	if ext.sendJson != nil {
		// Best effort: the process is killed below regardless.
		err := ext.sendJson.Encode(&extMessage{
			MsgType: CLOSE,
		})
		if err != nil {
			log.Warnf("Unable to send close message to %s: %s", ext.command, err)
		}
	}

	// Bumping the generation tells the running restartLoop not to restart
	// the process once it exits.
	ext.generation++

	proc := ext.proc
	ext.proc = nil
	ext.recv = nil
	ext.send = nil
	ext.sendJson = nil

	// proc.Process is nil when the command was never successfully started.
	if proc == nil || proc.Process == nil {
		return
	}

	go func() {
		time.Sleep(10 * time.Second)
		// The process has usually exited by now, in which case Kill
		// returns an error we do not care about.
		_ = proc.Process.Kill()
	}()
}

// ---- Actual message handling :)

// handleCmd dispatches a notification received from the external process to
// the handler. CACHE_GET is the only message type answered with a reply.
// Messages whose payload does not have the expected type are logged and
// dropped rather than crashing the receive goroutine.
func (ext *External) handleCmd(msg *extMessageWithData) {
	switch msg.MsgType {
	case JOINED:
		ext.handler.Joined(msg.Room)
	case LEFT:
		ext.handler.Left(msg.Room)
	case USER_INFO_UPDATED:
		info, ok := msg.Data.(*UserInfo)
		if !ok {
			log.Warnf("Ignoring %s message from %s: unexpected data of type %T", msg.MsgType, ext.command, msg.Data)
			return
		}
		ext.handler.UserInfoUpdated(msg.User, info)
	case ROOM_INFO_UPDATED:
		info, ok := msg.Data.(*RoomInfo)
		if !ok {
			log.Warnf("Ignoring %s message from %s: unexpected data of type %T", msg.MsgType, ext.command, msg.Data)
			return
		}
		ext.handler.RoomInfoUpdated(msg.Room, msg.User, info)
	case EVENT:
		ev, ok := msg.Data.(*Event)
		if !ok {
			log.Warnf("Ignoring %s message from %s: unexpected data of type %T", msg.MsgType, ext.command, msg.Data)
			return
		}
		ext.handler.Event(ev)
	case CACHE_PUT:
		ext.handler.CachePut(msg.Key, msg.Value)
	case CACHE_GET:
		value := ext.handler.CacheGet(msg.Key)
		// The connector may have been closed while this message was
		// being processed.
		enc := ext.sendJson
		if enc == nil {
			return
		}
		err := enc.Encode(&extMessage{
			MsgType: REP_OK,
			MsgId:   msg.MsgId,
			Value:   value,
		})
		if err != nil {
			log.Warnf("Unable to reply to %s from %s: %s", msg.MsgType, ext.command, err)
		}
	}
}

// User asks the external process for the current user with GET_USER. It
// returns the empty user ID (and logs a warning) if the request fails.
func (ext *External) User() UserID {
	rep, err := ext.cmd(extMessage{
		MsgType: GET_USER,
	}, nil)
	if err != nil {
		log.Warnf("Unable to get user!")
		return ""
	}
	return rep.User
}

// SetUserInfo sends info to the external process in a SET_USER_INFO request.
func (ext *External) SetUserInfo(info *UserInfo) error {
	_, err := ext.cmd(extMessage{
		MsgType: SET_USER_INFO,
	}, info)
	return err
}

// SetRoomInfo sends info for room to the external process in a SET_ROOM_INFO
// request.
func (ext *External) SetRoomInfo(room RoomID, info *RoomInfo) error {
	_, err := ext.cmd(extMessage{
		MsgType: SET_ROOM_INFO,
		Room:    room,
	}, info)
	return err
}

// Join sends a JOIN request for room to the external process.
func (ext *External) Join(room RoomID) error {
	_, err := ext.cmd(extMessage{
		MsgType: JOIN,
		Room:    room,
	}, nil)
	return err
}

// Invite sends an INVITE request for user and room to the external process.
func (ext *External) Invite(user UserID, room RoomID) error {
	_, err := ext.cmd(extMessage{
		MsgType: INVITE,
		User:    user,
		Room:    room,
	}, nil)
	return err
}

// Leave sends a LEAVE request for room to the external process. A failure is
// logged, not returned.
func (ext *External) Leave(room RoomID) {
	_, err := ext.cmd(extMessage{
		MsgType: LEAVE,
		Room:    room,
	}, nil)
	if err != nil {
		log.Warnf("Could not leave %s: %s", room, err.Error())
	}
}

// Send sends event to the external process in a SEND request and returns the
// event identifier found in the reply.
func (ext *External) Send(event *Event) (string, error) {
	rep, err := ext.cmd(extMessage{
		MsgType: SEND,
	}, event)
	if err != nil {
		return "", err
	}
	return rep.EventId, nil
}