Skip to content

Commit

Permalink
ignore duplicate pageviews
Browse files Browse the repository at this point in the history
  • Loading branch information
negrel committed Sep 18, 2024
1 parent c58700c commit dc0ed5f
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 73 deletions.
24 changes: 20 additions & 4 deletions pkg/handlers/events_pageviews.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,22 +90,28 @@ func eventsPageviewsHandler(

// Internal traffic, session may already exist.
if isInternalTraffic {
sessionExists := false
var result sessionstorage.AddPageviewResult
var sessionExists bool

if visitorId != "" { // Identify session.
visitorId = utils.CopyString(visitorId)
_, sessionExists = sessionStorage.IdentifySession(deviceId, visitorId)
if sessionExists {

// Increment pageview count.
pageView.Session, sessionExists = sessionStorage.IncSessionPageviewCount(deviceId)
result, sessionExists = sessionStorage.AddPageview(deviceId, pageView.PageUri)
if !sessionExists { // Should never happend.
logger.Panic().Msg("failed to increment session pageview count after IdentifySession returned a session")
}
}
} else { // Anon session.
// Increment pageview count.
pageView.Session, sessionExists = sessionStorage.IncSessionPageviewCount(deviceId)
result, sessionExists = sessionStorage.AddPageview(deviceId, pageView.PageUri)
}

if result.DuplicatePageview {
return nil
}
pageView.Session = result.Session

if !sessionExists {
newSession = true
Expand All @@ -124,6 +130,16 @@ func eventsPageviewsHandler(
return fiber.NewError(fiber.StatusBadRequest, "bot session filtered")
}

// Ignore duplicated pageview (page refresh or duplicate tab).
{
sess, found := sessionStorage.WaitSession(deviceId, time.Duration(0))
if found && sess.PageviewCount == 1 && sess.PageUri.Path() == pageView.PageUri.Path() &&
sess.ReferrerUri.IsValid() == referrerUri.IsValid() {
if !referrerUri.IsValid() || referrerUri.Path() == sess.ReferrerUri.Path() {
return nil
}
}
}
sessionUuid, err := uuid.NewV7()
if err != nil {
return fmt.Errorf("failed to generate session uuid: %w", err)
Expand Down
78 changes: 48 additions & 30 deletions pkg/services/sessionstorage/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,23 @@ import (
"time"

"github.com/prismelabs/analytics/pkg/event"
"github.com/prismelabs/analytics/pkg/uri"
"github.com/prometheus/client_golang/prometheus"
"github.com/rs/zerolog"
)

// AddPageviewResult is the value returned by Service.AddPageview alongside its
// boolean found flag.
type AddPageviewResult struct {
// Session is the stored session associated with the device id.
Session event.Session
// DuplicatePageview is true when the added page URI has the same path as the
// latest URI recorded for the session. In that case the session pageview
// count is left untouched.
DuplicatePageview bool
}

// Service defines an in-memory session storage.
type Service interface {
// InsertSession inserts session in session storage and associates it with
// the given deviceId.
InsertSession(deviceId uint64, session event.Session)
// IncSessionPageviewCount increments pageview and returns it.
IncSessionPageviewCount(deviceId uint64) (event.Session, bool)
AddPageview(deviceId uint64, pageUri uri.Uri) (AddPageviewResult, bool)
// IdentifySession updates stored session visitor id. Updated session and
// boolean found flag are returned.
IdentifySession(deviceId uint64, visitorId string) (event.Session, bool)
Expand All @@ -35,9 +41,10 @@ type service struct {
}

type entry struct {
session event.Session
expiry uint32
wait chan struct{}
session event.Session
latestUri uri.Uri
expiry uint32
wait chan struct{}
}

// ProvideService is a wire provider for in memory session storage.
Expand Down Expand Up @@ -78,12 +85,12 @@ func (s *service) getSessionEntry(deviceId uint64) (entry, bool) {
entry.wait == nil // Someone is waiting on this session but none exists.
}

// getSession is the same as getSessionEntry but returns only the session and
// checks that it hasn't expired.
// getValidSessionEntry is the same as getSessionEntry but returns true only if
// the session is valid.
// You must hold mutex while calling this function.
func (s *service) getSession(deviceId uint64) (event.Session, bool) {
func (s *service) getValidSessionEntry(deviceId uint64) (entry, bool) {
entry, ok := s.getSessionEntry(deviceId)
return entry.session, ok && uint32(time.Now().Unix()) < entry.expiry // Not expired session.
return entry, ok && uint32(time.Now().Unix()) < entry.expiry // Not expired session.
}

// InsertSession implements Service.
Expand All @@ -93,9 +100,10 @@ func (s *service) InsertSession(deviceId uint64, session event.Session) {

// Store session.
s.data[deviceId] = entry{
session: session,
expiry: s.newExpiry(),
wait: nil,
session: session,
latestUri: session.PageUri,
expiry: s.newExpiry(),
wait: nil,
}
s.mu.Unlock()

Expand All @@ -113,52 +121,62 @@ func (s *service) InsertSession(deviceId uint64, session event.Session) {
}
}

// IncSessionPageviewCount implements Service.
func (s *service) IncSessionPageviewCount(deviceId uint64) (event.Session, bool) {
// AddPageview implements Service.
func (s *service) AddPageview(deviceId uint64, pageUri uri.Uri) (AddPageviewResult, bool) {
s.mu.Lock()
session, ok := s.getSession(deviceId)
sessionEntry, ok := s.getValidSessionEntry(deviceId)
// Session not found.
if !ok {
s.mu.Unlock()
return event.Session{}, false
return AddPageviewResult{}, false
}

session.PageviewCount++
// Duplicate pageview.
if sessionEntry.latestUri.Path() == pageUri.Path() {
s.mu.Unlock()
return AddPageviewResult{
Session: sessionEntry.session,
DuplicatePageview: true,
}, true
}

sessionEntry.session.PageviewCount++

s.data[deviceId] = entry{
session: session,
expiry: s.newExpiry(),
session: sessionEntry.session,
latestUri: pageUri,
expiry: s.newExpiry(),
}

s.mu.Unlock()

return session, true
return AddPageviewResult{
Session: sessionEntry.session,
}, true
}

// IdentifySession implements Service.
func (s *service) IdentifySession(deviceId uint64, visitorId string) (event.Session, bool) {
s.mu.Lock()
session, ok := s.getSession(deviceId)
sessionEntry, ok := s.getValidSessionEntry(deviceId)
if !ok {
s.mu.Unlock()
return event.Session{}, false
}

// No need for update.
if session.VisitorId == visitorId {
if sessionEntry.session.VisitorId == visitorId {
s.mu.Unlock()
return session, true
return sessionEntry.session, true
}

// Update visitor id.
session.VisitorId = visitorId
s.data[deviceId] = entry{
session: session,
expiry: s.newExpiry(),
}
sessionEntry.session.VisitorId = visitorId
sessionEntry.expiry = s.newExpiry()
s.data[deviceId] = sessionEntry
s.mu.Unlock()

return session, true
return sessionEntry.session, true
}

// WaitSession implements Service.
Expand Down Expand Up @@ -208,10 +226,10 @@ func (s *service) WaitSession(deviceId uint64, timeout time.Duration) (event.Ses
}

s.mu.RLock()
session, ok := s.getSession(deviceId)
sessionEntry, ok = s.getValidSessionEntry(deviceId)
s.mu.RUnlock()

return session, ok
return sessionEntry.session, ok
}

// session garbage collector.
Expand Down
121 changes: 82 additions & 39 deletions pkg/services/sessionstorage/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/prismelabs/analytics/pkg/event"
"github.com/prismelabs/analytics/pkg/log"
"github.com/prismelabs/analytics/pkg/testutils"
"github.com/prismelabs/analytics/pkg/uri"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)
Expand All @@ -20,6 +21,8 @@ func TestService(t *testing.T) {
sessionInactiveTtl: 24 * time.Hour,
}

mustParseUri := testutils.Must(uri.Parse)

t.Run("InsertSession", func(t *testing.T) {
t.Run("NonExistent", func(t *testing.T) {
promRegistry := prometheus.NewRegistry()
Expand All @@ -33,9 +36,9 @@ func TestService(t *testing.T) {

service.InsertSession(deviceId, session)

storedSession, ok := service.getSession(deviceId)
sessionEntry, ok := service.getSessionEntry(deviceId)
require.True(t, ok)
require.Equal(t, storedSession, session)
require.Equal(t, sessionEntry.session, session)

require.Equal(t, float64(1),
testutils.GaugeValue(t, promRegistry, "sessionstorage_active_sessions", nil))
Expand Down Expand Up @@ -74,9 +77,9 @@ func TestService(t *testing.T) {
// Overwrite session A.
service.InsertSession(deviceId, sessionB)

storedSession, ok := service.getSession(deviceId)
sessionEntry, ok := service.getSessionEntry(deviceId)
require.True(t, ok)
require.Equal(t, storedSession, sessionB)
require.Equal(t, sessionEntry.session, sessionB)

require.Equal(t, float64(1),
testutils.GaugeValue(t, promRegistry, "sessionstorage_active_sessions", nil))
Expand All @@ -98,41 +101,81 @@ func TestService(t *testing.T) {
})
})

t.Run("IncSessionPageviewCount", func(t *testing.T) {
promRegistry := prometheus.NewRegistry()
service := ProvideService(logger, cfg, promRegistry)

deviceId := rand.Uint64()
sessionV1 := event.Session{
VisitorId: "prisme_XXX",
PageviewCount: 1,
}

service.InsertSession(deviceId, sessionV1)

sessionV2, ok := service.IncSessionPageviewCount(deviceId)
require.True(t, ok)
require.Equal(t, sessionV1.VisitorId, sessionV2.VisitorId)

require.Equal(t, sessionV1.PageviewCount+1, sessionV2.PageviewCount)

require.Equal(t, float64(1),
testutils.GaugeValue(t, promRegistry, "sessionstorage_active_sessions", nil))
require.Equal(t, float64(0),
testutils.GaugeValue(t, promRegistry, "sessionstorage_sessions_wait", nil))
require.Equal(t, float64(1),
testutils.CounterValue(t, promRegistry, "sessionstorage_sessions_total",
prometheus.Labels{"type": "inserted"}))
require.Equal(t, float64(0),
testutils.CounterValue(t, promRegistry, "sessionstorage_sessions_total",
prometheus.Labels{"type": "overwritten"}))
require.Equal(t, float64(0),
testutils.CounterValue(t, promRegistry, "sessionstorage_sessions_total",
prometheus.Labels{"type": "expired"}))
require.Equal(t, float64(0),
testutils.HistogramSumValue(t, promRegistry, "sessionstorage_sessions_pageviews", nil))
require.Equal(t, float64(0),
testutils.CounterValue(t, promRegistry, "sessionstorage_get_session_misses", nil))
t.Run("AddPageview", func(t *testing.T) {
t.Run("Duplicate", func(t *testing.T) {
promRegistry := prometheus.NewRegistry()
service := ProvideService(logger, cfg, promRegistry)

deviceId := rand.Uint64()
session := event.Session{
PageUri: mustParseUri("https://example.com"),
VisitorId: "prisme_XXX",
PageviewCount: 1,
}

service.InsertSession(deviceId, session)

result, ok := service.AddPageview(deviceId, session.PageUri)
require.True(t, ok)
require.True(t, result.DuplicatePageview)
require.Equal(t, session.VisitorId, result.Session.VisitorId)
require.Equal(t, session.PageviewCount, result.Session.PageviewCount)

require.Equal(t, float64(1),
testutils.GaugeValue(t, promRegistry, "sessionstorage_active_sessions", nil))
require.Equal(t, float64(0),
testutils.GaugeValue(t, promRegistry, "sessionstorage_sessions_wait", nil))
require.Equal(t, float64(1),
testutils.CounterValue(t, promRegistry, "sessionstorage_sessions_total",
prometheus.Labels{"type": "inserted"}))
require.Equal(t, float64(0),
testutils.CounterValue(t, promRegistry, "sessionstorage_sessions_total",
prometheus.Labels{"type": "overwritten"}))
require.Equal(t, float64(0),
testutils.CounterValue(t, promRegistry, "sessionstorage_sessions_total",
prometheus.Labels{"type": "expired"}))
require.Equal(t, float64(0),
testutils.HistogramSumValue(t, promRegistry, "sessionstorage_sessions_pageviews", nil))
require.Equal(t, float64(0),
testutils.CounterValue(t, promRegistry, "sessionstorage_get_session_misses", nil))
})
t.Run("Unique", func(t *testing.T) {
promRegistry := prometheus.NewRegistry()
service := ProvideService(logger, cfg, promRegistry)

deviceId := rand.Uint64()
session := event.Session{
PageUri: mustParseUri("https://example.com"),
VisitorId: "prisme_XXX",
PageviewCount: 1,
}

service.InsertSession(deviceId, session)

result, ok := service.AddPageview(deviceId, mustParseUri("https://example.com/foo"))
require.True(t, ok)
require.False(t, result.DuplicatePageview)
require.Equal(t, session.VisitorId, result.Session.VisitorId)
require.Equal(t, session.PageviewCount+1, result.Session.PageviewCount)

require.Equal(t, float64(1),
testutils.GaugeValue(t, promRegistry, "sessionstorage_active_sessions", nil))
require.Equal(t, float64(0),
testutils.GaugeValue(t, promRegistry, "sessionstorage_sessions_wait", nil))
require.Equal(t, float64(1),
testutils.CounterValue(t, promRegistry, "sessionstorage_sessions_total",
prometheus.Labels{"type": "inserted"}))
require.Equal(t, float64(0),
testutils.CounterValue(t, promRegistry, "sessionstorage_sessions_total",
prometheus.Labels{"type": "overwritten"}))
require.Equal(t, float64(0),
testutils.CounterValue(t, promRegistry, "sessionstorage_sessions_total",
prometheus.Labels{"type": "expired"}))
require.Equal(t, float64(0),
testutils.HistogramSumValue(t, promRegistry, "sessionstorage_sessions_pageviews", nil))
require.Equal(t, float64(0),
testutils.CounterValue(t, promRegistry, "sessionstorage_get_session_misses", nil))
})
})

t.Run("WaitSession", func(t *testing.T) {
Expand Down

0 comments on commit dc0ed5f

Please sign in to comment.