diff --git a/cmd/mothd/main.go b/cmd/mothd/main.go index 03adedf..e238bac 100644 --- a/cmd/mothd/main.go +++ b/cmd/mothd/main.go @@ -72,6 +72,12 @@ func main() { "Database number for Redis state instance", ) + redis_instance_id := flag.String( + "redis-instance-id", + "", + "Unique (per-cluster) instance ID", + ) + flag.Parse() osfs := afero.NewOsFs() @@ -105,11 +111,19 @@ func main() { redis_db_parsed = redis_db_parsed_inner if err != nil { - log.Fatal("Redis mode was selected, but --redis-db or REDIS_DB were not set") + redis_db_parsed = 0 } } - state = NewRedisState(redis_url_parsed, int(redis_db_parsed)) + redis_instance_id_parsed := *redis_instance_id + if redis_instance_id_parsed == "" { + redis_instance_id_parsed = os.Getenv("REDIS_INSTANCE_ID") + if redis_instance_id_parsed == "" { + log.Fatal("Redis mode was selected, but --redis-instance-id or REDIS_INSTANCE_ID were not set") + } + } + + state = NewRedisState(redis_url_parsed, int(redis_db_parsed), redis_instance_id_parsed) default: case "legacy": state = NewState(afero.NewBasePathFs(osfs, *statePath)) diff --git a/cmd/mothd/redis_state.go b/cmd/mothd/redis_state.go index 41fda79..1524634 100644 --- a/cmd/mothd/redis_state.go +++ b/cmd/mothd/redis_state.go @@ -2,7 +2,6 @@ package main import ( "context" - "encoding/json" "fmt" "strings" "time" @@ -11,10 +10,13 @@ import ( ) const ( - REDIS_KEY_MESSAGE = "moth:message" - REDIS_KEY_TEAMS = "moth:teams" - REDIS_KEY_TEAM_IDS = "moth:team_ids" - REDIS_KEY_POINT_LOG = "moth:points" + REDIS_KEY_PREFIX = "moth" + REDIS_KEY_MESSAGE = "message" + REDIS_KEY_TEAMS = "teams" + REDIS_KEY_TEAM_IDS = "team_ids" + REDIS_KEY_POINT_LOG = "points" + REDIS_KEY_EVENT_LOG = "events" + REDIS_KEY_ENABLED = "enabled" ) type RedisState struct { @@ -25,56 +27,20 @@ type RedisState struct { // Enabled tracks whether the current State system is processing updates Enabled bool - eventStream chan []string + instance_id string - //lock sync.RWMutex + eventStream chan map[string]interface{} } -type RedisEventEntry struct { - /* - []string{ - strconv.FormatInt(time.Now().Unix(), 10), - event, - participantID, - teamID, - cat, - strconv.Itoa(points), - }, - extra...,*/ - // Unix epoch time of this event - When int64 - Event string - ParticipantID string - TeamID string - Category string - Points int - Extra []string -} - -// MarshalJSON returns the award event, encoded as a list. -func (e RedisEventEntry) MarshalJSON() ([]byte, error) { - /*ao := []interface{}{ - a.When, - a.TeamID, - a.Category, - a.Points, - }*/ - - return json.Marshal(e) -} - -// UnmarshalJSON decodes the JSON string b. -func (e RedisEventEntry) UnmarshalJSON(b []byte) error { - //r := bytes.NewReader(b) - //dec := json.NewDecoder(r) - //dec.UseNumber() // Don't use floats - json.Unmarshal(b, &e) - - return nil +type RedisAward struct { + when int64 + teamID string + category string + points int } // NewRedisState returns a new State struct backed by the given Fs -func NewRedisState(redis_addr string, redis_db int) *RedisState { +func NewRedisState(redis_addr string, redis_db int, instance_id string) *RedisState { rdb := redis.NewClient(&redis.Options{ Addr: redis_addr, @@ -83,19 +49,32 @@ func NewRedisState(redis_addr string, redis_db int) *RedisState { }) s := &RedisState{ - Enabled: true, - eventStream: make(chan []string, 80), - ctx: context.Background(), redis_client: rdb, + Enabled: true, + instance_id: instance_id, + eventStream: make(chan map[string]interface{}, 80), } + //s.initialize() + return s } +func (s *RedisState) formatRedisKey(key string) string { + return fmt.Sprintf("%s:%s:%s", REDIS_KEY_PREFIX, s.instance_id, key) +} + +func (s *RedisState) initialize() { + s.SetMessagesOverride("", false) +} + + +// ************ Message-related operations **************** + // Messages retrieves the current messages. func (s *RedisState) Messages() string { - val, err := s.redis_client.Get(s.ctx, REDIS_KEY_MESSAGE).Result() + val, err := s.redis_client.Get(s.ctx, s.formatRedisKey(REDIS_KEY_MESSAGE)).Result() if err != nil { return "" @@ -104,9 +83,35 @@ func (s *RedisState) Messages() string { return val } +func (s *RedisState) SetMessages(message string) error { + return s.SetMessagesOverride(message, true) +} + +func (s *RedisState) SetMessagesOverride(message string, override bool) error { + if override { + return s.redis_client.Set(s.ctx, s.formatRedisKey(REDIS_KEY_MESSAGE), message, 0).Err() + } else { + return s.redis_client.SetNX(s.ctx, s.formatRedisKey(REDIS_KEY_MESSAGE), message, 0).Err() + } +} + +// ******************** Team operations ****************** + +func (s *RedisState) TeamIDs() ([]string, error) { + return s.redis_client.SMembers(s.ctx, s.formatRedisKey(REDIS_KEY_TEAM_IDS)).Result() +} + +func (s *RedisState) AddTeamID(teamID string) error { + return s.redis_client.SAdd(s.ctx, s.formatRedisKey(REDIS_KEY_TEAM_IDS), teamID).Err() +} + +func (s *RedisState) TeamNames() (map[string]string, error) { + return s.redis_client.HGetAll(s.ctx, s.formatRedisKey(REDIS_KEY_TEAMS)).Result() +} + // TeamName returns team name given a team ID. func (s *RedisState) TeamName(teamID string) (string, error) { - team_name, err := s.redis_client.HGet(s.ctx, REDIS_KEY_TEAMS, teamID).Result() + team_name, err := s.redis_client.HGet(s.ctx, s.formatRedisKey(REDIS_KEY_TEAMS), teamID).Result() if err != nil { return "", fmt.Errorf("unregistered team ID: %s", teamID) @@ -118,7 +123,7 @@ func (s *RedisState) TeamName(teamID string) (string, error) { // SetTeamName writes out team name. // This can only be done once per team. func (s *RedisState) SetTeamName(teamID, teamName string) error { - valid_id, err := s.redis_client.SIsMember(s.ctx, REDIS_KEY_TEAM_IDS, teamID).Result() + valid_id, err := s.redis_client.SIsMember(s.ctx, s.formatRedisKey(REDIS_KEY_TEAM_IDS), teamID).Result() if err != nil { return fmt.Errorf("Unexpected error while validating team ID: %s", teamID) @@ -126,7 +131,7 @@ func (s *RedisState) SetTeamName(teamID, teamName string) error { return fmt.Errorf("team ID: (%s) not found in list of valid team IDs", teamID) } - success, err := s.redis_client.HSetNX(s.ctx, REDIS_KEY_TEAMS, teamID, teamName).Result() + success, err := s.redis_client.HSetNX(s.ctx, s.formatRedisKey(REDIS_KEY_TEAM_IDS), teamID, teamName).Result() if err != nil { return fmt.Errorf("Unexpected error while setting team ID: %s and team Name: %s", teamID, teamName) @@ -139,13 +144,14 @@ func (s *RedisState) SetTeamName(teamID, teamName string) error { return fmt.Errorf("Team ID: %s is already set", teamID) } + // PointsLog retrieves the current points log. func (s *RedisState) PointsLog() award.List { redis_args := &redis.ZRangeBy{ Min: "0", Max: "-1", } - scores, err := s.redis_client.ZRangeByScoreWithScores(s.ctx, REDIS_KEY_POINT_LOG, redis_args).Result() + scores, err := s.redis_client.ZRangeByScoreWithScores(s.ctx, s.formatRedisKey(REDIS_KEY_POINT_LOG), redis_args).Result() if err != nil { return make(award.List, 0) @@ -188,7 +194,7 @@ func (s *RedisState) AwardPoints(teamID, category string, points int) error { redis_args.Members = append(redis_args.Members, new_member) - _, err := s.redis_client.ZAddArgs(s.ctx, REDIS_KEY_POINT_LOG, redis_args).Result() + _, err := s.redis_client.ZAddArgs(s.ctx, s.formatRedisKey(REDIS_KEY_POINT_LOG), redis_args).Result() if err != nil { return err @@ -199,6 +205,7 @@ func (s *RedisState) AwardPoints(teamID, category string, points int) error { // LogEvent writes to the event log func (s *RedisState) LogEvent(event, participantID, teamID, cat string, points int, extra ...string) { + /* new_event := RedisEventEntry { When: time.Now().Unix(), Event: event, @@ -209,39 +216,57 @@ func (s *RedisState) LogEvent(event, participantID, teamID, cat string, points i Extra: extra, } - message := new_event.MarshalJSON() + message, _ := new_event.MarshalJSON() + */ + /* - s.eventStream <- append( - []string{ - strconv.FormatInt(time.Now().Unix(), 10), - event, - participantID, - teamID, - cat, - strconv.Itoa(points), + redis_args := redis.XAddArgs { + Stream: s.redisKeyEventLog(), + Values: map[string]interface{}{ + "When": time.Now().Unix(), + "Event": event, + "ParticipantID": participantID, + "TeamID": teamID, + "Category": cat, + "Points": points, + "Extra": extra, }, - extra..., - )*/ + } + s.redis_client.XAdd(s.ctx, &redis_args) + */ + s.eventStream <- + map[string]interface{}{ + "When": time.Now().Unix(), + "Event": event, + "ParticipantID": participantID, + "TeamID": teamID, + "Category": cat, + "Points": points, + "Extra": extra, + } +} + +func (s *RedisState) writeEvent(event map[string]interface{}) { + redis_args := redis.XAddArgs { + Stream: s.formatRedisKey(REDIS_KEY_EVENT_LOG), + Values: event, + } + + s.redis_client.XAdd(s.ctx, &redis_args) } func (s *RedisState) Maintain(updateInterval time.Duration) { - /* ticker := time.NewTicker(updateInterval) - s.refresh() + for { select { case msg := <-s.eventStream: - s.eventWriter.Write(msg) - s.eventWriter.Flush() - s.eventWriterFile.Sync() + s.writeEvent(msg) case <-ticker.C: - s.refresh() - case <-s.refreshNow: - s.refresh() + /* There are no maintanance tasks for this provider, currently. Maybe some state-saving mechanism, at some point? */ } } - */ } \ No newline at end of file diff --git a/pkg/award/award.go b/pkg/award/award.go index fda4c05..3c8697c 100644 --- a/pkg/award/award.go +++ b/pkg/award/award.go @@ -23,6 +23,16 @@ type T struct { // List is a collection of award events. type List []T +type Award interface { + When() int64 + TeamID() string + Category() string + Points() int + + MarshalJSON() ([]byte, error) + UnMarshalJSON(b []byte) error +} + // Len returns the length of the awards list. func (awards List) Len() int { return len(awards)