diff --git a/account.go b/account.go index 732ff77..8365275 100644 --- a/account.go +++ b/account.go @@ -418,10 +418,17 @@ func (a *Account) eventInternal(event *Event) error { } } + var cache_key string if event.Id != "" { - cache_key := fmt.Sprintf("%s/event_seen/%s/%s", + // If the event has an ID, make sure it is processed only once + cache_key = fmt.Sprintf("%s/event_seen/%s/%s", a.Protocol, mx_room_id, event.Id) - if !dbKvTestAndSet(cache_key, "yes") { + slot_key := dbKvSlotKey(cache_key) + + dbLockSlot(slot_key) + defer dbUnlockSlot(slot_key) + + if dbKvGet(cache_key) == "yes" { // false: cache key was not modified, meaning we // already saw the event return nil @@ -470,6 +477,12 @@ func (a *Account) eventInternal(event *Event) error { } } } + + // Mark message as received in db + if cache_key != "" { + dbKvPutLocked(cache_key, "yes") + } + return nil } } diff --git a/connector/connector.go b/connector/connector.go index 38ce828..e8c382d 100644 --- a/connector/connector.go +++ b/connector/connector.go @@ -59,8 +59,14 @@ type Connector interface { // Leave a channel Leave(roomId RoomID) - // Send an event - Send(event *Event) error + // Send an event. Returns the ID of the created remote message. + // This ID is used to deduplicate messages: if it comes back, it should have the same Id + // than the one returned here. + // For backends that do not implement IDs (e.g. IRC), an empty string is returned. + // (FIXME how to deduplicate IRC messages?) + // The event that is fed in this function may have its ID already set, + // in which case the backend is free to re-use the ID or select a new one. + Send(event *Event) (string, error) // Close the connection Close() diff --git a/connector/irc/irc.go b/connector/irc/irc.go index dafe9db..4e5f4fd 100644 --- a/connector/irc/irc.go +++ b/connector/irc/irc.go @@ -198,9 +198,9 @@ func (irc *IRC) Leave(roomId RoomID) { irc.conn.Cmd.Part(ch) } -func (irc *IRC) Send(event *Event) error { +func (irc *IRC) Send(event *Event) (string, error) { if irc.conn == nil { - return fmt.Errorf("Not connected") + return "", fmt.Errorf("Not connected") } // Workaround girc bug @@ -212,17 +212,17 @@ func (irc *IRC) Send(event *Event) error { if event.Room != "" { ch, err := irc.checkRoomId(event.Room) if err != nil { - return err + return "", err } dest = ch } else if event.Recipient != "" { ui, err := irc.checkUserId(event.Recipient) if err != nil { - return err + return "", err } dest = ui } else { - return fmt.Errorf("Invalid target") + return "", fmt.Errorf("Invalid target") } if event.Attachments != nil && len(event.Attachments) > 0 { @@ -230,7 +230,7 @@ func (irc *IRC) Send(event *Event) error { url := at.URL() if url == "" { // TODO find a way to send them using some hosing of some kind - return fmt.Errorf("Attachment without URL sent to IRC") + return "", fmt.Errorf("Attachment without URL sent to IRC") } else { irc.conn.Cmd.Message(dest, fmt.Sprintf("%s (%s, %dkb)", url, at.Mimetype(), at.Size()/1024)) @@ -243,9 +243,9 @@ func (irc *IRC) Send(event *Event) error { } else if event.Type == EVENT_ACTION { irc.conn.Cmd.Action(dest, event.Text) } else { - return fmt.Errorf("Invalid event type") + return "", fmt.Errorf("Invalid event type") } - return nil + return "", nil } func (irc *IRC) Close() { diff --git a/connector/mattermost/mattermost.go b/connector/mattermost/mattermost.go index 12ac604..52eb40f 100644 --- a/connector/mattermost/mattermost.go +++ b/connector/mattermost/mattermost.go @@ -243,7 +243,7 @@ func (mm *Mattermost) Leave(roomId RoomID) { // Not supported? TODO } -func (mm *Mattermost) Send(event *Event) error { +func (mm *Mattermost) Send(event *Event) (string, error) { post := &model.Post{ Message: event.Text, } @@ -254,29 +254,29 @@ func (mm *Mattermost) Send(event *Event) error { if event.Room != "" { ch, err := mm.checkRoomId(event.Room) if err != nil { - return err + return "", err } post.ChannelId = ch } else if event.Recipient != "" { ui, err := mm.checkUserId(event.Recipient) if err != nil { - return err + return "", err } _, resp := mm.conn.Client.CreateDirectChannel(mm.conn.User.Id, ui) if resp.Error != nil { - return resp.Error + return "", resp.Error } channelName := model.GetDMNameFromIds(ui, mm.conn.User.Id) err = mm.conn.UpdateChannels() if err != nil { - return err + return "", err } post.ChannelId = mm.conn.GetChannelId(channelName, "") } else { - return fmt.Errorf("Invalid target") + return "", fmt.Errorf("Invalid target") } if event.Attachments != nil { @@ -284,28 +284,28 @@ func (mm *Mattermost) Send(event *Event) error { for _, file := range event.Attachments { rdr, err := file.Read() if err != nil { - return err + return "", err } defer rdr.Close() data, err := ioutil.ReadAll(rdr) if err != nil { - return err + return "", err } up_file, err := mm.conn.UploadFile(data, post.ChannelId, file.Filename()) if err != nil { log.Warnf("UploadFile error: %s", err) - return err + return "", err } post.FileIds = append(post.FileIds, up_file) } } - _, resp := mm.conn.Client.CreatePost(post) + created_post, resp := mm.conn.Client.CreatePost(post) if resp.Error != nil { log.Warnf("CreatePost error: %s", resp.Error) - return resp.Error + return "", resp.Error } - return nil + return created_post.Id, nil } func (mm *Mattermost) Close() { diff --git a/connector/xmpp/xmpp.go b/connector/xmpp/xmpp.go index dcf1db6..efaaf64 100644 --- a/connector/xmpp/xmpp.go +++ b/connector/xmpp/xmpp.go @@ -306,13 +306,13 @@ func (xm *XMPP) Leave(roomId RoomID) { xm.conn.LeaveMUC(string(roomId)) } -func (xm *XMPP) Send(event *Event) error { +func (xm *XMPP) Send(event *Event) (string, error) { if event.Attachments != nil && len(event.Attachments) > 0 { for _, at := range event.Attachments { url := at.URL() if url == "" { // TODO find a way to send them using some hosing of some kind - return fmt.Errorf("Attachment without URL sent to XMPP") + return "", fmt.Errorf("Attachment without URL sent to XMPP") } else { event.Text += fmt.Sprintf("\n%s (%s, %dkb)", url, at.Mimetype(), at.Size()/1024) @@ -320,6 +320,10 @@ func (xm *XMPP) Send(event *Event) error { } } + if event.Id == "" { + event.Id = xid.New().String() + } + log.Tracef("xm *XMPP Send %#v\n", event) if len(event.Recipient) > 0 { _, err := xm.conn.Send(gxmpp.Chat{ @@ -327,20 +331,17 @@ func (xm *XMPP) Send(event *Event) error { Remote: string(event.Recipient), Text: event.Text, }) - return err + return event.Id, err } else if len(event.Room) > 0 { - if event.Id == "" { - event.Id = xid.New().String() - } _, err := xm.conn.Send(gxmpp.Chat{ Type: "groupchat", Remote: string(event.Room), Text: event.Text, ID: event.Id, }) - return err + return event.Id, err } else { - return fmt.Errorf("Invalid event") + return "", fmt.Errorf("Invalid event") } } diff --git a/db.go b/db.go index f9bed06..cdbdca1 100644 --- a/db.go +++ b/db.go @@ -163,6 +163,14 @@ func dbKvPut(key string, value string) { dbLockSlot(slot_key) defer dbUnlockSlot(slot_key) + dbKvPutLocked(key, value) +} + +// Variant of dbKvPut that does not take a lock, +// use this if the slot is already locked +func dbKvPutLocked(key string, value string) { + slot_key := dbKvSlotKey(key) + var entry DbKv db.Where(&DbKv{Key: key}).Assign(&DbKv{Value: value}).FirstOrCreate(&entry) dbCache.Add(slot_key, value) @@ -179,9 +187,7 @@ func dbKvTestAndSet(key string, value string) bool { return false } - var entry DbKv - db.Where(&DbKv{Key: key}).Assign(&DbKv{Value: value}).FirstOrCreate(&entry) - dbCache.Add(slot_key, value) + dbKvPutLocked(key, value) return true } diff --git a/server.go b/server.go index 241a977..dc04006 100644 --- a/server.go +++ b/server.go @@ -145,18 +145,22 @@ func handleTxnEvent(e *mxlib.Event) error { } else if e.Sender == pm_room.MxUserID { ev.Author = acct.Conn.User() ev.Recipient = pm_room.UserID - return acct.Conn.Send(ev) + _, err := acct.Conn.Send(ev) + return err } } else if room := dbIsPublicRoom(e.RoomId); room != nil { - cache_key := fmt.Sprintf("%s/event_seen/%s/%s", - room.Protocol, e.RoomId, ev.Id) - dbKvPut(cache_key, "yes") // If this is a regular room acct := FindJoinedAccount(e.Sender, room.Protocol, room.RoomID) if acct != nil { ev.Author = acct.Conn.User() ev.Room = room.RoomID - return acct.Conn.Send(ev) + created_ev_id, err := acct.Conn.Send(ev) + if err == nil && created_ev_id != "" { + cache_key := fmt.Sprintf("%s/event_seen/%s/%s", + room.Protocol, e.RoomId, created_ev_id) + dbKvPut(cache_key, "yes") + } + return err } else { mx.RoomKick(e.RoomId, e.Sender, fmt.Sprintf("Not present in %s on %s, please talk with Easybridge to rejoin", room.RoomID, room.Protocol)) return fmt.Errorf("not joined %s on %s", room.RoomID, room.Protocol)