package mattermost import ( "encoding/json" "fmt" "io/ioutil" "net/http" _ "os" "strings" "sync" "time" "github.com/42wim/matterbridge/matterclient" "github.com/mattermost/mattermost-server/model" log "github.com/sirupsen/logrus" . "git.deuxfleurs.fr/Deuxfleurs/easybridge/connector" ) // User id format: nickname@server // Room id format: room_name@team@server type Mattermost struct { handler Handler server string username string teams map[string]bool initial_members int // How many room members (maximum) to load when first joining a channel initial_backlog int // How many previous messages (maximum) to load when first joining a channel conn *matterclient.MMClient handlerStopChan chan bool caches mmCaches } type mmCaches struct { sync.Mutex mmusers map[string]string // map mm username to mm user id sentjoined map[string]bool // map username/room name to bool displayname map[UserID]string // map username to last displayname initsynced map[string]bool // chans for which init sync has been done } func (mm *Mattermost) SetHandler(h Handler) { mm.handler = h } func (mm *Mattermost) Protocol() string { return MATTERMOST_PROTOCOL } func (mm *Mattermost) Configure(c Configuration) error { if mm.conn != nil { mm.Close() } // Reinitialize shared data structures mm.handlerStopChan = make(chan bool, 1) mm.caches.mmusers = make(map[string]string) mm.caches.sentjoined = make(map[string]bool) mm.caches.displayname = make(map[UserID]string) mm.caches.initsynced = make(map[string]bool) // Read config var err error mm.server, err = c.GetString("server") if err != nil { return err } mm.username, err = c.GetString("username") if err != nil { return err } mm.initial_members, err = c.GetInt("initial_members", 100) if err != nil { return err } mm.initial_backlog, err = c.GetInt("initial_backlog", 1000) if err != nil { return err } teams, err := c.GetString("teams") if err != nil { return err } mm.teams = map[string]bool{} anyteam := "" for _, team := range strings.Split(teams, ",") { anyteam = strings.TrimSpace(team) mm.teams[anyteam] = true } notls, err := c.GetBool("no_tls", false) if err != nil { return err } password, _ := c.GetString("password", "") token, _ := c.GetString("token", "") if token != "" { password = "token=" + token } // Try to log in mm.conn = matterclient.New(mm.username, password, anyteam, mm.server) mm.conn.Credentials.NoTLS = notls err = mm.conn.Login() if err != nil { return err } // Try to start listening for messages // Everytime the listener reconnects, mm.handleConnected does a sync of room status mm.conn.OnWsConnect = mm.handleConnected go mm.conn.WsReceiver() go mm.conn.StatusLoop() go mm.handleLoop(mm.conn.MessageChan, mm.handlerStopChan) return nil } func (mm *Mattermost) User() UserID { return UserID(mm.username + "@" + mm.server) } func (mm *Mattermost) getTeamIdByName(name string) string { for _, team := range mm.conn.OtherTeams { if team.Team.Name == name { return team.Id } } return "" } func (mm *Mattermost) checkRoomId(id RoomID) (string, error) { x := strings.Split(string(id), "@") if len(x) == 1 { return "", fmt.Errorf("Please write whole room ID with team and server: %s@@%s", id, mm.server) } if len(x) == 2 { return x[0], nil } if len(x) != 3 || x[2] != mm.server { return "", fmt.Errorf("Invalid room ID: %s", id) } team_id := mm.getTeamIdByName(x[1]) if team_id == "" { return "", fmt.Errorf("Team not found: %s", id) } ch_id := mm.conn.GetChannelId(x[0], team_id) if ch_id == "" { return "", fmt.Errorf("Channel not found: %s", id) } return ch_id, nil } func (mm *Mattermost) reverseRoomId(id string) (bool, RoomID) { team := mm.conn.GetChannelTeamId(id) if team == "" { return true, RoomID(fmt.Sprintf("%s@%s", id, mm.server)) } else { teamName := mm.conn.GetTeamName(team) if u, ok := mm.teams[teamName]; ok && u { name := mm.conn.GetChannelName(id) return true, RoomID(fmt.Sprintf("%s@%s@%s", name, teamName, mm.server)) } else { return false, "" } } } func (mm *Mattermost) checkUserId(id UserID) (string, error) { x := strings.Split(string(id), "@") if len(x) == 1 { return "", fmt.Errorf("Please write whole user ID with server: %s@%s", id, mm.server) } if len(x) != 2 || x[1] != mm.server { return "", fmt.Errorf("Invalid user ID: %s", id) } mm.caches.Lock() defer mm.caches.Unlock() if user_id, ok := mm.caches.mmusers[x[0]]; ok { return user_id, nil } u, resp := mm.conn.Client.GetUserByUsername(x[0], "") if u == nil || resp.Error != nil { return "", fmt.Errorf("Not found: %s (%s)", x[0], resp.Error) } mm.caches.mmusers[x[0]] = u.Id return u.Id, nil } func (mm *Mattermost) SetUserInfo(info *UserInfo) error { return fmt.Errorf("Not implemented") } func (mm *Mattermost) SetRoomInfo(roomId RoomID, info *RoomInfo) error { ch, err := mm.checkRoomId(roomId) if err != nil { return err } if info.Topic != "" { mm.conn.UpdateChannelHeader(ch, info.Topic) } if info.Picture.MediaObject != nil { err = fmt.Errorf("Not supported: channel picture on mattermost") } if info.Name != "" { err = fmt.Errorf("Not supported: channel name on mattermost") } return err } func (mm *Mattermost) Join(roomId RoomID) error { ch, err := mm.checkRoomId(roomId) if err != nil { return err } return mm.conn.JoinChannel(ch) } func (mm *Mattermost) Invite(userId UserID, roomId RoomID) error { if roomId == "" { _, err := mm.checkUserId(userId) return err } return fmt.Errorf("Not supported: invite on mattermost") } func (mm *Mattermost) Leave(roomId RoomID) { // Not supported? TODO } func (mm *Mattermost) Send(event *Event) (string, error) { post := &model.Post{ Message: event.Text, } if event.Type == EVENT_ACTION { post.Type = "me" } if event.Room != "" { ch, err := mm.checkRoomId(event.Room) if err != nil { return "", err } post.ChannelId = ch } else if event.Recipient != "" { ui, err := mm.checkUserId(event.Recipient) if err != nil { return "", err } _, resp := mm.conn.Client.CreateDirectChannel(mm.conn.User.Id, ui) if resp.Error != nil { return "", resp.Error } channelName := model.GetDMNameFromIds(ui, mm.conn.User.Id) err = mm.conn.UpdateChannels() if err != nil { return "", err } post.ChannelId = mm.conn.GetChannelId(channelName, "") } else { return "", fmt.Errorf("Invalid target") } if event.Attachments != nil { post.FileIds = []string{} for _, file := range event.Attachments { rdr, err := file.Read() if err != nil { return "", err } defer rdr.Close() data, err := ioutil.ReadAll(rdr) if err != nil { return "", err } up_file, err := mm.conn.UploadFile(data, post.ChannelId, file.Filename()) if err != nil { log.Warnf("UploadFile error: %s", err) return "", err } post.FileIds = append(post.FileIds, up_file) } } created_post, resp := mm.conn.Client.CreatePost(post) if resp.Error != nil { log.Warnf("CreatePost error: %s", resp.Error) return "", resp.Error } return created_post.Id, nil } func (mm *Mattermost) Close() { if mm.conn != nil { mm.conn.WsQuit = true } if mm.handlerStopChan != nil { mm.handlerStopChan <- true mm.handlerStopChan = nil } } func (mm *Mattermost) handleConnected() { log.Debugf("(Re-)connected to mattermost: %s@%s ; doing channel sync\n", mm.username, mm.server) // Initial channel sync chans := mm.conn.GetChannels() doneCh := make(map[string]bool) for _, ch := range chans { if _, ok := doneCh[ch.Id]; !ok { doneCh[ch.Id] = true go mm.syncChannel(ch) } } } func (mm *Mattermost) syncChannel(ch *model.Channel) { // The first time we see a chan, sync everything (member list, names, profile pictures) must_initsync := func() bool { mm.caches.Lock() defer mm.caches.Unlock() if x, ok := mm.caches.initsynced[ch.Id]; ok && x { return false } mm.caches.initsynced[ch.Id] = true return true }() if must_initsync { mm.initSyncChannel(ch) } // The following times, only sync missing messages mm.backlogChannel(ch) } func (mm *Mattermost) initSyncChannel(ch *model.Channel) { if len(strings.Split(ch.Name, "__")) == 2 { // DM channel // Update remote user info users := strings.Split(ch.Name, "__") for _, uid := range users { if uid != mm.conn.User.Id { user := mm.conn.GetUser(uid) if user != nil { mm.updateUserInfo(user) } } } } else { interested, id := mm.reverseRoomId(ch.Id) if !interested { // Skip channels that are not in teams we want to bridge return } mm.handler.Joined(id) // Update room info room_info := &RoomInfo{ Name: ch.DisplayName, Topic: ch.Header, } for _, t := range mm.conn.OtherTeams { if t.Id == ch.TeamId { if t.Team.DisplayName != "" { room_info.Name = t.Team.DisplayName + " / " + room_info.Name } else { room_info.Name = t.Team.Name + " / " + room_info.Name } if t.Team.LastTeamIconUpdate > 0 { room_info.Picture = SMediaObject{&LazyBlobMediaObject{ ObjectFilename: fmt.Sprintf("%s-%d", t.Team.Name, t.Team.LastTeamIconUpdate), GetFn: func(o *LazyBlobMediaObject) error { team_img, resp := mm.conn.Client.GetTeamIcon(t.Id, "") if resp.Error != nil { log.Warnf("Could not get team image: %s", resp.Error.Error()) return resp.Error } o.ObjectData = team_img o.ObjectMimetype = http.DetectContentType(team_img) return nil }, }} } break } } mm.handler.RoomInfoUpdated(id, UserID(""), room_info) // Update member list // TODO (when this will be slow, i.e. hundreds of members): do only a diff members, resp := mm.conn.Client.GetChannelMembers(ch.Id, 0, mm.initial_members, "") if resp.Error == nil { for _, mem := range *members { if mem.UserId == mm.conn.User.Id { continue } user := mm.conn.GetUser(mem.UserId) if user != nil { mm.ensureJoined(user, id) mm.updateUserInfo(user) } else { log.Warnf("Could not find joined user: %s", mem.UserId) } } } else { log.Warnf("Could not get channel members: %s", resp.Error.Error()) } } } func (mm *Mattermost) backlogChannel(ch *model.Channel) { // Read backlog last_seen_post := mm.handler.CacheGet(fmt.Sprintf("last_seen_%s", ch.Id)) if last_seen_post != "" { const NUM_PER_PAGE = 100 page := 0 backlogs := []*model.PostList{} for { backlog, resp := mm.conn.Client.GetPostsAfter(ch.Id, last_seen_post, page, NUM_PER_PAGE, "") if resp.Error == nil { backlogs = append(backlogs, backlog) if len(backlog.Order) == NUM_PER_PAGE { page += 1 } else { break } } else { break } } for i := 0; i < len(backlogs); i++ { mm.processBacklog(ch, backlogs[i]) } } else { backlog, resp := mm.conn.Client.GetPostsForChannel(ch.Id, 0, mm.initial_backlog, "") if resp.Error == nil { mm.processBacklog(ch, backlog) } else { log.Warnf("Could not get channel backlog: %s", resp.Error) } } } func (mm *Mattermost) processBacklog(ch *model.Channel, backlog *model.PostList) { for i := 0; i < len(backlog.Order); i++ { post_id := backlog.Order[len(backlog.Order)-i-1] post := backlog.Posts[post_id] post_time := time.Unix(post.CreateAt/1000, 0) post.Message = fmt.Sprintf("[%s] %s", post_time.Format("2006-01-02 15:04 MST"), post.Message) mm.handlePost(ch.Name, post, true) } } func (mm *Mattermost) handleLoop(msgCh chan *matterclient.Message, quitCh chan bool) { for { select { case <-quitCh: break case msg := <-msgCh: log.Tracef("Mattermost: %#v\n", msg) log.Tracef("Mattermost raw: %#v\n", msg.Raw) err := mm.handlePosted(msg.Raw) if err != nil { log.Warnf("Mattermost error: %s", err) } } } } func (mm *Mattermost) updateUserInfo(user *model.User) { userId := UserID(fmt.Sprintf("%s@%s", user.Username, mm.server)) userDisp := user.GetDisplayName(model.SHOW_NICKNAME_FULLNAME) mm.caches.Lock() defer mm.caches.Unlock() if lastdn, ok := mm.caches.displayname[userId]; !ok || lastdn != userDisp { ui := &UserInfo{ DisplayName: userDisp, } if user.LastPictureUpdate > 0 { ui.Avatar = SMediaObject{&LazyBlobMediaObject{ ObjectFilename: fmt.Sprintf("%s-%d", user.Username, user.LastPictureUpdate), GetFn: func(o *LazyBlobMediaObject) error { img, resp := mm.conn.Client.GetProfileImage(user.Id, "") if resp.Error != nil { log.Warnf("Could not get profile picture: %s", resp.Error.Error()) return resp.Error } o.ObjectData = img o.ObjectMimetype = http.DetectContentType(img) return nil }, }} } mm.handler.UserInfoUpdated(userId, ui) mm.caches.displayname[userId] = userDisp } } func (mm *Mattermost) ensureJoined(user *model.User, roomId RoomID) { userId := UserID(fmt.Sprintf("%s@%s", user.Username, mm.server)) cache_key := fmt.Sprintf("%s / %s", userId, roomId) mm.caches.Lock() defer mm.caches.Unlock() if _, ok := mm.caches.sentjoined[cache_key]; !ok { mm.handler.Event(&Event{ Author: userId, Room: roomId, Type: EVENT_JOIN, }) mm.caches.sentjoined[cache_key] = true } } func (mm *Mattermost) handlePosted(msg *model.WebSocketEvent) error { channel_name, ok := msg.Data["channel_name"].(string) if !ok { return nil } post_str := msg.Data["post"].(string) var post model.Post err := json.Unmarshal([]byte(post_str), &post) if err != nil { return err } return mm.handlePost(channel_name, &post, false) } func (mm *Mattermost) handlePost(channel_name string, post *model.Post, only_messages bool) error { // Skip self messages if post.UserId == mm.conn.User.Id { return nil } // Find sending user user := mm.conn.GetUser(post.UserId) if user == nil { return fmt.Errorf("Invalid user") } userId := UserID(fmt.Sprintf("%s@%s", user.Username, mm.server)) mm.updateUserInfo(user) // Build message event msg_ev := &Event{ Id: post.Id, Author: userId, Text: post.Message, Type: EVENT_MESSAGE, } if post.Type == "me" { msg_ev.Type = EVENT_ACTION } // Handle files if post.FileIds != nil && len(post.FileIds) > 0 { msg_ev.Attachments = []SMediaObject{} for _, file := range post.Metadata.Files { media_object := &LazyBlobMediaObject{ ObjectFilename: file.Name, ObjectMimetype: file.MimeType, GetFn: func(o *LazyBlobMediaObject) error { blob, resp := mm.conn.Client.GetFile(file.Id) if resp.Error != nil { return resp.Error } o.ObjectData = blob return nil }, } if file.Width > 0 { media_object.ObjectImageSize = &ImageSize{ Width: file.Width, Height: file.Height, } } msg_ev.Attachments = append(msg_ev.Attachments, SMediaObject{media_object}) } } // Dispatch as PM or as room message if len(strings.Split(channel_name, "__")) == 2 { // Private message, no need to find room id if user.Id == mm.conn.User.Id { // Skip self sent messages return nil } mm.handler.Event(msg_ev) mm.handler.CachePut(fmt.Sprintf("last_seen_%s", post.ChannelId), post.Id) } else { interested, roomId := mm.reverseRoomId(post.ChannelId) if !interested { return nil } if roomId == "" { return fmt.Errorf("Invalid channel id") } mm.ensureJoined(user, roomId) if post.Type == "system_header_change" { if !only_messages { new_header := post.Props["new_header"].(string) mm.handler.RoomInfoUpdated(roomId, userId, &RoomInfo{ Topic: new_header, }) } } else if post.Type == "" || post.Type == "me" { msg_ev.Room = roomId mm.handler.Event(msg_ev) mm.handler.CachePut(fmt.Sprintf("last_seen_%s", post.ChannelId), post.Id) } else { return fmt.Errorf("Unhandled post type: %s", post.Type) } } return nil }