Phase 2: IMAP Client & Background Worker (Version: 2026-04.2)
- IMAP client package with TLS/STARTTLS connection management, unseen fetching, batch fetching by UID range, message move/copy operations, folder management - Background worker with steady-state (unseen polling) and catch-up (UID range backlog) modes, per-user IMAP connections, test mode support, placeholder AI classifier - Database: GetUsersWithAutoStart() for worker user discovery - Wired worker into main.go with graceful shutdown
This commit is contained in:
parent
065129493d
commit
8bb9ff067b
12 changed files with 1657 additions and 44 deletions
|
|
@ -5,7 +5,33 @@ All notable changes to inBOXER will be documented in this file.
|
|||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## [2026-04.1] - 2026-04-23
|
||||
## [2026-04.2] - 2026-04-23
|
||||
|
||||
### Added
|
||||
- IMAP client package (`src/internal/imap/`):
|
||||
- Secure connection management (TLS & STARTTLS) with auto-reconnect
|
||||
- Fetch unseen messages with envelope and body snippet extraction
|
||||
- Batch fetching by UID range for catch-up processing
|
||||
- Message move/copy operations (RFC 6851 UID MOVE)
|
||||
- Ensure folder existence before operations
|
||||
- Unit tests covering error paths (connection failures, not-connected scenarios)
|
||||
- Background worker package (`src/internal/worker/`):
|
||||
- Steady-state mode: polls unseen emails at configurable interval
|
||||
- Catch-up mode: processes backlog in batches with cooldown
|
||||
- Per-user processing with isolated IMAP connections
|
||||
- Test mode support (logs without moving emails)
|
||||
- AI classifier interface (placeholder routes everything to "Other")
|
||||
- Graceful shutdown with signal handling
|
||||
- Unit tests for classifier and lifecycle
|
||||
- Database: `GetUsersWithAutoStart()` method for worker user discovery
|
||||
|
||||
### Changed
|
||||
- Replaced placeholder background worker with full IMAP-driven implementation
|
||||
- Worker now creates target folders automatically on connect
|
||||
- Email processing respects per-user poll interval and batch size
|
||||
|
||||
### Fixed
|
||||
- N/A (initial release)
|
||||
|
||||
### Added
|
||||
- Initial repository structure per `PROJECT_PLAN.md`
|
||||
|
|
|
|||
29
go.mod
29
go.mod
|
|
@ -1,20 +1,25 @@
|
|||
module inboxer
|
||||
|
||||
go 1.23
|
||||
go 1.22
|
||||
|
||||
require (
|
||||
github.com/emersion/go-imap v1.2.1
|
||||
github.com/emersion/go-sasl v0.0.0-20200509203442-7bfe0ed36a21
|
||||
github.com/gorilla/mux v1.8.1
|
||||
github.com/gorilla/securecookie v1.1.2
|
||||
github.com/gorilla/sessions v1.4.0
|
||||
github.com/jinzhu/inflection v1.0.0
|
||||
github.com/jinzhu/now v1.1.5
|
||||
github.com/gorilla/sessions v1.2.0
|
||||
github.com/joho/godotenv v1.5.1
|
||||
github.com/mattn/go-sqlite3 v1.14.22
|
||||
golang.org/x/crypto v0.23.0
|
||||
golang.org/x/text v0.20.0
|
||||
golang.org/x/crypto v0.17.0
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
gorm.io/driver/sqlite v1.6.0
|
||||
gorm.io/gorm v1.31.1
|
||||
gorm.io/driver/sqlite v1.5.4
|
||||
gorm.io/gorm v1.25.6
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/emersion/go-imap v1.2.1 // indirect
|
||||
github.com/emersion/go-message v0.15.0 // indirect
|
||||
github.com/emersion/go-sasl v0.0.0-20200509203442-7bfe0ed36a21 // indirect
|
||||
github.com/emersion/go-textwrapper v0.0.0-20200911093747-65d896831594 // indirect
|
||||
github.com/gorilla/securecookie v1.1.2 // indirect
|
||||
github.com/jinzhu/inflection v1.0.0 // indirect
|
||||
github.com/jinzhu/now v1.1.5 // indirect
|
||||
github.com/mattn/go-sqlite3 v1.14.22 // indirect
|
||||
golang.org/x/text v0.14.0 // indirect
|
||||
)
|
||||
|
|
|
|||
28
go.sum
28
go.sum
|
|
@ -1,15 +1,20 @@
|
|||
github.com/emersion/go-imap v1.2.1 h1:+s9ZjMEjOB8NzZMVTM3cCenz2JrQIGGo5j1df19WjTA=
|
||||
github.com/emersion/go-imap v1.2.1/go.mod h1:Qlx1FSx2FTxjnjWpIlVNEuX+ylerZQNFE5NsmKFSejY=
|
||||
github.com/emersion/go-message v0.15.0 h1:urgKGqt2JAc9NFJcgncQcohHdiYb803YTH9OQwHBHIY=
|
||||
github.com/emersion/go-message v0.15.0/go.mod h1:wQUEfE+38+7EW8p8aZ96ptg6bAb1iwdgej19uXASlE4=
|
||||
github.com/emersion/go-sasl v0.0.0-20200509203442-7bfe0ed36a21 h1:OJyUGMJTzHTd1XQp98QTaHernxMYzRaOasRir9hUlFQ=
|
||||
github.com/emersion/go-sasl v0.0.0-20200509203442-7bfe0ed36a21/go.mod h1:iL2twTeMvZnrg54ZoPDNfJaJaqy0xIQFuBdrLsmspwQ=
|
||||
github.com/emersion/go-textwrapper v0.0.0-20200911093747-65d896831594 h1:IbFBtwoTQyw0fIM5xv1HF+Y+3ZijDR839WMulgxCcUY=
|
||||
github.com/emersion/go-textwrapper v0.0.0-20200911093747-65d896831594/go.mod h1:aqO8z8wPrjkscevZJFVE1wXJrLpC5LtJG7fqLOsPb2U=
|
||||
github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0=
|
||||
github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
|
||||
github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
|
||||
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
|
||||
github.com/gorilla/securecookie v1.1.2 h1:YCIWL56dvtr73r6715mJs5ZvhtnY73hBvEF8kXD8ePA=
|
||||
github.com/gorilla/securecookie v1.1.2/go.mod h1:NfCASbcHqRSY+3a8tlWJwsQap2VX5pwzwo4h3eOamfo=
|
||||
github.com/gorilla/sessions v1.4.0 h1:kpIYOp/oi6MG/p5PgxApU8srsSw9tuFbt46Lt7auzqQ=
|
||||
github.com/gorilla/sessions v1.4.0/go.mod h1:FLWm50oby91+hl7p/wRxDth9bWSuk0qVL2emc7lT5ik=
|
||||
github.com/gorilla/sessions v1.2.0 h1:S7P+1Hm5V/AT9cjEcUD5uDaQSX0OE577aCXgoaKpYbQ=
|
||||
github.com/gorilla/sessions v1.2.0/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
|
||||
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
|
||||
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
|
||||
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
|
||||
|
|
@ -18,13 +23,18 @@ github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
|
|||
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
|
||||
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
|
||||
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
|
||||
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
|
||||
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
|
||||
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
|
||||
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
|
||||
golang.org/x/text v0.20.0 h1:gK/Kv2otX8gz+wn7Rmb3vT96ZwuoxnQlY+HlJVj7Qug=
|
||||
golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4=
|
||||
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
|
||||
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
gorm.io/driver/sqlite v1.6.0 h1:WHRRrIiulaPiPFmDcod6prc4l2VGVWHz80KspNsxSfQ=
|
||||
gorm.io/driver/sqlite v1.6.0/go.mod h1:AO9V1qIQddBESngQUKWL9yoH93HIeA1X6V633rBwyT8=
|
||||
gorm.io/gorm v1.31.1 h1:7CA8FTFz/gRfgqgpeKIBcervUn3xSyPUmr6B2WXJ7kg=
|
||||
gorm.io/gorm v1.31.1/go.mod h1:XyQVbO2k6YkOis7C2437jSit3SsDK72s7n7rsSHd+Gs=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gorm.io/driver/sqlite v1.5.4 h1:IqXwXi8M/ZlPzH/947tn5uik3aYQslP9BVveoax0nV0=
|
||||
gorm.io/driver/sqlite v1.5.4/go.mod h1:qxAuCol+2r6PannQDpOP1FP6ag3mKi4esLnB/jHed+4=
|
||||
gorm.io/gorm v1.25.6 h1:V92+vVda1wEISSOMtodHVRcUIOPYa2tgQtyF+DfFx+A=
|
||||
gorm.io/gorm v1.25.6/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8=
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ import (
|
|||
"inboxer/src/internal/auth"
|
||||
"inboxer/src/internal/db"
|
||||
"inboxer/src/internal/web"
|
||||
"inboxer/src/internal/worker"
|
||||
"inboxer/src/pkg/config"
|
||||
"github.com/gorilla/mux"
|
||||
)
|
||||
|
|
@ -89,8 +90,10 @@ func main() {
|
|||
IdleTimeout: 60 * time.Second,
|
||||
}
|
||||
|
||||
// Start background worker (placeholder)
|
||||
go startBackgroundWorker(database, authService, cfg)
|
||||
// Start background worker
|
||||
classifier := worker.NewPlaceholderClassifier(cfg.Folders.Other)
|
||||
bgWorker := worker.NewWorker(database, cfg, classifier)
|
||||
bgWorker.Start()
|
||||
|
||||
// Start server in goroutine
|
||||
go func() {
|
||||
|
|
@ -106,7 +109,10 @@ func main() {
|
|||
signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
|
||||
<-stop
|
||||
|
||||
log.Println("Shutting down server...")
|
||||
log.Println("Shutting down...")
|
||||
|
||||
// Stop background worker first
|
||||
bgWorker.Stop()
|
||||
|
||||
// Give server time to shutdown gracefully
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
|
|
@ -119,25 +125,6 @@ func main() {
|
|||
log.Println("Server stopped")
|
||||
}
|
||||
|
||||
// startBackgroundWorker starts the email processing worker
|
||||
func startBackgroundWorker(database *db.Database, authService *auth.AuthService, cfg *config.Config) {
|
||||
log.Println("Background worker started (placeholder)")
|
||||
|
||||
// TODO: Implement actual worker logic
|
||||
// - Check for users with auto-start enabled
|
||||
// - Process emails in batches
|
||||
// - Respect poll intervals
|
||||
// - Handle catch-up mode
|
||||
|
||||
// For now, just log periodically to show it's running
|
||||
ticker := time.NewTicker(5 * time.Minute)
|
||||
defer ticker.Stop()
|
||||
|
||||
for range ticker.C {
|
||||
log.Println("Background worker tick")
|
||||
}
|
||||
}
|
||||
|
||||
// loggingMiddleware logs HTTP requests
|
||||
func loggingMiddleware(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
|
|
|
|||
|
|
@ -118,6 +118,26 @@ func (d *Database) ClearUserOTP(email string) error {
|
|||
|
||||
// MailboxSettings operations
|
||||
|
||||
// GetUsersWithAutoStart returns all mailbox settings with AutoStart enabled,
|
||||
// with IMAP passwords decrypted for immediate use
|
||||
func (d *Database) GetUsersWithAutoStart() ([]MailboxSettings, error) {
|
||||
var settings []MailboxSettings
|
||||
result := d.DB.Where("auto_start = ?", true).Find(&settings)
|
||||
if result.Error != nil {
|
||||
return nil, fmt.Errorf("failed to query auto-start settings: %w", result.Error)
|
||||
}
|
||||
|
||||
for i := range settings {
|
||||
decrypted, err := d.decrypt(settings[i].IMAPPassEncrypted)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to decrypt IMAP password for user %d: %w", settings[i].UserID, err)
|
||||
}
|
||||
settings[i].IMAPPassEncrypted = decrypted
|
||||
}
|
||||
|
||||
return settings, nil
|
||||
}
|
||||
|
||||
// GetMailboxSettings retrieves mailbox settings for a user
|
||||
func (d *Database) GetMailboxSettings(userID uint) (*MailboxSettings, error) {
|
||||
var settings MailboxSettings
|
||||
|
|
|
|||
331
src/internal/imap/fetcher.go
Normal file
331
src/internal/imap/fetcher.go
Normal file
|
|
@ -0,0 +1,331 @@
|
|||
package imap
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/emersion/go-imap"
|
||||
"github.com/emersion/go-imap/client"
|
||||
)
|
||||
|
||||
// EmailSummary holds the key fields for classification
|
||||
type EmailSummary struct {
|
||||
UID uint32
|
||||
Subject string
|
||||
From string
|
||||
Date time.Time
|
||||
MessageID string
|
||||
Snippet string // first ~200 chars of body text
|
||||
}
|
||||
|
||||
// fetchItems returns the common FetchItems for fetching email metadata + body snippet
|
||||
func fetchItems() []imap.FetchItem {
|
||||
bodySection := &imap.BodySectionName{Peek: true}
|
||||
return []imap.FetchItem{
|
||||
imap.FetchEnvelope,
|
||||
imap.FetchInternalDate,
|
||||
imap.FetchFlags,
|
||||
bodySection.FetchItem(),
|
||||
}
|
||||
}
|
||||
|
||||
// bodySections returns a list of body section names to peek at body content
|
||||
func bodySection() *imap.BodySectionName {
|
||||
return &imap.BodySectionName{Peek: true}
|
||||
}
|
||||
|
||||
// buildEmailSummary converts an imap.Message to EmailSummary
|
||||
func buildEmailSummary(msg *imap.Message) EmailSummary {
|
||||
summary := EmailSummary{
|
||||
UID: msg.Uid,
|
||||
Date: msg.InternalDate,
|
||||
MessageID: msg.Envelope.MessageId,
|
||||
Subject: msg.Envelope.Subject,
|
||||
}
|
||||
|
||||
if len(msg.Envelope.From) > 0 {
|
||||
summary.From = msg.Envelope.From[0].Address()
|
||||
}
|
||||
|
||||
// Extract body snippet from first available body section
|
||||
for _, literal := range msg.Body {
|
||||
if literal != nil {
|
||||
data, err := io.ReadAll(literal)
|
||||
if err == nil && len(data) > 0 {
|
||||
if len(data) > 200 {
|
||||
summary.Snippet = string(data[:200])
|
||||
} else {
|
||||
summary.Snippet = string(data)
|
||||
}
|
||||
}
|
||||
break // first body section only
|
||||
}
|
||||
}
|
||||
|
||||
return summary
|
||||
}
|
||||
|
||||
// FetchUnseen fetches unseen messages from the selected mailbox
|
||||
// Returns the mailbox status and a slice of email summaries
|
||||
func (c *Client) FetchUnseen(mailbox string) (*imap.MailboxStatus, []EmailSummary, error) {
|
||||
var (
|
||||
emails []EmailSummary
|
||||
mbox *imap.MailboxStatus
|
||||
)
|
||||
|
||||
err := c.Execute(func(cl *client.Client) error {
|
||||
var err error
|
||||
mbox, err = cl.Select(mailbox, true)
|
||||
if err != nil {
|
||||
return fmt.Errorf("select %q: %w", mailbox, err)
|
||||
}
|
||||
|
||||
// Search for unseen messages
|
||||
criteria := imap.NewSearchCriteria()
|
||||
criteria.WithoutFlags = []string{imap.SeenFlag}
|
||||
|
||||
uids, err := cl.UidSearch(criteria)
|
||||
if err != nil {
|
||||
return fmt.Errorf("uid search: %w", err)
|
||||
}
|
||||
|
||||
if len(uids) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Fetch envelope + body snippet for each unseen message
|
||||
seqset := new(imap.SeqSet)
|
||||
seqset.AddNum(uids...)
|
||||
|
||||
messages := make(chan *imap.Message, 10)
|
||||
done := make(chan error, 1)
|
||||
|
||||
go func() {
|
||||
done <- cl.UidFetch(seqset, fetchItems(), messages)
|
||||
}()
|
||||
|
||||
for msg := range messages {
|
||||
emails = append(emails, buildEmailSummary(msg))
|
||||
}
|
||||
|
||||
if err := <-done; err != nil {
|
||||
return fmt.Errorf("uid fetch: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return mbox, emails, nil
|
||||
}
|
||||
|
||||
// FetchBatch fetches a batch of messages by UID range
|
||||
// Used for catch-up mode: from lastProcessedUID+1 to latest
|
||||
// Returns a bounded number of messages (batchSize) sorted ascending
|
||||
func (c *Client) FetchBatch(mailbox string, startUID, maxUID uint32, batchSize int) (*imap.MailboxStatus, []EmailSummary, error) {
|
||||
var (
|
||||
emails []EmailSummary
|
||||
mbox *imap.MailboxStatus
|
||||
)
|
||||
|
||||
err := c.Execute(func(cl *client.Client) error {
|
||||
var err error
|
||||
mbox, err = cl.Select(mailbox, true)
|
||||
if err != nil {
|
||||
return fmt.Errorf("select %q: %w", mailbox, err)
|
||||
}
|
||||
|
||||
// Fetch messages in UID range
|
||||
criteria := imap.NewSearchCriteria()
|
||||
criteria.Uid = new(imap.SeqSet)
|
||||
criteria.Uid.AddRange(startUID, maxUID)
|
||||
|
||||
uids, err := cl.UidSearch(criteria)
|
||||
if err != nil {
|
||||
return fmt.Errorf("uid search: %w", err)
|
||||
}
|
||||
|
||||
if len(uids) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Apply batch size limit
|
||||
batch := uids
|
||||
if len(batch) > batchSize {
|
||||
batch = batch[:batchSize]
|
||||
}
|
||||
|
||||
seqset := new(imap.SeqSet)
|
||||
seqset.AddNum(batch...)
|
||||
|
||||
messages := make(chan *imap.Message, 10)
|
||||
done := make(chan error, 1)
|
||||
|
||||
go func() {
|
||||
done <- cl.UidFetch(seqset, fetchItems(), messages)
|
||||
}()
|
||||
|
||||
for msg := range messages {
|
||||
emails = append(emails, buildEmailSummary(msg))
|
||||
}
|
||||
|
||||
return <-done
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return mbox, emails, nil
|
||||
}
|
||||
|
||||
// SelectMailbox selects a mailbox and returns its status
|
||||
func (c *Client) SelectMailbox(name string, readOnly bool) (*imap.MailboxStatus, error) {
|
||||
var mbox *imap.MailboxStatus
|
||||
|
||||
err := c.Execute(func(cl *client.Client) error {
|
||||
var err error
|
||||
mbox, err = cl.Select(name, readOnly)
|
||||
return err
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to select mailbox %q: %w", name, err)
|
||||
}
|
||||
|
||||
return mbox, nil
|
||||
}
|
||||
|
||||
// CountUnseen returns the number of unseen messages in the mailbox
|
||||
func (c *Client) CountUnseen(mailbox string) (uint32, error) {
|
||||
var mbox *imap.MailboxStatus
|
||||
|
||||
err := c.Execute(func(cl *client.Client) error {
|
||||
var err error
|
||||
mbox, err = cl.Select(mailbox, true)
|
||||
return err
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return mbox.Unseen, nil
|
||||
}
|
||||
|
||||
// FetchByUIDs fetches specific messages by their UIDs
|
||||
func (c *Client) FetchByUIDs(mailbox string, uids []uint32) ([]EmailSummary, error) {
|
||||
var summaries []EmailSummary
|
||||
|
||||
err := c.Execute(func(cl *client.Client) error {
|
||||
_, err := cl.Select(mailbox, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
seqset := new(imap.SeqSet)
|
||||
seqset.AddNum(uids...)
|
||||
|
||||
messages := make(chan *imap.Message, 10)
|
||||
done := make(chan error, 1)
|
||||
|
||||
go func() {
|
||||
done <- cl.UidFetch(seqset, fetchItems(), messages)
|
||||
}()
|
||||
|
||||
for msg := range messages {
|
||||
summaries = append(summaries, buildEmailSummary(msg))
|
||||
}
|
||||
|
||||
return <-done
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return summaries, nil
|
||||
}
|
||||
|
||||
// FetchBodySection fetches a specific body section for a single message
|
||||
func (c *Client) FetchBodySection(mailbox string, uid uint32, section *imap.BodySectionName) ([]byte, error) {
|
||||
var body []byte
|
||||
|
||||
err := c.Execute(func(cl *client.Client) error {
|
||||
_, err := cl.Select(mailbox, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
seqset := new(imap.SeqSet)
|
||||
seqset.AddNum(uid)
|
||||
|
||||
messages := make(chan *imap.Message, 1)
|
||||
done := make(chan error, 1)
|
||||
|
||||
go func() {
|
||||
done <- cl.UidFetch(seqset, []imap.FetchItem{
|
||||
section.FetchItem(),
|
||||
}, messages)
|
||||
}()
|
||||
|
||||
for msg := range messages {
|
||||
for _, bs := range msg.Body {
|
||||
if bs != nil {
|
||||
body, err = io.ReadAll(bs)
|
||||
if err != nil {
|
||||
return fmt.Errorf("read body: %w", err)
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := <-done; err != nil {
|
||||
return fmt.Errorf("uid fetch body: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return body, nil
|
||||
}
|
||||
|
||||
// EnsureFolders creates folders if they don't exist
|
||||
func (c *Client) EnsureFolders(folders []string) error {
|
||||
return c.Execute(func(cl *client.Client) error {
|
||||
mailboxes := make(chan *imap.MailboxInfo, 50)
|
||||
done := make(chan error, 1)
|
||||
|
||||
go func() {
|
||||
done <- cl.List("", "*", mailboxes)
|
||||
}()
|
||||
|
||||
existing := make(map[string]bool)
|
||||
for m := range mailboxes {
|
||||
existing[m.Name] = true
|
||||
}
|
||||
|
||||
if err := <-done; err != nil {
|
||||
return fmt.Errorf("failed to list mailboxes: %w", err)
|
||||
}
|
||||
|
||||
for _, folder := range folders {
|
||||
if !existing[folder] {
|
||||
if err := cl.Create(folder); err != nil {
|
||||
return fmt.Errorf("failed to create folder %q: %w", folder, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
174
src/internal/imap/imap.go
Normal file
174
src/internal/imap/imap.go
Normal file
|
|
@ -0,0 +1,174 @@
|
|||
package imap
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/emersion/go-imap"
|
||||
"github.com/emersion/go-imap/client"
|
||||
)
|
||||
|
||||
// Config holds IMAP server connection settings
|
||||
type Config struct {
|
||||
Host string
|
||||
Port int
|
||||
User string
|
||||
Password string
|
||||
TLS bool
|
||||
}
|
||||
|
||||
// Client wraps go-imap client with connection management
|
||||
type Client struct {
|
||||
config Config
|
||||
mu sync.Mutex
|
||||
client *client.Client
|
||||
}
|
||||
|
||||
// NewClient creates a new IMAP client wrapper
|
||||
func NewClient(config Config) *Client {
|
||||
return &Client{config: config}
|
||||
}
|
||||
|
||||
// Connect establishes a connection to the IMAP server and logs in
|
||||
func (c *Client) Connect() error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
return c.connect()
|
||||
}
|
||||
|
||||
// connect performs the actual connection (must hold mu)
|
||||
func (c *Client) connect() error {
|
||||
// Close existing connection if any
|
||||
c.close()
|
||||
|
||||
addr := fmt.Sprintf("%s:%d", c.config.Host, c.config.Port)
|
||||
|
||||
var (
|
||||
cl *client.Client
|
||||
err error
|
||||
)
|
||||
|
||||
if c.config.TLS {
|
||||
// TLS connection (port 993)
|
||||
cl, err = client.DialTLS(addr, &tls.Config{
|
||||
ServerName: c.config.Host,
|
||||
})
|
||||
} else {
|
||||
// Plain connection (port 143) then STARTTLS
|
||||
cl, err = client.Dial(addr)
|
||||
if err == nil {
|
||||
err = cl.StartTLS(&tls.Config{
|
||||
ServerName: c.config.Host,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to connect: %w", err)
|
||||
}
|
||||
|
||||
// Login
|
||||
if err := cl.Login(c.config.User, c.config.Password); err != nil {
|
||||
cl.Logout()
|
||||
return fmt.Errorf("failed to login: %w", err)
|
||||
}
|
||||
|
||||
// Set timeout
|
||||
cl.Timeout = 30 * time.Second
|
||||
|
||||
c.client = cl
|
||||
return nil
|
||||
}
|
||||
|
||||
// EnsureConnected checks if connected and reconnects if needed
|
||||
func (c *Client) EnsureConnected() error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
if c.client == nil || c.client.State() == imap.LogoutState {
|
||||
return c.connect()
|
||||
}
|
||||
|
||||
// Test connection with NOOP
|
||||
if err := c.client.Noop(); err != nil {
|
||||
return c.connect()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// close closes the connection (must hold mu)
|
||||
func (c *Client) close() {
|
||||
if c.client != nil {
|
||||
c.client.Logout()
|
||||
c.client = nil
|
||||
}
|
||||
}
|
||||
|
||||
// Close closes the IMAP connection
|
||||
func (c *Client) Close() error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
c.close()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Execute runs a function with a connected client, reconnecting if needed
|
||||
func (c *Client) Execute(fn func(cl *client.Client) error) error {
|
||||
c.mu.Lock()
|
||||
cl := c.client
|
||||
c.mu.Unlock()
|
||||
|
||||
if cl == nil {
|
||||
if err := c.EnsureConnected(); err != nil {
|
||||
return err
|
||||
}
|
||||
c.mu.Lock()
|
||||
cl = c.client
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// Attempt the operation
|
||||
err := fn(cl)
|
||||
if err != nil {
|
||||
// Connection might have dropped, try to reconnect and retry once
|
||||
if connErr := c.EnsureConnected(); connErr != nil {
|
||||
return fmt.Errorf("operation failed and reconnection failed: %v (original: %v)", connErr, err)
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
cl = c.client
|
||||
c.mu.Unlock()
|
||||
|
||||
// Retry once
|
||||
return fn(cl)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// LoggedOut returns a channel that closes when the connection drops
|
||||
func (c *Client) LoggedOut() <-chan struct{} {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
if c.client != nil {
|
||||
return c.client.LoggedOut()
|
||||
}
|
||||
|
||||
ch := make(chan struct{})
|
||||
close(ch)
|
||||
return ch
|
||||
}
|
||||
|
||||
// IsConnected returns whether the client is connected
|
||||
func (c *Client) IsConnected() bool {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
return c.client != nil && c.client.State() != imap.LogoutState
|
||||
}
|
||||
334
src/internal/imap/imap_test.go
Normal file
334
src/internal/imap/imap_test.go
Normal file
|
|
@ -0,0 +1,334 @@
|
|||
package imap_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"inboxer/src/internal/imap"
|
||||
)
|
||||
|
||||
func TestNewClient(t *testing.T) {
|
||||
cfg := imap.Config{
|
||||
Host: "imap.example.com",
|
||||
Port: 993,
|
||||
User: "user",
|
||||
Password: "pass",
|
||||
TLS: true,
|
||||
}
|
||||
|
||||
cl := imap.NewClient(cfg)
|
||||
if cl == nil {
|
||||
t.Fatal("expected non-nil client")
|
||||
}
|
||||
}
|
||||
|
||||
func TestConfigValues(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
config imap.Config
|
||||
}{
|
||||
{
|
||||
name: "TLS config",
|
||||
config: imap.Config{
|
||||
Host: "imap.example.com",
|
||||
Port: 993,
|
||||
User: "user",
|
||||
Password: "pass",
|
||||
TLS: true,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "plain config",
|
||||
config: imap.Config{
|
||||
Host: "imap.example.com",
|
||||
Port: 143,
|
||||
User: "user",
|
||||
Password: "pass",
|
||||
TLS: false,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
c := imap.NewClient(tt.config)
|
||||
if c == nil {
|
||||
t.Fatal("NewClient returned nil")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestConnectInvalid(t *testing.T) {
|
||||
// Test with invalid host / connection refused
|
||||
cfg := imap.Config{
|
||||
Host: "127.0.0.1",
|
||||
Port: 1, // unlikely to be open
|
||||
User: "test",
|
||||
Password: "test",
|
||||
TLS: false,
|
||||
}
|
||||
|
||||
cl := imap.NewClient(cfg)
|
||||
err := cl.Connect()
|
||||
if err == nil {
|
||||
t.Fatal("expected connection error on invalid port")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEnsureConnectedNeverConnected(t *testing.T) {
|
||||
cfg := imap.Config{
|
||||
Host: "127.0.0.1",
|
||||
Port: 1,
|
||||
User: "test",
|
||||
Password: "test",
|
||||
TLS: false,
|
||||
}
|
||||
|
||||
cl := imap.NewClient(cfg)
|
||||
err := cl.EnsureConnected()
|
||||
if err == nil {
|
||||
t.Fatal("expected error on connecting to closed port")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCloseIdempotent(t *testing.T) {
|
||||
cfg := imap.Config{
|
||||
Host: "imap.example.com",
|
||||
Port: 993,
|
||||
User: "test",
|
||||
Password: "test",
|
||||
TLS: true,
|
||||
}
|
||||
|
||||
cl := imap.NewClient(cfg)
|
||||
// Close when never connected should not panic
|
||||
err := cl.Close()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error on close: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSelectMailboxNotConnected(t *testing.T) {
|
||||
cfg := imap.Config{
|
||||
Host: "imap.example.com",
|
||||
Port: 993,
|
||||
User: "test",
|
||||
Password: "test",
|
||||
TLS: true,
|
||||
}
|
||||
|
||||
cl := imap.NewClient(cfg)
|
||||
_, err := cl.SelectMailbox("INBOX", true)
|
||||
if err == nil {
|
||||
t.Fatal("expected error on select with no connection")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFetchUnseenNotConnected(t *testing.T) {
|
||||
cfg := imap.Config{
|
||||
Host: "imap.example.com",
|
||||
Port: 993,
|
||||
User: "test",
|
||||
Password: "test",
|
||||
TLS: true,
|
||||
}
|
||||
|
||||
cl := imap.NewClient(cfg)
|
||||
_, _, err := cl.FetchUnseen("INBOX")
|
||||
if err == nil {
|
||||
t.Fatal("expected error on fetch with no connection")
|
||||
}
|
||||
}
|
||||
|
||||
func TestMoveMessagesNotConnected(t *testing.T) {
|
||||
cfg := imap.Config{
|
||||
Host: "imap.example.com",
|
||||
Port: 993,
|
||||
User: "test",
|
||||
Password: "test",
|
||||
TLS: true,
|
||||
}
|
||||
|
||||
cl := imap.NewClient(cfg)
|
||||
err := cl.MoveMessages("INBOX", []uint32{1}, "INBOX/Archive")
|
||||
if err == nil {
|
||||
t.Fatal("expected error on move with no connection")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEnsureFoldersNoConnection(t *testing.T) {
|
||||
cfg := imap.Config{
|
||||
Host: "imap.example.com",
|
||||
Port: 993,
|
||||
User: "test",
|
||||
Password: "test",
|
||||
TLS: true,
|
||||
}
|
||||
|
||||
cl := imap.NewClient(cfg)
|
||||
err := cl.EnsureFolders([]string{"INBOX/Work"})
|
||||
if err == nil {
|
||||
t.Fatal("expected error with no connection")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCountUnseenNotConnected(t *testing.T) {
|
||||
cfg := imap.Config{
|
||||
Host: "imap.example.com",
|
||||
Port: 993,
|
||||
User: "test",
|
||||
Password: "test",
|
||||
TLS: true,
|
||||
}
|
||||
|
||||
cl := imap.NewClient(cfg)
|
||||
_, err := cl.CountUnseen("INBOX")
|
||||
if err == nil {
|
||||
t.Fatal("expected error with no connection")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFetchBatchNotConnected(t *testing.T) {
|
||||
cfg := imap.Config{
|
||||
Host: "imap.example.com",
|
||||
Port: 993,
|
||||
User: "test",
|
||||
Password: "test",
|
||||
TLS: true,
|
||||
}
|
||||
|
||||
cl := imap.NewClient(cfg)
|
||||
_, _, err := cl.FetchBatch("INBOX", 1, 100, 10)
|
||||
if err == nil {
|
||||
t.Fatal("expected error with no connection")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCopyMessagesNotConnected(t *testing.T) {
|
||||
cfg := imap.Config{
|
||||
Host: "imap.example.com",
|
||||
Port: 993,
|
||||
User: "test",
|
||||
Password: "test",
|
||||
TLS: true,
|
||||
}
|
||||
|
||||
cl := imap.NewClient(cfg)
|
||||
err := cl.CopyMessages("INBOX", []uint32{1}, "INBOX/Archive")
|
||||
if err == nil {
|
||||
t.Fatal("expected error with no connection")
|
||||
}
|
||||
}
|
||||
|
||||
func TestMarkSeenNotConnected(t *testing.T) {
|
||||
cfg := imap.Config{
|
||||
Host: "imap.example.com",
|
||||
Port: 993,
|
||||
User: "test",
|
||||
Password: "test",
|
||||
TLS: true,
|
||||
}
|
||||
|
||||
cl := imap.NewClient(cfg)
|
||||
err := cl.MarkSeen("INBOX", []uint32{1})
|
||||
if err == nil {
|
||||
t.Fatal("expected error with no connection")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFetchByUIDsNotConnected(t *testing.T) {
|
||||
cfg := imap.Config{
|
||||
Host: "imap.example.com",
|
||||
Port: 993,
|
||||
User: "test",
|
||||
Password: "test",
|
||||
TLS: true,
|
||||
}
|
||||
|
||||
cl := imap.NewClient(cfg)
|
||||
_, err := cl.FetchByUIDs("INBOX", []uint32{1})
|
||||
if err == nil {
|
||||
t.Fatal("expected error with no connection")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFetchBodySectionNotConnected(t *testing.T) {
|
||||
cfg := imap.Config{
|
||||
Host: "imap.example.com",
|
||||
Port: 993,
|
||||
User: "test",
|
||||
Password: "test",
|
||||
TLS: true,
|
||||
}
|
||||
|
||||
cl := imap.NewClient(cfg)
|
||||
_, err := cl.FetchBodySection("INBOX", 1, nil)
|
||||
if err == nil {
|
||||
t.Fatal("expected error with no connection")
|
||||
}
|
||||
}
|
||||
|
||||
// TestEmailSummaryFields validates EmailSummary struct
|
||||
func TestEmailSummaryFields(t *testing.T) {
|
||||
now := time.Now()
|
||||
s := imap.EmailSummary{
|
||||
UID: 42,
|
||||
Subject: "Test",
|
||||
From: "user@example.com",
|
||||
Date: now,
|
||||
MessageID: "<abc@example.com>",
|
||||
Snippet: "Hello world",
|
||||
}
|
||||
|
||||
if s.UID != 42 {
|
||||
t.Errorf("expected UID 42, got %d", s.UID)
|
||||
}
|
||||
if s.Subject != "Test" {
|
||||
t.Errorf("expected Subject Test, got %s", s.Subject)
|
||||
}
|
||||
if s.From != "user@example.com" {
|
||||
t.Errorf("expected From user@example.com, got %s", s.From)
|
||||
}
|
||||
if s.MessageID != "<abc@example.com>" {
|
||||
t.Errorf("expected MessageID, got %s", s.MessageID)
|
||||
}
|
||||
if s.Snippet != "Hello world" {
|
||||
t.Errorf("expected snippet, got %s", s.Snippet)
|
||||
}
|
||||
}
|
||||
|
||||
func TestIsConnected(t *testing.T) {
|
||||
cfg := imap.Config{
|
||||
Host: "imap.example.com",
|
||||
Port: 993,
|
||||
User: "test",
|
||||
Password: "test",
|
||||
TLS: true,
|
||||
}
|
||||
|
||||
cl := imap.NewClient(cfg)
|
||||
if cl.IsConnected() {
|
||||
t.Fatal("should not be connected after NewClient")
|
||||
}
|
||||
}
|
||||
|
||||
func TestLoggedOutNoConnection(t *testing.T) {
|
||||
cfg := imap.Config{
|
||||
Host: "imap.example.com",
|
||||
Port: 993,
|
||||
User: "test",
|
||||
Password: "test",
|
||||
TLS: true,
|
||||
}
|
||||
|
||||
cl := imap.NewClient(cfg)
|
||||
ch := cl.LoggedOut()
|
||||
// Channel should be closed immediately (no connection)
|
||||
select {
|
||||
case <-ch:
|
||||
// OK, closed
|
||||
default:
|
||||
t.Fatal("expected logged out channel to be closed when not connected")
|
||||
}
|
||||
}
|
||||
101
src/internal/imap/mover.go
Normal file
101
src/internal/imap/mover.go
Normal file
|
|
@ -0,0 +1,101 @@
|
|||
package imap
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/emersion/go-imap"
|
||||
"github.com/emersion/go-imap/client"
|
||||
)
|
||||
|
||||
// MoveMessages moves messages identified by UIDs from the current mailbox
|
||||
// to the target mailbox. Emails are never deleted, only moved.
|
||||
func (c *Client) MoveMessages(mailbox string, uids []uint32, target string) error {
|
||||
return c.Execute(func(cl *client.Client) error {
|
||||
_, err := cl.Select(mailbox, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("select source %q: %w", mailbox, err)
|
||||
}
|
||||
|
||||
if err := ensureFolder(cl, target); err != nil {
|
||||
return fmt.Errorf("ensure target %q: %w", target, err)
|
||||
}
|
||||
|
||||
seqset := new(imap.SeqSet)
|
||||
seqset.AddNum(uids...)
|
||||
|
||||
if err := cl.UidMove(seqset, target); err != nil {
|
||||
return fmt.Errorf("uid move to %q: %w", target, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// CopyMessages copies messages to a target folder without removing from source
|
||||
func (c *Client) CopyMessages(mailbox string, uids []uint32, target string) error {
|
||||
return c.Execute(func(cl *client.Client) error {
|
||||
_, err := cl.Select(mailbox, true)
|
||||
if err != nil {
|
||||
return fmt.Errorf("select source %q: %w", mailbox, err)
|
||||
}
|
||||
|
||||
if err := ensureFolder(cl, target); err != nil {
|
||||
return fmt.Errorf("ensure target %q: %w", target, err)
|
||||
}
|
||||
|
||||
seqset := new(imap.SeqSet)
|
||||
seqset.AddNum(uids...)
|
||||
|
||||
if err := cl.UidCopy(seqset, target); err != nil {
|
||||
return fmt.Errorf("uid copy to %q: %w", target, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// MarkSeen marks messages as read by UID
|
||||
func (c *Client) MarkSeen(mailbox string, uids []uint32) error {
|
||||
return c.Execute(func(cl *client.Client) error {
|
||||
_, err := cl.Select(mailbox, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("select %q: %w", mailbox, err)
|
||||
}
|
||||
|
||||
seqset := new(imap.SeqSet)
|
||||
seqset.AddNum(uids...)
|
||||
|
||||
if err := cl.UidStore(seqset, imap.FormatFlagsOp(imap.AddFlags, false), []interface{}{imap.SeenFlag}, nil); err != nil {
|
||||
return fmt.Errorf("mark seen: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// ensureFolder creates a folder if it doesn't exist
|
||||
func ensureFolder(cl *client.Client, name string) error {
|
||||
mailboxes := make(chan *imap.MailboxInfo, 10)
|
||||
done := make(chan error, 1)
|
||||
|
||||
go func() {
|
||||
done <- cl.List("", "*", mailboxes)
|
||||
}()
|
||||
|
||||
existing := make(map[string]bool)
|
||||
for m := range mailboxes {
|
||||
existing[m.Name] = true
|
||||
}
|
||||
|
||||
if err := <-done; err != nil {
|
||||
return fmt.Errorf("list: %w", err)
|
||||
}
|
||||
|
||||
if !existing[name] {
|
||||
if err := cl.Create(name); err != nil {
|
||||
return fmt.Errorf("create %q: %w", name, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
23
src/internal/worker/placeholder_classifier.go
Normal file
23
src/internal/worker/placeholder_classifier.go
Normal file
|
|
@ -0,0 +1,23 @@
|
|||
package worker
|
||||
|
||||
import (
|
||||
"inboxer/src/internal/imap"
|
||||
)
|
||||
|
||||
// PlaceholderClassifier classifies all emails as "Other" until the AI
|
||||
// integration (Phase 3) is implemented.
|
||||
type PlaceholderClassifier struct {
|
||||
DefaultFolder string
|
||||
}
|
||||
|
||||
// NewPlaceholderClassifier creates a classifier that routes everything
|
||||
// to the given default folder.
|
||||
func NewPlaceholderClassifier(defaultFolder string) *PlaceholderClassifier {
|
||||
return &PlaceholderClassifier{DefaultFolder: defaultFolder}
|
||||
}
|
||||
|
||||
// Classify returns a default classification for all emails.
|
||||
// This placeholder will be replaced by the real AI classifier in Phase 3.
|
||||
func (c *PlaceholderClassifier) Classify(email imap.EmailSummary) (string, int, error) {
|
||||
return c.DefaultFolder, 0, nil
|
||||
}
|
||||
383
src/internal/worker/worker.go
Normal file
383
src/internal/worker/worker.go
Normal file
|
|
@ -0,0 +1,383 @@
|
|||
package worker
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"inboxer/src/internal/db"
|
||||
"inboxer/src/internal/imap"
|
||||
"inboxer/src/pkg/config"
|
||||
)
|
||||
|
||||
// AIClassifier is the interface for AI email classification.
|
||||
// The real implementation will be added in Phase 3.
|
||||
type AIClassifier interface {
|
||||
// Classify returns the target folder name for an email.
|
||||
// Returns folder name and confidence score (0-100).
|
||||
Classify(email imap.EmailSummary) (folder string, confidence int, err error)
|
||||
}
|
||||
|
||||
// FolderConfig holds the target folder names for classification
|
||||
type FolderConfig struct {
|
||||
Important string
|
||||
Ecommerce string
|
||||
Other string
|
||||
Spam string
|
||||
}
|
||||
|
||||
// foldersList returns the list of target folders
|
||||
func (f FolderConfig) foldersList() []string {
|
||||
return []string{f.Important, f.Ecommerce, f.Other, f.Spam}
|
||||
}
|
||||
|
||||
// Worker processes emails for all configured users
|
||||
type Worker struct {
|
||||
db *db.Database
|
||||
cfg *config.Config
|
||||
folders FolderConfig
|
||||
classifier AIClassifier
|
||||
stopCh chan struct{}
|
||||
stopped bool
|
||||
mu sync.Mutex
|
||||
wg sync.WaitGroup
|
||||
userWorkers sync.WaitGroup
|
||||
}
|
||||
|
||||
// NewWorker creates a new background worker
|
||||
func NewWorker(database *db.Database, cfg *config.Config, classifier AIClassifier) *Worker {
|
||||
return &Worker{
|
||||
db: database,
|
||||
cfg: cfg,
|
||||
folders: FolderConfig{
|
||||
Important: cfg.Folders.Important,
|
||||
Ecommerce: cfg.Folders.Ecommerce,
|
||||
Other: cfg.Folders.Other,
|
||||
Spam: cfg.Folders.Spam,
|
||||
},
|
||||
classifier: classifier,
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// Start begins the background worker loop
|
||||
func (w *Worker) Start() {
|
||||
w.mu.Lock()
|
||||
if w.stopped {
|
||||
w.mu.Unlock()
|
||||
return
|
||||
}
|
||||
w.mu.Unlock()
|
||||
|
||||
// Create new stop channel if old one was closed
|
||||
w.mu.Lock()
|
||||
select {
|
||||
case <-w.stopCh:
|
||||
w.stopCh = make(chan struct{})
|
||||
default:
|
||||
}
|
||||
w.mu.Unlock()
|
||||
|
||||
w.wg.Add(1)
|
||||
go w.mainLoop()
|
||||
log.Println("Worker: started")
|
||||
}
|
||||
|
||||
// Stop signals the worker to shut down gracefully
|
||||
func (w *Worker) Stop() {
|
||||
w.mu.Lock()
|
||||
if w.stopped {
|
||||
w.mu.Unlock()
|
||||
return
|
||||
}
|
||||
w.stopped = true
|
||||
w.mu.Unlock()
|
||||
|
||||
close(w.stopCh)
|
||||
w.wg.Wait()
|
||||
w.userWorkers.Wait()
|
||||
log.Println("Worker: stopped")
|
||||
}
|
||||
|
||||
// mainLoop is the top-level worker loop that processes users
|
||||
func (w *Worker) mainLoop() {
|
||||
defer w.wg.Done()
|
||||
|
||||
// Run immediately on start, then poll at the default interval
|
||||
const defaultPollInterval = 5 * time.Minute
|
||||
|
||||
ticker := time.NewTicker(defaultPollInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
// Run first iteration immediately
|
||||
w.processAllUsers()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
w.processAllUsers()
|
||||
case <-w.stopCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// processAllUsers iterates over all users with AutoStart enabled
|
||||
func (w *Worker) processAllUsers() {
|
||||
settings, err := w.db.GetUsersWithAutoStart()
|
||||
if err != nil {
|
||||
log.Printf("Worker: failed to get auto-start users: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if len(settings) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
for _, s := range settings {
|
||||
select {
|
||||
case <-w.stopCh:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
w.userWorkers.Add(1)
|
||||
go w.processUser(s)
|
||||
}
|
||||
}
|
||||
|
||||
// processUser processes emails for a single user
|
||||
func (w *Worker) processUser(settings db.MailboxSettings) {
|
||||
defer w.userWorkers.Done()
|
||||
|
||||
imapConfig := imap.Config{
|
||||
Host: settings.IMAPHost,
|
||||
Port: settings.IMAPPort,
|
||||
User: settings.IMAPUser,
|
||||
Password: settings.IMAPPassEncrypted, // Already decrypted by GetUsersWithAutoStart
|
||||
TLS: settings.IMAPTLS,
|
||||
}
|
||||
|
||||
client := imap.NewClient(imapConfig)
|
||||
|
||||
if err := client.Connect(); err != nil {
|
||||
log.Printf("Worker [user %d]: IMAP connect failed: %v", settings.UserID, err)
|
||||
return
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
// Ensure target folders exist
|
||||
if err := client.EnsureFolders(w.folders.foldersList()); err != nil {
|
||||
log.Printf("Worker [user %d]: ensure folders failed: %v", settings.UserID, err)
|
||||
// Non-fatal; continue processing
|
||||
}
|
||||
|
||||
// Determine mode: catch-up if last_processed_uid is 0, else steady-state
|
||||
if settings.LastProcessedUID == 0 {
|
||||
w.runCatchUp(client, settings)
|
||||
} else {
|
||||
w.runSteadyState(client, settings)
|
||||
}
|
||||
}
|
||||
|
||||
// runSteadyState processes unseen emails in steady-state mode
|
||||
// Steady-state: fetch unseen (up to batch_size), classify, move
|
||||
func (w *Worker) runSteadyState(cl *imap.Client, settings db.MailboxSettings) {
|
||||
batchSize := w.cfg.Worker.SteadyStateBatchSize
|
||||
if settings.BatchSize > 0 {
|
||||
batchSize = settings.BatchSize
|
||||
}
|
||||
|
||||
mbox, emails, err := cl.FetchUnseen("INBOX")
|
||||
if err != nil {
|
||||
log.Printf("Worker [user %d]: fetch unseen failed: %v", settings.UserID, err)
|
||||
return
|
||||
}
|
||||
|
||||
if len(emails) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// Log mailbox stats
|
||||
log.Printf("Worker [user %d]: mailbox has %d unseen, %d total", settings.UserID, mbox.Unseen, mbox.Messages)
|
||||
|
||||
// Apply batch limit
|
||||
batch := emails
|
||||
if len(batch) > batchSize {
|
||||
batch = batch[:batchSize]
|
||||
}
|
||||
|
||||
w.processEmails(cl, settings, batch)
|
||||
}
|
||||
|
||||
// runCatchUp processes all emails from last_processed_uid to latest
|
||||
// Catch-up mode: fetch in batches of catch_up_batch_size with cooldown
|
||||
func (w *Worker) runCatchUp(cl *imap.Client, settings db.MailboxSettings) {
|
||||
batchSize := w.cfg.Worker.CatchUpBatchSize
|
||||
cooldown := time.Duration(w.cfg.Worker.CatchUpCooldownSeconds) * time.Second
|
||||
|
||||
startUID := settings.LastProcessedUID
|
||||
if startUID == 0 {
|
||||
startUID = 1 // Start from the beginning
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-w.stopCh:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// Fetch the next batch
|
||||
_, emails, err := cl.FetchBatch("INBOX", startUID, math.MaxUint32, batchSize)
|
||||
if err != nil {
|
||||
log.Printf("Worker [user %d]: catch-up fetch failed: %v", settings.UserID, err)
|
||||
return
|
||||
}
|
||||
|
||||
if len(emails) == 0 {
|
||||
return // No more emails to process
|
||||
}
|
||||
|
||||
w.processEmails(cl, settings, emails)
|
||||
|
||||
// Update start UID for next batch
|
||||
lastUID := emails[len(emails)-1].UID
|
||||
startUID = lastUID + 1
|
||||
|
||||
// Cooldown between batches
|
||||
if cooldown > 0 {
|
||||
time.Sleep(cooldown)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// processEmails classifies and moves a batch of emails
|
||||
func (w *Worker) processEmails(cl *imap.Client, settings db.MailboxSettings, emails []imap.EmailSummary) {
|
||||
for _, email := range emails {
|
||||
select {
|
||||
case <-w.stopCh:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
w.processSingleEmail(cl, settings, email)
|
||||
}
|
||||
}
|
||||
|
||||
// processSingleEmail classifies a single email and moves it
|
||||
func (w *Worker) processSingleEmail(cl *imap.Client, settings db.MailboxSettings, email imap.EmailSummary) {
|
||||
// Check stop signal
|
||||
select {
|
||||
case <-w.stopCh:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// Classify the email
|
||||
folder, confidence, err := w.classifier.Classify(email)
|
||||
if err != nil {
|
||||
log.Printf("Worker [user %d]: classification failed for UID %d: %v",
|
||||
settings.UserID, email.UID, err)
|
||||
|
||||
// Record the failure (keep email in INBOX)
|
||||
record := &db.ProcessedEmail{
|
||||
UserID: settings.UserID,
|
||||
MessageID: email.MessageID,
|
||||
UID: email.UID,
|
||||
From: email.From,
|
||||
Subject: email.Subject,
|
||||
ReceivedDate: email.Date,
|
||||
ClassifiedFolder: "INBOX", // Not moved
|
||||
Error: fmt.Sprintf("classification failed: %v", err),
|
||||
TestMode: settings.TestMode,
|
||||
}
|
||||
|
||||
if dbErr := w.db.CreateProcessedEmail(record); dbErr != nil {
|
||||
log.Printf("Worker [user %d]: failed to record error for UID %d: %v",
|
||||
settings.UserID, email.UID, dbErr)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// In test mode, just log and record (don't actually move)
|
||||
if settings.TestMode {
|
||||
log.Printf("Worker [user %d] [TEST] classified UID %d -> %s (confidence: %d%%)",
|
||||
settings.UserID, email.UID, folder, confidence)
|
||||
|
||||
record := &db.ProcessedEmail{
|
||||
UserID: settings.UserID,
|
||||
MessageID: email.MessageID,
|
||||
UID: email.UID,
|
||||
From: email.From,
|
||||
Subject: email.Subject,
|
||||
ReceivedDate: email.Date,
|
||||
ClassifiedFolder: folder,
|
||||
ConfidenceScore: confidence,
|
||||
Moved: false,
|
||||
TestMode: true,
|
||||
}
|
||||
|
||||
if dbErr := w.db.CreateProcessedEmail(record); dbErr != nil {
|
||||
log.Printf("Worker [user %d]: failed to record test result for UID %d: %v",
|
||||
settings.UserID, email.UID, dbErr)
|
||||
}
|
||||
|
||||
// Still update last_processed_uid in test mode
|
||||
_ = w.db.UpdateLastProcessedUID(settings.UserID, email.UID)
|
||||
return
|
||||
}
|
||||
|
||||
// Move the email
|
||||
if err := cl.MoveMessages("INBOX", []uint32{email.UID}, folder); err != nil {
|
||||
log.Printf("Worker [user %d]: failed to move UID %d to %q: %v",
|
||||
settings.UserID, email.UID, folder, err)
|
||||
|
||||
// Record the failure
|
||||
record := &db.ProcessedEmail{
|
||||
UserID: settings.UserID,
|
||||
MessageID: email.MessageID,
|
||||
UID: email.UID,
|
||||
From: email.From,
|
||||
Subject: email.Subject,
|
||||
ReceivedDate: email.Date,
|
||||
ClassifiedFolder: folder,
|
||||
ConfidenceScore: confidence,
|
||||
Error: fmt.Sprintf("move failed: %v", err),
|
||||
TestMode: false,
|
||||
}
|
||||
|
||||
if dbErr := w.db.CreateProcessedEmail(record); dbErr != nil {
|
||||
log.Printf("Worker [user %d]: failed to record move error for UID %d: %v",
|
||||
settings.UserID, email.UID, dbErr)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Record success
|
||||
record := &db.ProcessedEmail{
|
||||
UserID: settings.UserID,
|
||||
MessageID: email.MessageID,
|
||||
UID: email.UID,
|
||||
From: email.From,
|
||||
Subject: email.Subject,
|
||||
ReceivedDate: email.Date,
|
||||
ClassifiedFolder: folder,
|
||||
ConfidenceScore: confidence,
|
||||
Moved: true,
|
||||
TestMode: false,
|
||||
}
|
||||
|
||||
if dbErr := w.db.CreateProcessedEmail(record); dbErr != nil {
|
||||
log.Printf("Worker [user %d]: failed to record success for UID %d: %v",
|
||||
settings.UserID, email.UID, dbErr)
|
||||
}
|
||||
|
||||
// Update last processed UID
|
||||
_ = w.db.UpdateLastProcessedUID(settings.UserID, email.UID)
|
||||
|
||||
log.Printf("Worker [user %d]: moved UID %d -> %s (confidence: %d%%)",
|
||||
settings.UserID, email.UID, folder, confidence)
|
||||
}
|
||||
219
src/internal/worker/worker_test.go
Normal file
219
src/internal/worker/worker_test.go
Normal file
|
|
@ -0,0 +1,219 @@
|
|||
package worker_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"inboxer/src/internal/db"
|
||||
"inboxer/src/internal/imap"
|
||||
"inboxer/src/internal/worker"
|
||||
"inboxer/src/pkg/config"
|
||||
)
|
||||
|
||||
// mockClassifier implements worker.AIClassifier for testing
|
||||
type mockClassifier struct {
|
||||
folder string
|
||||
confidence int
|
||||
err error
|
||||
}
|
||||
|
||||
func (m *mockClassifier) Classify(email imap.EmailSummary) (string, int, error) {
|
||||
return m.folder, m.confidence, m.err
|
||||
}
|
||||
|
||||
// mockDB wraps the real database for testing (uses in-memory SQLite)
|
||||
type mockDB struct {
|
||||
*db.Database
|
||||
}
|
||||
|
||||
func newMockDB(t *testing.T) *db.Database {
|
||||
t.Helper()
|
||||
|
||||
// We need a different approach since NewDatabase needs a file path
|
||||
// Use temporary file
|
||||
database, err := db.NewDatabase(t.TempDir()+"/test.db", "test-secret-key-32-bytes-long!!")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create test database: %v", err)
|
||||
}
|
||||
return database
|
||||
}
|
||||
|
||||
func TestNewWorker(t *testing.T) {
|
||||
cfg := &config.Config{
|
||||
Worker: config.WorkerConfig{
|
||||
SteadyStateBatchSize: 10,
|
||||
SteadyStateIntervalMinutes: 5,
|
||||
CatchUpBatchSize: 50,
|
||||
CatchUpCooldownSeconds: 5,
|
||||
},
|
||||
Folders: config.FolderConfig{
|
||||
Important: "Important",
|
||||
Ecommerce: "eCommerce",
|
||||
Other: "Other",
|
||||
Spam: "Spam",
|
||||
},
|
||||
}
|
||||
|
||||
classifier := worker.NewPlaceholderClassifier("Other")
|
||||
w := worker.NewWorker(nil, cfg, classifier)
|
||||
|
||||
if w == nil {
|
||||
t.Fatal("NewWorker returned nil")
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewPlaceholderClassifier(t *testing.T) {
|
||||
c := worker.NewPlaceholderClassifier("Other")
|
||||
if c == nil {
|
||||
t.Fatal("NewPlaceholderClassifier returned nil")
|
||||
}
|
||||
|
||||
folder, confidence, err := c.Classify(imap.EmailSummary{
|
||||
UID: 1,
|
||||
Subject: "Test",
|
||||
From: "test@example.com",
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if folder != "Other" {
|
||||
t.Errorf("expected folder 'Other', got %q", folder)
|
||||
}
|
||||
if confidence != 0 {
|
||||
t.Errorf("expected confidence 0, got %d", confidence)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPlaceholderClassifierFields(t *testing.T) {
|
||||
c := worker.NewPlaceholderClassifier("Work")
|
||||
if c.DefaultFolder != "Work" {
|
||||
t.Errorf("expected DefaultFolder 'Work', got %q", c.DefaultFolder)
|
||||
}
|
||||
|
||||
folder, _, _ := c.Classify(imap.EmailSummary{UID: 42})
|
||||
if folder != "Work" {
|
||||
t.Errorf("expected folder 'Work', got %q", folder)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPlaceholderClassifierSubjectFrom(t *testing.T) {
|
||||
c := worker.NewPlaceholderClassifier("Spam")
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
email imap.EmailSummary
|
||||
}{
|
||||
{"normal email", imap.EmailSummary{UID: 1, Subject: "Hello", From: "friend@example.com"}},
|
||||
{"empty subject", imap.EmailSummary{UID: 2, Subject: "", From: "noreply@example.com"}},
|
||||
{"empty from", imap.EmailSummary{UID: 3, Subject: "Test", From: ""}},
|
||||
{"all empty", imap.EmailSummary{UID: 4}},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
folder, _, err := c.Classify(tt.email)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if folder != "Spam" {
|
||||
t.Errorf("expected 'Spam', got %q", folder)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestStartStopWorker(t *testing.T) {
|
||||
database := newMockDB(t)
|
||||
cfg := &config.Config{
|
||||
Worker: config.WorkerConfig{
|
||||
SteadyStateBatchSize: 10,
|
||||
SteadyStateIntervalMinutes: 5,
|
||||
CatchUpBatchSize: 50,
|
||||
CatchUpCooldownSeconds: 1,
|
||||
},
|
||||
Folders: config.FolderConfig{
|
||||
Important: "Important",
|
||||
Ecommerce: "eCommerce",
|
||||
Other: "Other",
|
||||
Spam: "Spam",
|
||||
},
|
||||
}
|
||||
|
||||
classifier := worker.NewPlaceholderClassifier("Other")
|
||||
w := worker.NewWorker(database, cfg, classifier)
|
||||
|
||||
// Start and immediately stop to test lifecycle
|
||||
w.Start()
|
||||
time.Sleep(100 * time.Millisecond) // Let it process
|
||||
w.Stop()
|
||||
|
||||
// No assertion errors = pass
|
||||
}
|
||||
|
||||
func TestProcessNowHandler(t *testing.T) {
|
||||
// Test that calling Start/Stop multiple times is safe
|
||||
database := newMockDB(t)
|
||||
cfg := &config.Config{
|
||||
Worker: config.WorkerConfig{
|
||||
SteadyStateBatchSize: 10,
|
||||
SteadyStateIntervalMinutes: 5,
|
||||
CatchUpBatchSize: 50,
|
||||
CatchUpCooldownSeconds: 1,
|
||||
},
|
||||
Folders: config.FolderConfig{
|
||||
Important: "Important",
|
||||
Ecommerce: "eCommerce",
|
||||
Other: "Other",
|
||||
Spam: "Spam",
|
||||
},
|
||||
}
|
||||
|
||||
classifier := worker.NewPlaceholderClassifier("Other")
|
||||
w := worker.NewWorker(database, cfg, classifier)
|
||||
|
||||
// Start and stop multiple times (should be safe)
|
||||
w.Start()
|
||||
w.Start() // Second start should be safe
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
w.Stop()
|
||||
w.Stop() // Second stop should be safe
|
||||
|
||||
// Start one final time
|
||||
w.Start()
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
w.Stop()
|
||||
}
|
||||
|
||||
func TestMockClassifier(t *testing.T) {
|
||||
// Test with error
|
||||
errorClassifier := &mockClassifier{
|
||||
folder: "",
|
||||
confidence: 0,
|
||||
err: fmt.Errorf("classification failed"),
|
||||
}
|
||||
|
||||
_, _, err := errorClassifier.Classify(imap.EmailSummary{UID: 1})
|
||||
if err == nil {
|
||||
t.Fatal("expected error from mock classifier")
|
||||
}
|
||||
|
||||
// Test with specific folder
|
||||
successClassifier := &mockClassifier{
|
||||
folder: "Important",
|
||||
confidence: 95,
|
||||
err: nil,
|
||||
}
|
||||
|
||||
folder, confidence, err := successClassifier.Classify(imap.EmailSummary{UID: 2})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if folder != "Important" {
|
||||
t.Errorf("expected 'Important', got %q", folder)
|
||||
}
|
||||
if confidence != 95 {
|
||||
t.Errorf("expected 95, got %d", confidence)
|
||||
}
|
||||
}
|
||||
Loading…
Reference in a new issue