Skip to content

Commit

Permalink
Connect to the index grpc service
Browse files Browse the repository at this point in the history
  • Loading branch information
lalinsky committed Feb 11, 2024
1 parent 8b02f50 commit 83b9282
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 17 deletions.
87 changes: 73 additions & 14 deletions fpstore/cmd/fpstore/main.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package main

import (
"net"
"os"
"strconv"

_ "github.com/lib/pq"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/acoustid/go-acoustid/common"
"github.com/acoustid/go-acoustid/fpstore"
index_pb "github.com/acoustid/go-acoustid/proto/index"
"github.com/redis/go-redis/v9"
log "github.com/sirupsen/logrus"
"github.com/urfave/cli"
Expand Down Expand Up @@ -54,13 +59,20 @@ var PostgresDatabase = cli.StringFlag{
EnvVar: "FPSTORE_POSTGRES_DATABASE",
}

var RedisAddrFlag = cli.StringFlag{
Name: "redis-addr",
var RedisHostFlag = cli.StringFlag{
Name: "redis-host",
Usage: "Redis server address",
Value: "localhost:6379",
EnvVar: "FPSTORE_REDIS_ADDR",
}

var RedisPortFlag = cli.IntFlag{
Name: "redis-port",
Usage: "Redis server port",
Value: 6379,
EnvVar: "FPSTORE_REDIS_PORT",
}

var RedisDatabaseFlag = cli.IntFlag{
Name: "redis-database",
Usage: "Redis server database",
Expand All @@ -75,15 +87,37 @@ var RedisPasswordFlag = cli.StringFlag{
EnvVar: "FPSTORE_REDIS_PASSWORD",
}

var ListenAddrFlag = cli.StringFlag{
Name: "listen-addr",
Usage: "Listen address",
Value: "localhost:4659",
var ListenHostFlag = cli.StringFlag{
Name: "listen-host",
Usage: "Listen address",
Value: "localhost",
EnvVar: "FPSTORE_LISTEN_HOST",
}

var ListenPortFlag = cli.IntFlag{
Name: "listen-port",
Usage: "Listen port",
Value: 4659,
EnvVar: "FPSTORE_LISTEN_PORT",
}

var IndexHostFlag = cli.StringFlag{
Name: "index-host",
Usage: "Index server address",
Value: "localhost",
EnvVar: "FPSTORE_INDEX_HOST",
}

var IndexPortFlag = cli.IntFlag{
Name: "index-port",
Usage: "Index server port",
Value: 6080,
EnvVar: "FPSTORE_INDEX_PORT",
}

func PrepareFingerprintCache(c *cli.Context) (fpstore.FingerprintCache, error) {
return fpstore.NewRedisFingerprintCache(&redis.Options{
Addr: c.String(RedisAddrFlag.Name),
Addr: net.JoinHostPort(c.String(RedisHostFlag.Name), strconv.Itoa(c.Int(RedisPortFlag.Name))),
Password: c.String(RedisPasswordFlag.Name),
DB: c.Int(RedisDatabaseFlag.Name),
}), nil
Expand All @@ -105,20 +139,36 @@ func PrepareFingerprintStore(c *cli.Context) (fpstore.FingerprintStore, error) {
return fpstore.NewPostgresFingerprintStore(db), nil
}

func PrepareAndRunServer(c *cli.Context) error {
fingerprintCache, err := PrepareFingerprintCache(c)
func PrepareFingerprintIndex(c *cli.Context) (fpstore.FingerprintIndex, error) {
addr := net.JoinHostPort(c.String(IndexHostFlag.Name), strconv.Itoa(c.Int(IndexPortFlag.Name)))
conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return errors.WithMessage(err, "failed to initialize fingerprint cache")
return nil, errors.WithMessage(err, "failed to connect to index server")
}
client := index_pb.NewIndexClient(conn)
return fpstore.NewFingerprintIndexClient(client), nil
}

func PrepareAndRunServer(c *cli.Context) error {
fingerprintStore, err := PrepareFingerprintStore(c)
if err != nil {
return errors.WithMessage(err, "failed to initialize fingerprint store")
}

service := fpstore.NewFingerprintStoreService(fingerprintStore, fingerprintCache)
fingerprintIndex, err := PrepareFingerprintIndex(c)
if err != nil {
return errors.WithMessage(err, "failed to initialize fingerprint index")
}

fingerprintCache, err := PrepareFingerprintCache(c)
if err != nil {
return errors.WithMessage(err, "failed to initialize fingerprint cache")
}

service := fpstore.NewFingerprintStoreService(fingerprintStore, fingerprintIndex, fingerprintCache)

return fpstore.RunFingerprintStoreServer(c.String(ListenAddrFlag.Name), service)
listenAddr := net.JoinHostPort(c.String(ListenHostFlag.Name), strconv.Itoa(c.Int(ListenPortFlag.Name)))
return fpstore.RunFingerprintStoreServer(listenAddr, service)
}

func CreateApp() *cli.App {
Expand All @@ -133,10 +183,19 @@ func CreateApp() *cli.App {
Name: "server",
Usage: "Runs fpstore service",
Flags: []cli.Flag{
ListenAddrFlag,
RedisAddrFlag,
ListenHostFlag,
ListenPortFlag,
PostgresHost,
PostgresPort,
PostgresUser,
PostgresPassword,
PostgresDatabase,
RedisHostFlag,
RedisPortFlag,
RedisPasswordFlag,
RedisDatabaseFlag,
IndexHostFlag,
IndexPortFlag,
},
Action: PrepareAndRunServer,
},
Expand Down
99 changes: 99 additions & 0 deletions fpstore/index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package fpstore

import (
"context"
"sort"

pb "github.com/acoustid/go-acoustid/proto/fpstore"
index_pb "github.com/acoustid/go-acoustid/proto/index"
)

type FingerprintIndex interface {
Search(ctx context.Context, fp *pb.Fingerprint, limit int) ([]uint64, error)
}

type FingerprintIndexClient struct {
client index_pb.IndexClient
}

func NewFingerprintIndexClient(client index_pb.IndexClient) *FingerprintIndexClient {
return &FingerprintIndexClient{client: client}
}

// Legacy compatibility with https://github.com/acoustid/pg_acoustid/blob/main/acoustid_compare.c#L348
func ExtractLegacyQuery(fp *pb.Fingerprint) []uint32 {
const QuerySize = 120
const QueryStart = 80

const NumQueryBits = 26
const QueryBitMask = ((1 << NumQueryBits) - 1) << (32 - NumQueryBits)

const SilenceHash = 627964279

cleanSize := 0
for _, hash := range fp.Hashes {
if hash != SilenceHash {
cleanSize++
}
}

if cleanSize <= 0 {
return nil
}

query := make([]uint32, QuerySize)
queryHashes := make(map[uint32]struct{})
querySize := 0

for i := max(0, min(cleanSize-QuerySize, QueryStart)); i < len(fp.Hashes) && querySize < QuerySize; i++ {
hash := fp.Hashes[i]
if hash == SilenceHash {
continue
}
hash &= QueryBitMask
if _, ok := queryHashes[hash]; ok {
continue
}
queryHashes[hash] = struct{}{}
query[querySize] = hash
querySize++
}

query = query[:querySize]
return query
}

func filterIndexSearchResults(results []*index_pb.Result, limit int) []*index_pb.Result {
sort.Slice(results, func(i, j int) bool {
return results[i].Hits >= results[j].Hits && results[i].Id < results[j].Id
})
if limit == 0 || len(results) > limit {
threshold := (results[0].Hits*10 + 50) / 100
thresholdIndex := sort.Search(len(results), func(i int) bool {
return results[i].Hits < threshold
})
if limit == 0 || limit > thresholdIndex {
limit = thresholdIndex
}
}
if limit > 0 && len(results) > limit {
results = results[:limit]
}
return results
}

func (c *FingerprintIndexClient) Search(ctx context.Context, fp *pb.Fingerprint, limit int) ([]uint64, error) {
req := &index_pb.SearchRequest{Hashes: ExtractLegacyQuery(fp)}
resp, err := c.client.Search(ctx, req)
if err != nil {
return nil, err
}

results := filterIndexSearchResults(resp.Results, limit)

ids := make([]uint64, len(results))
for i, result := range results {
ids[i] = uint64(result.Id)
}
return ids, nil
}
5 changes: 3 additions & 2 deletions fpstore/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
type FingerprintStoreService struct {
pb.UnimplementedFingerprintStoreServer
store FingerprintStore
index FingerprintIndex
cache FingerprintCache
}

Expand All @@ -29,8 +30,8 @@ func RunFingerprintStoreServer(listenAddr string, service pb.FingerprintStoreSer
return server.Serve(lis)
}

func NewFingerprintStoreService(store FingerprintStore, cache FingerprintCache) *FingerprintStoreService {
return &FingerprintStoreService{store: store, cache: cache}
func NewFingerprintStoreService(store FingerprintStore, index FingerprintIndex, cache FingerprintCache) *FingerprintStoreService {
return &FingerprintStoreService{store: store, index: index, cache: cache}
}

// Implement Insert method
Expand Down
4 changes: 3 additions & 1 deletion fpstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

log "github.com/sirupsen/logrus"

"github.com/acoustid/go-acoustid/index"
pb "github.com/acoustid/go-acoustid/proto/fpstore"
)

Expand Down Expand Up @@ -69,7 +70,8 @@ func (a *Uint32Array) scanString(src string) error {
}

type PostgresFingerprintStore struct {
db *sql.DB
db *sql.DB
idx *index.IndexClientPool
}

func NewPostgresFingerprintStore(db *sql.DB) *PostgresFingerprintStore {
Expand Down

0 comments on commit 83b9282

Please sign in to comment.