Как создать простой планировщик событий в Go
Базовый рабочий механизм представляет собой следующее: запланированное событие добавляется в базу данных, из которой оно будет выполняться в определенное время. Другая задача будет запускаться регулярно, чтобы проверить, не истекло ли какое-либо событие в базе данных. Если да, то запустится событие polling.
Начнем с создания схемы базы данных в postgresql, которая будет использоваться для хранения событий:
CREATE TABLE IF NOT EXISTS "public"."jobs" (
"id" SERIAL PRIMARY KEY,
"name" varchar(50) NOT NULL,
"payload" text,
"runAt" TIMESTAMP NOT NULL
)Теперь определим структуру данных для следующих элементов.
Event— запланированная задача.Listeners— список слушателей событий.ListenFunc— функция, которая будет вызываться при запуске события.
// Listeners прикрепляет слушателей событий
type Listeners map[string]ListenFunc
// Функция ListenFunc, которая прослушивает события
type ListenFunc func(string)
// Структура события
type Event struct {
ID uint
Name string
Payload string
}Также определим структуру Scheduler, которая будет планировать события и запускать слушателей:
// Структура данных планировщика
type Scheduler struct {
db *sql.DB
listeners Listeners
}
// NewScheduler создает новый планировщик
func NewScheduler(db *sql.DB, listeners Listeners) Scheduler {
return Scheduler{
db: db,
listeners: listeners,
}
}Здесь мы создаем новый планировщик, передавая ему экземпляр sql.DB и начальных слушателей.
Теперь нужно добавить реализацию функции планирования, которая будет помещать событие в таблицу jobs:
// Schedule планирует предоставленные события
func (s Scheduler) Schedule(event string, payload string, runAt time.Time) {
log.Print("🚀 Scheduling event ", event, " to run at ", runAt)
_, err := s.db.Exec(`INSERT INTO "public"."jobs" ("name", "payload", "runAt") VALUES ($1, $2, $3)`, event, payload, runAt)
if err != nil {
log.Print("schedule insert error: ", err)
}
}
// AddListener добавляет функцию listener в Listeners
func (s Scheduler) AddListener(event string, listenFunc ListenFunc) {
s.listeners[event] = listenFunc
}В функции AddListener мы просто присваиваем функцию listener к имени события.
Мы завершили первый этап: добавили событие в таблицу job. Теперь нужно извлечь устаревшие задачи из базы данных, выполнить их, а затем удалить.
Реализация функции ниже показывает, как можно выявить устаревшие события в таблице, а также сериализацию события в структуру Event:
// checkDueEvents проверяет и возвращает соответствующие события
func (s Scheduler) checkDueEvents() []Event {
events := []Event{}
rows, err := s.db.Query(`SELECT "id", "name", "payload" FROM "public"."jobs" WHERE "runAt" < $1`, time.Now())
if err != nil {
log.Print("💀 error: ", err)
return nil
}
for rows.Next() {
evt := Event{}
rows.Scan(&evt.ID, &evt.Name, &evt.Payload)
events = append(events, evt)
}
return events
}Переходим ко второму этапу: вызываем зарегистрированных слушателей событий из базы данных:
// callListeners вызывает слушателя определенного события
func (s Scheduler) callListeners(event Event) {
eventFn, ok := s.listeners[event.Name]
if ok {
go eventFn(event.Payload)
_, err := s.db.Exec(`DELETE FROM "public"."jobs" WHERE "id" = $1`, event.ID)
if err != nil {
log.Print("💀 error: ", err)
}
} else {
log.Print("💀 error: couldn't find event listeners attached to ", event.Name)
}
}Здесь мы проверяем наличие функции event. Если она прикреплена, то мы вызываем функцию event listener. Строки 6–9 удаляют задачу, поэтому при повторном поиске по базе данных слушатель не будет найден.
И наконец, переходим к финальному этапу: проверяем, истекло ли какое-либо событие в заданный интервал времени. Для запуска задач в определенный период используем функцию ticker библиотеки time, которая предоставит канал, получающий новый тик в заданном интервале.
// CheckEventsInInterval проверяет события в заданном интервале
func (s Scheduler) CheckEventsInInterval(ctx context.Context, duration time.Duration) {
ticker := time.NewTicker(duration)
go func() {
for {
select {
case <-ctx.Done():
ticker.Stop()
return
case <-ticker.C:
log.Println("⏰ Ticks Received...")
events := s.checkDueEvents()
for _, e := range events {
s.callListeners(e)
}
}
}
}()
}Здесь мы проверяем, закрыт ли контекст, или же канал ticker получает тики. После получения тиков просматриваем соответствующие события, а затем вызываем слушателей для всех событий.
Теперь мы будем использовать все функции, определенные ранее в файле main.go:
package main
import (
"context"
"log"
"os"
"os/signal"
"time"
"github.com/dipeshdulal/event-scheduling/customevents"
)
var eventListeners = Listeners{
"SendEmail": customevents.SendEmail,
"PayBills": customevents.PayBills,
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
db := initDBConnection()
scheduler := NewScheduler(db, eventListeners)
scheduler.CheckEventsInInterval(ctx, time.Minute)
scheduler.Schedule("SendEmail", "mail: [email protected]", time.Now().Add(1*time.Minute))
scheduler.Schedule("PayBills", "paybills: $4,000 bill", time.Now().Add(2*time.Minute))
go func() {
for range interrupt {
log.Println("
❌ Interrupt received closing...")
cancel()
}
}()
<-ctx.Done()
}В строках 13–16 мы прикрепляем слушателей к имени событий SendEmail и PayBills, чтобы эти функции вызывались при появлении новой задачи.
В строках 22 и 32–37 мы прикрепляем канал прерывания (interrupt) с помощью os.Interrupt. Когда прерывание в программе выполняется, мы отменяем контекст в строке 19.
В строках 26–30 мы определяем планировщик событий, запускаем функцию polling и планируем выполнение события SendEmail через минуту, а PayBills — через две минуты.
Вывод данной программы выглядит следующим образом:
2021/01/16 11:58:49 💾 Seeding database with table...
2021/01/16 11:58:49 🚀 Scheduling event SendEmail to run at 2021-01-16 11:59:49.344904505 +0545 +0545 m=+60.004623549
2021/01/16 11:58:49 🚀 Scheduling event PayBills to run at 2021-01-16 12:00:49.34773798 +0545 +0545 m=+120.007457039
2021/01/16 11:59:49 ⏰ Ticks Received...
2021/01/16 11:59:49 📨 Sending email with data: mail: [email protected]
2021/01/16 12:00:49 ⏰ Ticks Received...
2021/01/16 12:01:49 ⏰ Ticks Received...
2021/01/16 12:01:49 💲 Pay me a bill: paybills: $4,000 bill
2021/01/16 12:02:49 ⏰ Ticks Received...
2021/01/16 12:03:49 ⏰ Ticks Received...
^C2021/01/16 12:03:57
❌ Interrupt received closing...Мы видим, что событие SendEmail было выполнено через минуту, а PayBills — в следующую минуту.
Таким образом, мы создали базовую систему планирования событий, которая выполняет задачи в определенный временной интервал. Здесь можно найти полный пример кода.

Comments