Skip to content

Commit

Permalink
proxyd: Support Redis read endpoint for caching (#45)
Browse files Browse the repository at this point in the history
  • Loading branch information
ImTei authored Aug 19, 2024
1 parent 30560d3 commit 98e261e
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 8 deletions.
15 changes: 8 additions & 7 deletions proxyd/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,14 @@ func (c *cache) Put(ctx context.Context, key string, value string) error {
}

type redisCache struct {
rdb *redis.Client
prefix string
ttl time.Duration
redisClient *redis.Client
redisReadClient *redis.Client
prefix string
ttl time.Duration
}

func newRedisCache(rdb *redis.Client, prefix string, ttl time.Duration) *redisCache {
return &redisCache{rdb, prefix, ttl}
func newRedisCache(redisClient *redis.Client, redisReadClient *redis.Client, prefix string, ttl time.Duration) *redisCache {
return &redisCache{redisClient, redisReadClient, prefix, ttl}
}

func (c *redisCache) namespaced(key string) string {
Expand All @@ -63,7 +64,7 @@ func (c *redisCache) namespaced(key string) string {

func (c *redisCache) Get(ctx context.Context, key string) (string, error) {
start := time.Now()
val, err := c.rdb.Get(ctx, c.namespaced(key)).Result()
val, err := c.redisReadClient.Get(ctx, c.namespaced(key)).Result()
redisCacheDurationSumm.WithLabelValues("GET").Observe(float64(time.Since(start).Milliseconds()))

if err == redis.Nil {
Expand All @@ -77,7 +78,7 @@ func (c *redisCache) Get(ctx context.Context, key string) (string, error) {

func (c *redisCache) Put(ctx context.Context, key string, value string) error {
start := time.Now()
err := c.rdb.SetEx(ctx, c.namespaced(key), value, c.ttl).Err()
err := c.redisClient.SetEx(ctx, c.namespaced(key), value, c.ttl).Err()
redisCacheDurationSumm.WithLabelValues("SETEX").Observe(float64(time.Since(start).Milliseconds()))

if err != nil {
Expand Down
1 change: 1 addition & 0 deletions proxyd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type CacheConfig struct {
type RedisConfig struct {
URL string `toml:"url"`
Namespace string `toml:"namespace"`
ReadURL string `toml:"read_url"`
}

type MetricsConfig struct {
Expand Down
57 changes: 57 additions & 0 deletions proxyd/integration_tests/caching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,63 @@ func TestBatchCaching(t *testing.T) {
require.Equal(t, 1, countRequests(backend, "eth_call"))
}

func TestCachingWithReadReplica(t *testing.T) {
primary, err := miniredis.Run()
require.NoError(t, err)
defer primary.Close()

replica, err := miniredis.Run()
require.NoError(t, err)
defer replica.Close()

hdlr := NewBatchRPCResponseRouter()
hdlr.SetRoute("eth_getBlockByHash", "999", "eth_getBlockByHash")

backend := NewMockBackend(hdlr)
defer backend.Close()

require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", backend.URL()))
require.NoError(t, os.Setenv("REDIS_URL", fmt.Sprintf("redis://%s", primary.Addr())))
require.NoError(t, os.Setenv("REDIS_READ_URL", fmt.Sprintf("redis://%s", replica.Addr())))

config := ReadConfig("caching_replica")
client := NewProxydClient("http://127.0.0.1:8545")
_, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
defer shutdown()

// allow time for the block number fetcher to fire
time.Sleep(1500 * time.Millisecond)

params := []interface{}{"0xc6ef2fc5426d6ad6fd9e2a26abeab0aa2411b7ab17f30a99d3cb96aed1d1055b", "false"}
response := "{\"jsonrpc\": \"2.0\", \"result\": \"eth_getBlockByHash\", \"id\": 999}"
resRaw, _, err := client.SendRPC("eth_getBlockByHash", params)
require.NoError(t, err)

// because the cache is not replicated to the replica, count request must be increased
resCache, _, err := client.SendRPC("eth_getBlockByHash", params)
require.NoError(t, err)
RequireEqualJSON(t, []byte(response), resCache)
RequireEqualJSON(t, resRaw, resCache)
require.Equal(t, 2, countRequests(backend, "eth_getBlockByHash"))

// replicate cache data
for _, key := range primary.Keys() {
value, err := primary.Get(key)
require.NoError(t, err)

err = replica.Set(key, value)
require.NoError(t, err)
}

// now cache hit. count request must be same
resCache, _, err = client.SendRPC("eth_getBlockByHash", params)
require.NoError(t, err)
RequireEqualJSON(t, []byte(response), resCache)
RequireEqualJSON(t, resRaw, resCache)
require.Equal(t, 2, countRequests(backend, "eth_getBlockByHash"))
}

func countRequests(backend *MockBackend, name string) int {
var count int
for _, req := range backend.Requests() {
Expand Down
37 changes: 37 additions & 0 deletions proxyd/integration_tests/testdata/caching_replica.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
[server]
rpc_port = 8545

[backend]
response_timeout_seconds = 1

[redis]
url = "$REDIS_URL"
read_url = "$REDIS_READ_URL"
namespace = "proxyd"

[cache]
enabled = true

[backends]
[backends.good]
rpc_url = "$GOOD_BACKEND_RPC_URL"
ws_url = "$GOOD_BACKEND_RPC_URL"

[backend_groups]
[backend_groups.main]
backends = ["good"]

[rpc_method_mappings]
eth_chainId = "main"
net_version = "main"
eth_getBlockByNumber = "main"
eth_blockNumber = "main"
eth_call = "main"
eth_getBlockTransactionCountByHash = "main"
eth_getUncleCountByBlockHash = "main"
eth_getBlockByHash = "main"
eth_getTransactionByHash = "main"
eth_getTransactionByBlockHashAndIndex = "main"
eth_getUncleByBlockHashAndIndex = "main"
eth_getTransactionReceipt = "main"
debug_getRawReceipts = "main"
20 changes: 19 additions & 1 deletion proxyd/proxyd.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func Start(config *Config) (*Server, func(), error) {
}
}

// redis primary client
var redisClient *redis.Client
if config.Redis.URL != "" {
rURL, err := ReadFromEnvOrConfig(config.Redis.URL)
Expand All @@ -51,6 +52,23 @@ func Start(config *Config) (*Server, func(), error) {
}
}

// redis read replica client
// if read endpoint is not set, use primary endpoint
var redisReadClient = redisClient
if config.Redis.ReadURL != "" {
if redisClient == nil {
return nil, nil, errors.New("must specify a Redis primary URL. only read endpoint is set")
}
rURL, err := ReadFromEnvOrConfig(config.Redis.ReadURL)
if err != nil {
return nil, nil, err
}
redisReadClient, err = NewRedisClient(rURL)
if err != nil {
return nil, nil, err
}
}

if redisClient == nil && config.RateLimit.UseRedis {
return nil, nil, errors.New("must specify a Redis URL if UseRedis is true in rate limit config")
}
Expand Down Expand Up @@ -276,7 +294,7 @@ func Start(config *Config) (*Server, func(), error) {
if config.Cache.TTL != 0 {
ttl = time.Duration(config.Cache.TTL)
}
cache = newRedisCache(redisClient, config.Redis.Namespace, ttl)
cache = newRedisCache(redisClient, redisReadClient, config.Redis.Namespace, ttl)
}
rpcCache = newRPCCache(newCacheWithCompression(cache))
}
Expand Down

0 comments on commit 98e261e

Please sign in to comment.