From 83b92825fe008160719b1ed5ce08c8bfa02cd40a Mon Sep 17 00:00:00 2001
From: Lukas Lalinsky
Date: Sun, 11 Feb 2024 16:25:33 +0100
Subject: [PATCH] Connect to the index grpc service

---
 fpstore/cmd/fpstore/main.go | 101 +++++++++++++++++++++++-----
 fpstore/index.go            | 126 +++++++++++++++++++++++++++++++++++
 fpstore/service.go          |   5 +-
 fpstore/store.go            |   4 +-
 4 files changed, 217 insertions(+), 19 deletions(-)
 create mode 100644 fpstore/index.go

diff --git a/fpstore/cmd/fpstore/main.go b/fpstore/cmd/fpstore/main.go
index 322fd54..ad026e3 100644
--- a/fpstore/cmd/fpstore/main.go
+++ b/fpstore/cmd/fpstore/main.go
@@ -1,13 +1,18 @@
 package main
 
 import (
+	"net"
 	"os"
+	"strconv"
 
 	_ "github.com/lib/pq"
 	"github.com/pkg/errors"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
 
 	"github.com/acoustid/go-acoustid/common"
 	"github.com/acoustid/go-acoustid/fpstore"
+	index_pb "github.com/acoustid/go-acoustid/proto/index"
 	"github.com/redis/go-redis/v9"
 	log "github.com/sirupsen/logrus"
 	"github.com/urfave/cli"
@@ -54,13 +59,22 @@ var PostgresDatabase = cli.StringFlag{
 	EnvVar: "FPSTORE_POSTGRES_DATABASE",
 }
 
-var RedisAddrFlag = cli.StringFlag{
-	Name:   "redis-addr",
+// RedisHostFlag is the host name of the Redis server; the port is set by RedisPortFlag.
+var RedisHostFlag = cli.StringFlag{
+	Name:   "redis-host",
 	Usage:  "Redis server address",
-	Value:  "localhost:6379",
-	EnvVar: "FPSTORE_REDIS_ADDR",
+	Value:  "localhost",
+	EnvVar: "FPSTORE_REDIS_HOST",
 }
 
+// RedisPortFlag is the TCP port of the Redis server.
+var RedisPortFlag = cli.IntFlag{
+	Name:   "redis-port",
+	Usage:  "Redis server port",
+	Value:  6379,
+	EnvVar: "FPSTORE_REDIS_PORT",
+}
+
 var RedisDatabaseFlag = cli.IntFlag{
 	Name:   "redis-database",
 	Usage:  "Redis server database",
@@ -75,15 +89,41 @@ var RedisPasswordFlag = cli.StringFlag{
 	EnvVar: "FPSTORE_REDIS_PASSWORD",
 }
 
-var ListenAddrFlag = cli.StringFlag{
-	Name:  "listen-addr",
-	Usage: "Listen address",
-	Value: "localhost:4659",
+// ListenHostFlag is the host name or IP address the fpstore server listens on.
+var ListenHostFlag = cli.StringFlag{
+	Name:   "listen-host",
+	Usage:  "Listen address",
+	Value:  "localhost",
+	EnvVar: "FPSTORE_LISTEN_HOST",
+}
+
+// ListenPortFlag is the TCP port the fpstore server listens on.
+var ListenPortFlag = cli.IntFlag{
+	Name:   "listen-port",
+	Usage:  "Listen port",
+	Value:  4659,
+	EnvVar: "FPSTORE_LISTEN_PORT",
+}
+
+// IndexHostFlag is the host name of the index gRPC service.
+var IndexHostFlag = cli.StringFlag{
+	Name:   "index-host",
+	Usage:  "Index server address",
+	Value:  "localhost",
+	EnvVar: "FPSTORE_INDEX_HOST",
+}
+
+// IndexPortFlag is the TCP port of the index gRPC service.
+var IndexPortFlag = cli.IntFlag{
+	Name:   "index-port",
+	Usage:  "Index server port",
+	Value:  6080,
+	EnvVar: "FPSTORE_INDEX_PORT",
 }
 
 func PrepareFingerprintCache(c *cli.Context) (fpstore.FingerprintCache, error) {
 	return fpstore.NewRedisFingerprintCache(&redis.Options{
-		Addr:     c.String(RedisAddrFlag.Name),
+		Addr:     net.JoinHostPort(c.String(RedisHostFlag.Name), strconv.Itoa(c.Int(RedisPortFlag.Name))),
 		Password: c.String(RedisPasswordFlag.Name),
 		DB:       c.Int(RedisDatabaseFlag.Name),
 	}), nil
@@ -105,20 +145,40 @@ func PrepareFingerprintStore(c *cli.Context) (fpstore.FingerprintStore, error) {
 	return fpstore.NewPostgresFingerprintStore(db), nil
 }
 
-func PrepareAndRunServer(c *cli.Context) error {
-	fingerprintCache, err := PrepareFingerprintCache(c)
+// PrepareFingerprintIndex returns a FingerprintIndex that talks to the index
+// gRPC service at index-host:index-port over an unencrypted connection.
+func PrepareFingerprintIndex(c *cli.Context) (fpstore.FingerprintIndex, error) {
+	addr := net.JoinHostPort(c.String(IndexHostFlag.Name), strconv.Itoa(c.Int(IndexPortFlag.Name)))
+	conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
 	if err != nil {
-		return errors.WithMessage(err, "failed to initialize fingerprint cache")
+		return nil, errors.WithMessage(err, "failed to connect to index server")
 	}
+	client := index_pb.NewIndexClient(conn)
+	return fpstore.NewFingerprintIndexClient(client), nil
+}
 
+// PrepareAndRunServer builds the fingerprint store, index and cache from the
+// command-line flags and serves fpstore on listen-host:listen-port.
+func PrepareAndRunServer(c *cli.Context) error {
 	fingerprintStore, err := PrepareFingerprintStore(c)
 	if err != nil {
 		return errors.WithMessage(err, "failed to initialize fingerprint store")
 	}
 
-	service := fpstore.NewFingerprintStoreService(fingerprintStore, fingerprintCache)
+	fingerprintIndex, err := PrepareFingerprintIndex(c)
+	if err != nil {
+		return errors.WithMessage(err, "failed to initialize fingerprint index")
+	}
+
+	fingerprintCache, err := PrepareFingerprintCache(c)
+	if err != nil {
+		return errors.WithMessage(err, "failed to initialize fingerprint cache")
+	}
+
+	service := fpstore.NewFingerprintStoreService(fingerprintStore, fingerprintIndex, fingerprintCache)
 
-	return fpstore.RunFingerprintStoreServer(c.String(ListenAddrFlag.Name), service)
+	listenAddr := net.JoinHostPort(c.String(ListenHostFlag.Name), strconv.Itoa(c.Int(ListenPortFlag.Name)))
+	return fpstore.RunFingerprintStoreServer(listenAddr, service)
 }
 
 func CreateApp() *cli.App {
@@ -133,10 +193,19 @@ func CreateApp() *cli.App {
 			Name:  "server",
 			Usage: "Runs fpstore service",
 			Flags: []cli.Flag{
-				ListenAddrFlag,
-				RedisAddrFlag,
+				ListenHostFlag,
+				ListenPortFlag,
+				PostgresHost,
+				PostgresPort,
+				PostgresUser,
+				PostgresPassword,
+				PostgresDatabase,
+				RedisHostFlag,
+				RedisPortFlag,
 				RedisPasswordFlag,
 				RedisDatabaseFlag,
+				IndexHostFlag,
+				IndexPortFlag,
 			},
 			Action: PrepareAndRunServer,
 		},
diff --git a/fpstore/index.go b/fpstore/index.go
new file mode 100644
index 0000000..690c68a
--- /dev/null
+++ b/fpstore/index.go
@@ -0,0 +1,126 @@
+package fpstore
+
+import (
+	"context"
+	"sort"
+
+	pb "github.com/acoustid/go-acoustid/proto/fpstore"
+	index_pb "github.com/acoustid/go-acoustid/proto/index"
+)
+
+// FingerprintIndex looks up candidate fingerprint IDs for a query fingerprint.
+type FingerprintIndex interface {
+	// Search returns the IDs of indexed fingerprints that match fp.
+	Search(ctx context.Context, fp *pb.Fingerprint, limit int) ([]uint64, error)
+}
+
+// FingerprintIndexClient is a FingerprintIndex backed by the index gRPC service.
+type FingerprintIndexClient struct {
+	client index_pb.IndexClient
+}
+
+// NewFingerprintIndexClient returns a FingerprintIndexClient that sends its queries through client.
+func NewFingerprintIndexClient(client index_pb.IndexClient) *FingerprintIndexClient {
+	return &FingerprintIndexClient{client: client}
+}
+
+// ExtractLegacyQuery picks the hashes of fp that are sent to the index.
+//
+// Silence hashes are skipped, every hash is masked down to its top
+// NumQueryBits bits, duplicates are dropped and at most QuerySize hashes are
+// returned. The result is nil when fp has no non-silence hash.
+//
+// Legacy compatibility with https://github.com/acoustid/pg_acoustid/blob/main/acoustid_compare.c#L348
+func ExtractLegacyQuery(fp *pb.Fingerprint) []uint32 {
+	const QuerySize = 120
+	const QueryStart = 80
+
+	const NumQueryBits = 26
+	const QueryBitMask = ((1 << NumQueryBits) - 1) << (32 - NumQueryBits)
+
+	const SilenceHash = 627964279
+
+	cleanSize := 0
+	for _, hash := range fp.Hashes {
+		if hash != SilenceHash {
+			cleanSize++
+		}
+	}
+
+	if cleanSize <= 0 {
+		return nil
+	}
+
+	query := make([]uint32, QuerySize)
+	queryHashes := make(map[uint32]struct{})
+	querySize := 0
+
+	for i := max(0, min(cleanSize-QuerySize, QueryStart)); i < len(fp.Hashes) && querySize < QuerySize; i++ {
+		hash := fp.Hashes[i]
+		if hash == SilenceHash {
+			continue
+		}
+		hash &= QueryBitMask
+		if _, ok := queryHashes[hash]; ok {
+			continue
+		}
+		queryHashes[hash] = struct{}{}
+		query[querySize] = hash
+		querySize++
+	}
+
+	query = query[:querySize]
+	return query
+}
+
+// filterIndexSearchResults sorts results in place by descending hit count,
+// breaking ties by ascending ID, and returns the leading part to keep.
+//
+// When limit is 0 or smaller than len(results), results with fewer hits than
+// about 10% of the best result are cut off. A positive limit additionally
+// caps the number of results returned.
+func filterIndexSearchResults(results []*index_pb.Result, limit int) []*index_pb.Result {
+	// Nothing to rank, and results[0] below must exist.
+	if len(results) == 0 {
+		return results
+	}
+	// Hits must end up non-increasing: the binary search below depends on it.
+	sort.Slice(results, func(i, j int) bool {
+		if results[i].Hits != results[j].Hits {
+			return results[i].Hits > results[j].Hits
+		}
+		return results[i].Id < results[j].Id
+	})
+	if limit == 0 || len(results) > limit {
+		threshold := (results[0].Hits*10 + 50) / 100
+		thresholdIndex := sort.Search(len(results), func(i int) bool {
+			return results[i].Hits < threshold
+		})
+		if limit == 0 || limit > thresholdIndex {
+			limit = thresholdIndex
+		}
+	}
+	if limit > 0 && len(results) > limit {
+		results = results[:limit]
+	}
+	return results
+}
+
+// Search sends the legacy query extracted from fp to the index service and
+// returns the IDs of the matching fingerprints, best match first. See
+// filterIndexSearchResults for how limit is applied.
+func (c *FingerprintIndexClient) Search(ctx context.Context, fp *pb.Fingerprint, limit int) ([]uint64, error) {
+	req := &index_pb.SearchRequest{Hashes: ExtractLegacyQuery(fp)}
+	resp, err := c.client.Search(ctx, req)
+	if err != nil {
+		return nil, err
+	}
+
+	results := filterIndexSearchResults(resp.Results, limit)
+
+	ids := make([]uint64, len(results))
+	for i, result := range results {
+		ids[i] = uint64(result.Id)
+	}
+	return ids, nil
+}
diff --git a/fpstore/service.go b/fpstore/service.go
index 7d6392b..6085de6 100644
--- a/fpstore/service.go
+++ b/fpstore/service.go
@@ -16,6 +16,7 @@ import (
 type FingerprintStoreService struct {
 	pb.UnimplementedFingerprintStoreServer
 	store FingerprintStore
+	index FingerprintIndex
 	cache FingerprintCache
 }
 
@@ -29,8 +30,8 @@ func RunFingerprintStoreServer(listenAddr string, service pb.FingerprintStoreSer
 	return server.Serve(lis)
 }
 
-func NewFingerprintStoreService(store FingerprintStore, cache FingerprintCache) *FingerprintStoreService {
-	return &FingerprintStoreService{store: store, cache: cache}
+func NewFingerprintStoreService(store FingerprintStore, index FingerprintIndex, cache FingerprintCache) *FingerprintStoreService {
+	return &FingerprintStoreService{store: store, index: index, cache: cache}
 }
 
 // Implement Insert method
diff --git a/fpstore/store.go b/fpstore/store.go
index 8bf1023..f78a076 100644
--- a/fpstore/store.go
+++ b/fpstore/store.go
@@ -11,6 +11,7 @@ import (
 
 	log "github.com/sirupsen/logrus"
 
+	"github.com/acoustid/go-acoustid/index"
 	pb "github.com/acoustid/go-acoustid/proto/fpstore"
 )
 
@@ -69,7 +70,8 @@ func (a *Uint32Array) scanString(src string) error {
 }
 
 type PostgresFingerprintStore struct {
-	db *sql.DB
+	db  *sql.DB
+	idx *index.IndexClientPool
 }
 
 func NewPostgresFingerprintStore(db *sql.DB) *PostgresFingerprintStore {