package main

import (
	"fmt"
	"log"
	"net/http"
	"time"
)

// handleSyncPull returns all records modified since the `since` query parameter.
// If `since` is empty, returns all records (initial sync).
//
// The response is a JSON object with the keys "server_time", "event",
// "attendees", "departments", "volunteers", "shifts" and "volunteer_shifts".
// If any of the list queries fails, the handler logs the error and responds
// with 500 instead of returning a partial snapshot: a partial snapshot paired
// with a fresh "server_time" would let a client advance its cursor past
// records it never received.
func (app *App) handleSyncPull(w http.ResponseWriter, r *http.Request) {
	const timeLayout = "2006-01-02T15:04:05Z"

	// Capture the timestamp before reading any data. A record modified while
	// the queries below are running is then newer than server_time and gets
	// picked up by the client's next pull instead of being skipped.
	serverTime := time.Now().UTC().Format(timeLayout)

	since := r.URL.Query().Get("since")

	// fail logs the underlying error and reports a generic 500 to the client.
	fail := func(what string, err error) {
		log.Printf("sync pull: loading %s: %v", what, err)
		http.Error(w, "sync pull failed", http.StatusInternalServerError)
	}

	// NOTE(review): the event lookup is kept best-effort (logged, not fatal)
	// because it is not visible from here whether "no event configured yet"
	// is reported as an error — confirm against getEvent before tightening.
	event, err := app.getEvent()
	if err != nil {
		log.Printf("sync pull: loading event: %v", err)
	}

	attendees, err := app.attendeesSince(since)
	if err != nil {
		fail("attendees", err)
		return
	}
	departments, err := app.listDepartments(since)
	if err != nil {
		fail("departments", err)
		return
	}
	volunteers, err := app.listVolunteers("", nil, since)
	if err != nil {
		fail("volunteers", err)
		return
	}
	shifts, err := app.listShifts(nil, "", since)
	if err != nil {
		fail("shifts", err)
		return
	}
	volunteerShifts, err := app.listVolunteerShifts(since)
	if err != nil {
		fail("volunteer shifts", err)
		return
	}

	// Replace nil slices with empty ones — presumably so the payload carries
	// `[]` rather than `null` for collections with no changes.
	if attendees == nil {
		attendees = []Attendee{}
	}
	if departments == nil {
		departments = []Department{}
	}
	if volunteers == nil {
		volunteers = []Volunteer{}
	}
	if shifts == nil {
		shifts = []Shift{}
	}
	if volunteerShifts == nil {
		volunteerShifts = []VolunteerShift{}
	}

	writeJSON(w, map[string]any{
		"server_time":      serverTime,
		"event":            event,
		"attendees":        attendees,
		"departments":      departments,
		"volunteers":       volunteers,
		"shifts":           shifts,
		"volunteer_shifts": volunteerShifts,
	})
}

// handleSyncStream is an SSE endpoint that broadcasts real-time events
// (check-ins, etc.) to connected clients.
//
// The handler subscribes to app.broker, writes an initial "connected" event,
// then forwards every payload received from the subscription as an SSE `data:`
// frame. It returns when the request context is cancelled or the subscription
// channel is closed, unsubscribing from the broker on the way out.
func (app *App) handleSyncStream(w http.ResponseWriter, r *http.Request) {
	// Verify streaming support before touching the headers so the error
	// response is not sent with event-stream headers attached.
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "streaming not supported", http.StatusInternalServerError)
		return
	}

	h := w.Header()
	h.Set("Content-Type", "text/event-stream")
	h.Set("Cache-Control", "no-cache")
	h.Set("Connection", "keep-alive")
	// Asks buffering reverse proxies (e.g. nginx) to pass frames through
	// immediately instead of holding them back.
	h.Set("X-Accel-Buffering", "no")

	ch := app.broker.subscribe()
	defer app.broker.unsubscribe(ch)

	fmt.Fprintf(w, "data: {\"event\":\"connected\"}\n\n")
	flusher.Flush()

	// Periodic SSE comment line so an otherwise idle connection still
	// carries traffic.
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	ctx := r.Context()
	for {
		select {
		case <-ctx.Done():
			return
		case payload, ok := <-ch:
			if !ok {
				// Subscription channel was closed; end the stream.
				return
			}
			fmt.Fprintf(w, "data: %s\n\n", payload)
			flusher.Flush()
		case <-ticker.C:
			fmt.Fprintf(w, ": ping\n\n")
			flusher.Flush()
		}
	}
}