Core code done, working on utility functions

This commit is contained in:
Donaldson 2021-10-29 12:25:23 -07:00
parent 56ea619b57
commit 79c8412616
3 changed files with 129 additions and 80 deletions

View File

@ -72,6 +72,12 @@ func main() {
"Database number for Redis state instance", "Database number for Redis state instance",
) )
redis_instance_id := flag.String(
"redis-instance-id",
"",
"Unique (per-cluster) instance ID",
)
flag.Parse() flag.Parse()
osfs := afero.NewOsFs() osfs := afero.NewOsFs()
@ -105,11 +111,19 @@ func main() {
redis_db_parsed = redis_db_parsed_inner redis_db_parsed = redis_db_parsed_inner
if err != nil { if err != nil {
log.Fatal("Redis mode was selected, but --redis-db or REDIS_DB were not set") redis_db_parsed = 0
} }
} }
state = NewRedisState(redis_url_parsed, int(redis_db_parsed)) redis_instance_id_parsed := *redis_instance_id
if redis_instance_id_parsed == "" {
redis_instance_id_parsed = os.Getenv("REDIS_INSTANCE_ID")
if redis_instance_id_parsed == "" {
log.Fatal("Redis mode was selected, but --redis-instance-id or REDIS_INSTANCE_ID were not set")
}
}
state = NewRedisState(redis_url_parsed, int(redis_db_parsed), redis_instance_id_parsed)
default: default:
case "legacy": case "legacy":
state = NewState(afero.NewBasePathFs(osfs, *statePath)) state = NewState(afero.NewBasePathFs(osfs, *statePath))

View File

@ -2,7 +2,6 @@ package main
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"strings" "strings"
"time" "time"
@ -11,10 +10,13 @@ import (
) )
const ( const (
REDIS_KEY_MESSAGE = "moth:message" REDIS_KEY_PREFIX = "moth"
REDIS_KEY_TEAMS = "moth:teams" REDIS_KEY_MESSAGE = "message"
REDIS_KEY_TEAM_IDS = "moth:team_ids" REDIS_KEY_TEAMS = "teams"
REDIS_KEY_POINT_LOG = "moth:points" REDIS_KEY_TEAM_IDS = "team_ids"
REDIS_KEY_POINT_LOG = "points"
REDIS_KEY_EVENT_LOG = "events"
REDIS_KEY_ENABLED = "enabled"
) )
type RedisState struct { type RedisState struct {
@ -25,56 +27,20 @@ type RedisState struct {
// Enabled tracks whether the current State system is processing updates // Enabled tracks whether the current State system is processing updates
Enabled bool Enabled bool
eventStream chan []string instance_id string
//lock sync.RWMutex eventStream chan map[string]interface{}
} }
type RedisEventEntry struct { type RedisAward struct {
/* when int64
[]string{ teamID string
strconv.FormatInt(time.Now().Unix(), 10), category string
event, points int
participantID,
teamID,
cat,
strconv.Itoa(points),
},
extra...,*/
// Unix epoch time of this event
When int64
Event string
ParticipantID string
TeamID string
Category string
Points int
Extra []string
}
// MarshalJSON returns the award event, encoded as a list.
func (e RedisEventEntry) MarshalJSON() ([]byte, error) {
/*ao := []interface{}{
a.When,
a.TeamID,
a.Category,
a.Points,
}*/
return json.Marshal(e)
}
// UnmarshalJSON decodes the JSON string b.
func (e RedisEventEntry) UnmarshalJSON(b []byte) error {
//r := bytes.NewReader(b)
//dec := json.NewDecoder(r)
//dec.UseNumber() // Don't use floats
json.Unmarshal(b, &e)
return nil
} }
// NewRedisState returns a new State struct backed by the given Fs // NewRedisState returns a new State struct backed by the given Fs
func NewRedisState(redis_addr string, redis_db int) *RedisState { func NewRedisState(redis_addr string, redis_db int, instance_id string) *RedisState {
rdb := redis.NewClient(&redis.Options{ rdb := redis.NewClient(&redis.Options{
Addr: redis_addr, Addr: redis_addr,
@ -83,19 +49,32 @@ func NewRedisState(redis_addr string, redis_db int) *RedisState {
}) })
s := &RedisState{ s := &RedisState{
Enabled: true,
eventStream: make(chan []string, 80),
ctx: context.Background(), ctx: context.Background(),
redis_client: rdb, redis_client: rdb,
Enabled: true,
instance_id: instance_id,
eventStream: make(chan map[string]interface{}, 80),
} }
//s.initialize()
return s return s
} }
func (s *RedisState) formatRedisKey(key string) string {
return fmt.Sprintf("%s:%s:%s", REDIS_KEY_PREFIX, s.instance_id, key)
}
func (s *RedisState) initialize() {
s.SetMessagesOverride("", false)
}
// ************ Message-related operations ****************
// Messages retrieves the current messages. // Messages retrieves the current messages.
func (s *RedisState) Messages() string { func (s *RedisState) Messages() string {
val, err := s.redis_client.Get(s.ctx, REDIS_KEY_MESSAGE).Result() val, err := s.redis_client.Get(s.ctx, s.formatRedisKey(REDIS_KEY_MESSAGE)).Result()
if err != nil { if err != nil {
return "" return ""
@ -104,9 +83,35 @@ func (s *RedisState) Messages() string {
return val return val
} }
func (s *RedisState) SetMessages(message string) error {
return s.SetMessagesOverride(message, true)
}
func (s *RedisState) SetMessagesOverride(message string, override bool) error {
if override {
return s.redis_client.Set(s.ctx, s.formatRedisKey(REDIS_KEY_MESSAGE), message, 0).Err()
} else {
return s.redis_client.SetNX(s.ctx, s.formatRedisKey(REDIS_KEY_MESSAGE), message, 0).Err()
}
}
// ******************** Team operations ******************
func (s *RedisState) TeamIDs() ([]string, error) {
return s.redis_client.SMembers(s.ctx, s.formatRedisKey(REDIS_KEY_TEAM_IDS)).Result()
}
func (s *RedisState) AddTeamID(teamID string) error {
return s.redis_client.SAdd(s.ctx, s.formatRedisKey(REDIS_KEY_TEAM_IDS), teamID).Err()
}
func (s *RedisState) TeamNames() (map[string]string, error) {
return s.redis_client.HGetAll(s.ctx, s.formatRedisKey(REDIS_KEY_TEAMS)).Result()
}
// TeamName returns team name given a team ID. // TeamName returns team name given a team ID.
func (s *RedisState) TeamName(teamID string) (string, error) { func (s *RedisState) TeamName(teamID string) (string, error) {
team_name, err := s.redis_client.HGet(s.ctx, REDIS_KEY_TEAMS, teamID).Result() team_name, err := s.redis_client.HGet(s.ctx, s.formatRedisKey(REDIS_KEY_TEAMS), teamID).Result()
if err != nil { if err != nil {
return "", fmt.Errorf("unregistered team ID: %s", teamID) return "", fmt.Errorf("unregistered team ID: %s", teamID)
@ -118,7 +123,7 @@ func (s *RedisState) TeamName(teamID string) (string, error) {
// SetTeamName writes out team name. // SetTeamName writes out team name.
// This can only be done once per team. // This can only be done once per team.
func (s *RedisState) SetTeamName(teamID, teamName string) error { func (s *RedisState) SetTeamName(teamID, teamName string) error {
valid_id, err := s.redis_client.SIsMember(s.ctx, REDIS_KEY_TEAM_IDS, teamID).Result() valid_id, err := s.redis_client.SIsMember(s.ctx, s.formatRedisKey(REDIS_KEY_TEAM_IDS), teamID).Result()
if err != nil { if err != nil {
return fmt.Errorf("Unexpected error while validating team ID: %s", teamID) return fmt.Errorf("Unexpected error while validating team ID: %s", teamID)
@ -126,7 +131,7 @@ func (s *RedisState) SetTeamName(teamID, teamName string) error {
return fmt.Errorf("team ID: (%s) not found in list of valid team IDs", teamID) return fmt.Errorf("team ID: (%s) not found in list of valid team IDs", teamID)
} }
success, err := s.redis_client.HSetNX(s.ctx, REDIS_KEY_TEAMS, teamID, teamName).Result() success, err := s.redis_client.HSetNX(s.ctx, s.formatRedisKey(REDIS_KEY_TEAM_IDS), teamID, teamName).Result()
if err != nil { if err != nil {
return fmt.Errorf("Unexpected error while setting team ID: %s and team Name: %s", teamID, teamName) return fmt.Errorf("Unexpected error while setting team ID: %s and team Name: %s", teamID, teamName)
@ -139,13 +144,14 @@ func (s *RedisState) SetTeamName(teamID, teamName string) error {
return fmt.Errorf("Team ID: %s is already set", teamID) return fmt.Errorf("Team ID: %s is already set", teamID)
} }
// PointsLog retrieves the current points log. // PointsLog retrieves the current points log.
func (s *RedisState) PointsLog() award.List { func (s *RedisState) PointsLog() award.List {
redis_args := &redis.ZRangeBy{ redis_args := &redis.ZRangeBy{
Min: "0", Min: "0",
Max: "-1", Max: "-1",
} }
scores, err := s.redis_client.ZRangeByScoreWithScores(s.ctx, REDIS_KEY_POINT_LOG, redis_args).Result() scores, err := s.redis_client.ZRangeByScoreWithScores(s.ctx, s.formatRedisKey(REDIS_KEY_POINT_LOG), redis_args).Result()
if err != nil { if err != nil {
return make(award.List, 0) return make(award.List, 0)
@ -188,7 +194,7 @@ func (s *RedisState) AwardPoints(teamID, category string, points int) error {
redis_args.Members = append(redis_args.Members, new_member) redis_args.Members = append(redis_args.Members, new_member)
_, err := s.redis_client.ZAddArgs(s.ctx, REDIS_KEY_POINT_LOG, redis_args).Result() _, err := s.redis_client.ZAddArgs(s.ctx, s.formatRedisKey(REDIS_KEY_POINT_LOG), redis_args).Result()
if err != nil { if err != nil {
return err return err
@ -199,6 +205,7 @@ func (s *RedisState) AwardPoints(teamID, category string, points int) error {
// LogEvent writes to the event log // LogEvent writes to the event log
func (s *RedisState) LogEvent(event, participantID, teamID, cat string, points int, extra ...string) { func (s *RedisState) LogEvent(event, participantID, teamID, cat string, points int, extra ...string) {
/*
new_event := RedisEventEntry { new_event := RedisEventEntry {
When: time.Now().Unix(), When: time.Now().Unix(),
Event: event, Event: event,
@ -209,39 +216,57 @@ func (s *RedisState) LogEvent(event, participantID, teamID, cat string, points i
Extra: extra, Extra: extra,
} }
message := new_event.MarshalJSON() message, _ := new_event.MarshalJSON()
*/
/* /*
s.eventStream <- append( redis_args := redis.XAddArgs {
[]string{ Stream: s.redisKeyEventLog(),
strconv.FormatInt(time.Now().Unix(), 10), Values: map[string]interface{}{
event, "When": time.Now().Unix(),
participantID, "Event": event,
teamID, "ParticipantID": participantID,
cat, "TeamID": teamID,
strconv.Itoa(points), "Category": cat,
"Points": points,
"Extra": extra,
}, },
extra..., }
)*/
s.redis_client.XAdd(s.ctx, &redis_args)
*/
s.eventStream <-
map[string]interface{}{
"When": time.Now().Unix(),
"Event": event,
"ParticipantID": participantID,
"TeamID": teamID,
"Category": cat,
"Points": points,
"Extra": extra,
}
}
func (s *RedisState) writeEvent(event map[string]interface{}) {
redis_args := redis.XAddArgs {
Stream: s.formatRedisKey(REDIS_KEY_EVENT_LOG),
Values: event,
}
s.redis_client.XAdd(s.ctx, &redis_args)
} }
func (s *RedisState) Maintain(updateInterval time.Duration) { func (s *RedisState) Maintain(updateInterval time.Duration) {
/*
ticker := time.NewTicker(updateInterval) ticker := time.NewTicker(updateInterval)
s.refresh()
for { for {
select { select {
case msg := <-s.eventStream: case msg := <-s.eventStream:
s.eventWriter.Write(msg) s.writeEvent(msg)
s.eventWriter.Flush()
s.eventWriterFile.Sync()
case <-ticker.C: case <-ticker.C:
s.refresh() /* There are no maintanance tasks for this provider, currently. Maybe some state-saving mechanism, at some point? */
case <-s.refreshNow:
s.refresh()
} }
} }
*/
} }

View File

@ -23,6 +23,16 @@ type T struct {
// List is a collection of award events. // List is a collection of award events.
type List []T type List []T
type Award interface {
When() int64
TeamID() string
Category() string
Points() int
MarshalJSON() ([]byte, error)
UnMarshalJSON(b []byte) error
}
// Len returns the length of the awards list. // Len returns the length of the awards list.
func (awards List) Len() int { func (awards List) Len() int {
return len(awards) return len(awards)