欢迎访问 生活随笔!

凯发k8官方网

当前位置: 凯发k8官方网 > 编程语言 > c# >内容正文

c#

基于c# socket实现的简单的redis客户端 -凯发k8官方网

发布时间:2023/11/16 c# 35 coder
凯发k8官方网 收集整理的这篇文章主要介绍了 基于c# socket实现的简单的redis客户端 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

前言

    redis是一款强大的高性能键值存储数据库,也是目前nosql最流行比较流行的一款数据库,它在广泛的应用场景中扮演着至关重要的角色,包括但不限于缓存、消息队列、会话存储等。在本文中,我们将介绍如何基于c# socket来实现一个简单的redis客户端类redisclient,来演示构建请求和输出的相关通信机制。需要注意的是本文只是着重展示如何基于原生的socket方式与redis server进行通信,并不是构建一个强大的redis开发工具包

redis简介

    redis(remote dictionary server)是一个内存数据库,它支持了非常丰富的数据结构,包括字符串、列表、集合、散列、有序集合等。redis 提供了高性能的读写操作,可以用于缓存数据、消息队列、分布式锁、会话管理等多种用途。redis 通常以键值对的方式存储数据,每个键都与一个值相关联,值的类型可以是字符串、列表、散列等。redis不仅提供了丰富的命令集,用于操作存储在数据库中的数据,还提供了redis serialization protocol (resp) 协议来解析redis server返回的数据。相关的文档地址如下所示:

  • redis凯发k8官方网官网地址 https://redis.io/
  • redis官方文档地址 https://redis.io/docs/
  • redis命令文档地址 https://redis.io/commands/
  • redis序列化协议规范文档地址 https://redis.io/docs/reference/protocol-spec/

redis 命令指南

    redis命令是与redis服务器进行通信的主要方式,通俗点就是发送指定格式的指令用于执行各种操作,包括数据存储、检索、修改和删除等。以下是一些日常使用过程中常见的redis命令及其用途:

  1. get 和 set 命令

    • get key: 用于获取指定键的值。
    • set key value: 用于设置指定键的值.
  2. del 命令

    • del key: 用于删除指定键.
  3. expire 和 ttl 命令

    • expire key seconds: 用于为指定键设置过期时间(秒).
    • ttl key: 用于获取指定键的剩余过期时间(秒).

    注意这里的时间单位是秒

  4. incr 和 decr 命令

    • incr key: 用于递增指定键的值.
    • decr key: 用于递减指定键的值.
  5. rpush 和 lpop 命令

    • rpush key value: 用于将值添加到列表的右侧.
    • lpop key: 用于从列表的左侧弹出一个值.
  6. hset 和 hget 命令

    • hset key field value: 用于设置哈希表中指定字段的值.
    • hget key field: 用于获取哈希表中指定字段的值.
  7. publish 和 subscribe 命令

    • publish channel message: 用于向指定频道发布消息.
    • subscribe channel: 用于订阅指定频道的消息.

当然 redis 支持的命令远不止这些,它还包括对集合、有序集合、位图、hyperloglog 等数据结构的操作,以及事务、lua 脚本执行等高级功能。我们接下来演示的时候也只是展示几个大家比较熟悉的指令,这也是我们学习新知识的时候经常使用的方式,先从最简单最容易的开始入手,循序渐进,这也是微精通所提倡的方式。

redis协议(resp)

redis serialization protocol (resp) 是 redis 使用的二进制协议,用于客户端和服务器之间的通信。我们可以通过该协议解析redis服务器返回的命令格式,解析我们想要的数据。resp具有简洁易解析的特点

  • 简单字符串协议:

    • 格式: ok\r\n
    • 第一个字节是" ”,后跟消息内容,以"\r\n"(回车和换行)结束。
    • 示例: ok\r\n
  • 批量字符串协议:

    • 格式: $5\r\nhello\r\n
    • 第一个字节是"$",后跟字符串的字节长度,然后是实际的字符串内容,最后以"\r\n"结束。
    • 示例:$5\r\nhello\r\n
  • 整数协议:

    • 格式: :42\r\n
    • 第一个字节是":",后跟整数的文本表示,以"\r\n"结束。
    • 示例::42\r\n
  • 数组协议:

    • 格式: *3\r\n:1\r\n:2\r\n:3\r\n
    • 第一个字节是"*",后跟数组中元素的数量,然后是数组中每个元素的 resp 表示,以"\r\n"结束。
    • 示例:*3\r\n:1\r\n:2\r\n:3\r\n
  • 错误协议:

    • 格式: -error message\r\n
    • 第一个字节是"-",后跟错误消息内容,以"\r\n"结束。
    • 示例:-error message\r\n

需要注意的是字符串协议里面的长度不是具体字符的长度,而是对应的utf8对应的字节数组的长度,这一点对于我们解析返回的数据很重要,否则获取数据的时候会影响数据的完整性。

resp协议是redis高效性能的关键之一,它相对比较加单,不需要解析各种头信息等,这使得redis能够在处理大规模数据和请求时表现出色。了解resp协议可以帮助您更好地理解redis客户端类 redisclient 的内部工作原理。可以理解为它属于一种应用层面的协议,通过给定的数据格式解析出想要的数据,这也对我们在实际编程过程中,解决类似的问题,提供了一个不错的思路。

实现redisclient

    上面我们介绍了一些关于redis的基础概念,重点介绍了一下关于redis的命令和resp,接下来我们就结合上面的理论,基于c# socket来简单的模拟一下如何和redis server进行数据交互。主要就是结合redis命令redis 协议(resp)来简单的实现。

通信架子

首先来看一下类的结构

public class redisclient : idisposable, iasyncdisposable
{
    //定义默认端口
    private readonly int defaultport = 6379;
    //定义默认地址
    private readonly string host = "localhost";
    //心跳间隔,单位为毫秒
    private readonly int heartbeatinterval = 30000; 
    private bool _isconnected;
    //心跳定时器
    private timer _heartbeattimer;
    private socket _socket;
    public redisclient(string host = "localhost", int defaultport = 6379)
    {
        host = host;
        defaultport = defaultport;
        // 初始化心跳定时器
        _heartbeattimer = new timer(heartbeatcallback, null, heartbeatinterval, heartbeatinterval);
    }
    //连接方法
    public async task connectasync(int timeoutmilliseconds = 5000)
    {
        _socket = new socket(addressfamily.internetwork, sockettype.stream, protocoltype.tcp);
        var cts = new cancellationtokensource(timeoutmilliseconds);
        await _socket.connectasync(host, defaultport, cts.token);
        _isconnected = true;
    }
    //心跳方法
    private async void heartbeatcallback(object state)
    {
        if (_isconnected)
        {
            var pingcommand = "ping\r\n";
            await sendcommandasync(pingcommand);
        }
    }
    //释放逻辑
    public void dispose()
    {
        disposeasync().getawaiter().getresult();
    }
    public valuetask disposeasync()
    {
        // 停止心跳定时器
        _heartbeattimer.dispose();
        if (_socket != null)
        {
            _socket.shutdown(socketshutdown.both);
            _socket.close();
        }
        return valuetask.completedtask;
    }
}

上面的类定义了实现的大致通信结构,结构中主要涉及到的是通信相关的功能实现,包含socket的初始化信息、默认的连连接信息、心跳方法、释放逻辑等。首先,在构造函数中,指定了默认的redis端口(6379)、地址(localhost),并初始化了心跳定时器。连接方法connectasync通过socket建立与redis服务器的tcp连接。心跳定时器heartbeatcallback定期发送ping命令,确保与服务器的连接保持活动。最后,dispose方法用于释放资源,包括停止心跳定时器和关闭socket连接,实现了idisposableiasyncdisposable接口。这些功能为redisclient类提供了基本的连接和资源管理能力。由于我对socket编程也不是很熟悉,所以定义的可能不是很完善,有比较熟悉的同学,可以多多指导。

发送和解析

有了这个基础的架子之后,我们可以在里面填写具体的实现逻辑了。首先我们来定义发送redis命令和解析resp的逻辑

//发送命令
public async task sendcommandasync(string command)
{
    // 发送命令的实现
    if (!_isconnected)
    {
        // 如果连接已断开,可以进行重连
        await connectasync();
    }
    
    //redis的命令是以\r\n为结尾的
    var request = encoding.utf8.getbytes(command   "\r\n");
    //发送命令
    await _socket.sendasync(new arraysegment(request), socketflags.none);
    var response = new stringbuilder();
    var remainingdata = string.empty;
    //初始化响应字符串和剩余数据
    byte[] receivebuffer = arraypool.shared.rent(1024);
    try
    {
        while (true)
        {
            //读取返回信息
            var bytesread = await _socket.receiveasync(new arraysegment(receivebuffer), socketflags.none);
            //将接收到的数据添加到响应字符串
            var responsedata = remainingdata   encoding.utf8.getstring(receivebuffer, 0, bytesread);
            //提取完整的响应并添加到响应字符串中
            var completeresponses = extractcompleteresponses(ref responsedata);
            foreach (var completeresponse in completeresponses)
            {
                response.append(completeresponse);
            }
            remainingdata = responsedata;
            //结果为\r\n读取结束
            if (response.tostring().endswith("\r\n"))
            {
                break;
            }
        }
    }
    finally
    {
        //释放缓冲区
        arraypool.shared.return(receivebuffer);
    }
    //返回完整的响应字符串
    return response.tostring();
}
private list extractcompleteresponses(ref string data)
{
    var completeresponses = new list();
    while (true)
    {
        var index = data.indexof("\r\n");
        if (index >= 0)
        {
             // 提取一个完整的响应
            var completeresponse = data.substring(0, index   2);
            //将完整的响应添加到列表中
            completeresponses.add(completeresponse);
            data = data.substring(index   2);
        }
        else
        {
            break;
        }
    }
    return completeresponses;
}
private string parseresponse(string response)
{
    if (response.startswith("$"))
    {
        // 处理 bulk strings($)
        var lengthstr = response.substring(1, response.indexof('\r') - 1);
        if (int.tryparse(lengthstr, out int length))
        {
            if (length == -1)
            {
                return null!;
            }
            string rawredisdata = response.substring(response.indexof('\n')   1);
            byte[] utf8bytes = encoding.utf8.getbytes(rawredisdata);
            string value = encoding.utf8.getstring(utf8bytes, 0, length);
            return value;
        }
    }
    else if (response.startswith(" "))
    {
        // 处理 simple strings( )
        return response.substring(1, response.length - 3);
    }
    else if (response.startswith(":"))
    {
        // 处理 integers(:)
        var valuestr = response.substring(1, response.indexof('\r') - 1);
        if (int.tryparse(valuestr, out int value))
        {
            return value.tostring();
        }
    }
    // 如果响应格式不符合预期,抛出异常
    throw new invalidoperationexception(response);
}

上面逻辑涉及到发送和接收redis消息的三个方法sendcommandasyncextractcompleteresponsesparseresponse。虽然上面代码中有注释,但是咱们分别i简单的讲解一下这三个方法

  • sendcommandasync

    该方法主要目的是向 redis 服务器发送命令并异步接收响应

    • 连接检查:首先,检查连接状态 (_isconnected),如果连接已断开,则调用 connectasync 方法进行重连。
    • 命令转换:将传入的命令字符串转换为 utf-8 编码的字节数组,附加回车换行符 ("\r\n")。
    • 接收响应:使用异步循环接收来自服务器的响应。在每次接收之后,将接收到的数据添加到响应字符串中,并提取其中的完整响应。
    • 缓冲区管理:为了有效地处理接收到的数据,使用了一个缓冲区 (receivebuffer),并在方法结束时通过 arraypool.shared.return 进行释放。
    • 提取完整响应:调用 extractcompleteresponses 方法,该方法从响应数据中提取出一个或多个完整的响应,将其从数据中移除,并返回一个列表。
  • extractcompleteresponses

    该方法主要用于从接收到的数据中提取出一个或多个完整的响应。

    • completeresponses 列表:用于存储提取出的完整响应的列表。
    • while 循环:循环进行以下操作,直到数据中没有换行符为止。
    • 提取完整响应:如果找到换行符,就提取从数据开头到换行符位置的子字符串,包括换行符本身,构成一个完整的响应。
    • 添加到列表:将提取出的完整响应添加到 completeresponses 列表中。
  • parseresponse

    该方法主要用于解析从 redis 服务器接收到的响应字符串。

    • 如果响应以 $ 开头,表示这是一个 bulk string 类型的响应。
    • 如果响应以 开头,表示这是一个 simple string 类型的响应。
    • 如果响应以 : 开头,表示这是一个 integer 类型的响应。

简单操作方法

上面有了和redis通信的基本方法,也有了解析resp协议的基础方法,接下来咱们实现几个简单的redis操作指令来展示一下redis客户端具体是如何工作的,简单的几个方法如下所示

//切换db操作
 public async task selectasync(int dbindex)
 {
     var command = $"select {dbindex}";
     await sendcommandasync(command);
 }
//get操作
 public async task getasync(string key)
 {
     var command = $"get {key}";
     return parseresponse(await sendcommandasync(command));
 }
//set操作
 public async task setasync(string key, string value, timespan? expiry = null)
 {
     var command = $"set {key} '{value}'";
     //判断会否追加过期时间
     if (expiry.hasvalue)
     {
         command  = $" ex {expiry.value.totalseconds}";
     }
     var response = parseresponse(await sendcommandasync(command));
     return response == "ok";
 }
 //支持过期时间的setnx操作
 public async task setnxasync(string key, string value, timespan? expiry = null)
 {
    //因为默认的setnx方法不支持添加过期时间,为了保证操作的原子性,使用了lua
     var command = $"eval \"if redis.call('setnx', keys[1], argv[1]) == 1 then if argv[2] then redis.call('expire', keys[1], argv[2]) end return true else return false end\" 1 {key} '{value}'";
     if (expiry.hasvalue)
     {
         command  = $" {expiry.value.totalseconds}";
     }
     var response = parseresponse(await sendcommandasync(command));
     return response == "1";
 }
 
 //添加支持函过期时间的list push操作
 public async task listpushasync(string key, string value, timespan? expiry = null)
 {
     var script = @"local len = redis.call('lpush', keys[1], argv[1])
                     if tonumber(argv[2]) > 0 then
                         redis.call('expire', keys[1], argv[2])
                     end
                     return len";
     var keys = new string[] { key };
     var args = new string[] { value, (expiry?.totalseconds ?? 0).tostring() };
     var response = await executeluascriptasync(script, keys, args);
     return long.parse(response);
 }
//list pop操作
 public async task listpopasync(string key)
 {
     var command = $"lpop {key}";
     return parseresponse(await sendcommandasync(command));
 }
 //listrange操作
 public async task> listrangeasync(string key, int start, int end)
 {
     var command = $"lrange {key} {start} {end}";
     var response = await sendcommandasync(command);
     if (response.startswith("*0\r\n"))
     {
         return new list();
     }
     
     //由于list range返回了是一个数组,所以单独处理了一下,这里我使用了正则,解析字符串也可以,方法随意
     var values = new list();
     var pattern = @"\$\d \r\n(.*?)\r\n";
     matchcollection matches = regex.matches(response, pattern);
     foreach (match match in matches)
     {
         values.add(match.groups[1].value);
     }
     return values;
 }
 //执行lua脚本的方法
 public async task executeluascriptasync(string script, string[]? keys = null, string[]? args = null)
 {
    //去除lua里的换行
     script = regex.replace(script, @"[\r\n]", "");
     // 构建eval命令,将lua脚本、keys和args发送到redis服务器
     var command = $"eval \"{script}\" { keys?.length??0 } ";
     //拼接key和value参数
     if (keys != null && keys.length != 0)
     {
         command  = string.join(" ", keys.select(key => $"{key}"));
     }
     if (args != null && args.length != 0)
     {
         command  = " "   string.join(" ", args.select(arg => $"{arg}"));
     }
     return parseresponse(await sendcommandasync(command));
 }
 //redis发布操作
 public async task subscribeasync(string channel, action handler)
 {
     await sendcommandasync($"subscribe {channel}");
     while (true)
     {
         var response = await sendcommandasync(string.empty);
         string pattern = @"\*\d \r\n\$\d \r\n(.*?)\r\n\$\d \r\n(.*?)\r\n\$\d \r\n(.*?)\r\n";
         match match = regex.match(response, pattern);
         if (match.success)
         {
             string ch = match.groups[2].value;
             string message = match.groups[3].value;
             handler(ch, message);
         }
     }
 }
//redis订阅操作
 public async task publishasync(string channel, string message)
 {
     await sendcommandasync($"publish {channel} {message}");
 }

上面方法中演示了几个比较常见的操作,很简单,主要是向大家展示redis命令是如何发送的,从最简单的getsetlist发布订阅执行lua操作方面着手,如果对redis命令比较熟悉的话,操作起来还是比较简单的,这里给大家讲解几个比较有代表的方法

  • 首先关于setnx方法,由于自带的setnx方法不支持添加过期时间,为了保证操作的原子性,使用了lua脚本的方式
  • 自带的lpush也就是上面listpushasync方法中封装的操作,自带的也是没办法给定过期时间的,为了保证操作的原子性,我在这里也是用lua进行封装
  • 关于执行lua脚本的时候的时候需要注意lua脚本的格式eval script numkeys [key [key ...]] [arg [arg ...]]脚本后面紧跟着的长度是key的个数这个需要注意
  • 最后,自行编写命令的时候需要注意\r\n的处理和引号的转义问题,当然研究的越深,遇到的问题越多

相信大家也看到了,这里我封装的都是几个简单的操作,难度系数不大,因为主要是向大家演示redis客户端的发送和接收操作是什么样的,甚至我都是直接返回的字符串,真实使用的时候我们使用都是需要封装序列化和反序列化操作的。

完整代码

上面分别对redisclient类中的方法进行了讲解,接下来我把我封装的类完整的给大家贴出来,由于封装的只是几个简单的方法用于演示,所以也只有一个类,代码量也不多,主要是为了方便大家理解,有想试验的同学可以直接拿走

public class redisclient : idisposable, iasyncdisposable
{
    private readonly int defaultport = 6379;
    private readonly string host = "localhost";
    private readonly int heartbeatinterval = 30000; 
    private bool _isconnected;
    private timer _heartbeattimer;
    private socket _socket;
    public redisclient(string host = "localhost", int defaultport = 6379)
    {
        host = host;
        defaultport = defaultport;
        _heartbeattimer = new timer(heartbeatcallback, null, heartbeatinterval, heartbeatinterval);
    }
    public async task connectasync(int timeoutmilliseconds = 5000)
    {
        _socket = new socket(addressfamily.internetwork, sockettype.stream, protocoltype.tcp);
        var cts = new cancellationtokensource(timeoutmilliseconds);
        await _socket.connectasync(host, defaultport, cts.token);
        _isconnected = true;
    }
    public async task selectasync(int dbindex)
    {
        var command = $"select {dbindex}";
        await sendcommandasync(command);
    }
    public async task getasync(string key)
    {
        var command = $"get {key}";
        return parseresponse(await sendcommandasync(command));
    }
    public async task setasync(string key, string value, timespan? expiry = null)
    {
        var command = $"set {key} '{value}'";
        if (expiry.hasvalue)
        {
            command  = $" ex {expiry.value.totalseconds}";
        }
        var response = parseresponse(await sendcommandasync(command));
        return response == "ok";
    }
    public async task setnxasync(string key, string value, timespan? expiry = null)
    {
        var command = $"eval \"if redis.call('setnx', keys[1], argv[1]) == 1 then if argv[2] then redis.call('expire', keys[1], argv[2]) end return true else return false end\" 1 {key} '{value}'";
        if (expiry.hasvalue)
        {
            command  = $" {expiry.value.totalseconds}";
        }
        var response = parseresponse(await sendcommandasync(command));
        return response == "1";
    }
    public async task listpushasync(string key, string value, timespan? expiry = null)
    {
        var script = @"local len = redis.call('lpush', keys[1], argv[1])
                        if tonumber(argv[2]) > 0 then
                            redis.call('expire', keys[1], argv[2])
                        end
                        return len";
        var keys = new string[] { key };
        var args = new string[] { value, (expiry?.totalseconds ?? 0).tostring() };
        var response = await executeluascriptasync(script, keys, args);
        return long.parse(response);
    }
    public async task listpopasync(string key)
    {
        var command = $"lpop {key}";
        return parseresponse(await sendcommandasync(command));
    }
    public async task listlengthasync(string key)
    {
        var command = $"llen {key}";
        return long.parse(parseresponse(await sendcommandasync(command)));
    }
    public async task> listrangeasync(string key, int start, int end)
    {
        var command = $"lrange {key} {start} {end}";
        var response = await sendcommandasync(command);
        if (response.startswith("*0\r\n"))
        {
            return new list();
        }
        var values = new list();
        var pattern = @"\$\d \r\n(.*?)\r\n";
        matchcollection matches = regex.matches(response, pattern);
        foreach (match match in matches)
        {
            values.add(match.groups[1].value);
        }
        return values;
    }
    public async task executeluascriptasync(string script, string[]? keys = null, string[]? args = null)
    {
        script = regex.replace(script, @"[\r\n]", "");
        var command = $"eval \"{script}\" { keys?.length??0 } ";
        if (keys != null && keys.length != 0)
        {
            command  = string.join(" ", keys.select(key => $"{key}"));
        }
        if (args != null && args.length != 0)
        {
            command  = " "   string.join(" ", args.select(arg => $"{arg}"));
        }
        return parseresponse(await sendcommandasync(command));
    }
    public async task subscribeasync(string channel, action handler)
    {
        await sendcommandasync($"subscribe {channel}");
        while (true)
        {
            var response = await sendcommandasync(string.empty);
            string pattern = @"\*\d \r\n\$\d \r\n(.*?)\r\n\$\d \r\n(.*?)\r\n\$\d \r\n(.*?)\r\n";
            match match = regex.match(response, pattern);
            if (match.success)
            {
                string ch = match.groups[2].value;
                string message = match.groups[3].value;
                handler(ch, message);
            }
        }
    }
    public async task publishasync(string channel, string message)
    {
        await sendcommandasync($"publish {channel} {message}");
    }
    public async task sendcommandasync(string command)
    {
        if (!_isconnected)
        {
            await connectasync();
        }
        var request = encoding.utf8.getbytes(command   "\r\n");
        await _socket.sendasync(new arraysegment(request), socketflags.none);
        var response = new stringbuilder();
        var remainingdata = string.empty;
        byte[] receivebuffer = arraypool.shared.rent(1024);
        try
        {
            while (true)
            {
                var bytesread = await _socket.receiveasync(new arraysegment(receivebuffer), socketflags.none);
                var responsedata = remainingdata   encoding.utf8.getstring(receivebuffer, 0, bytesread);
                var completeresponses = extractcompleteresponses(ref responsedata);
                foreach (var completeresponse in completeresponses)
                {
                    response.append(completeresponse);
                }
                remainingdata = responsedata;
                if (response.tostring().endswith("\r\n"))
                {
                    break;
                }
            }
        }
        finally
        {
            arraypool.shared.return(receivebuffer);
        }
        return response.tostring();
    }
    private list extractcompleteresponses(ref string data)
    {
        var completeresponses = new list();
        while (true)
        {
            var index = data.indexof("\r\n");
            if (index >= 0)
            {
                var completeresponse = data.substring(0, index   2);
                completeresponses.add(completeresponse);
                data = data.substring(index   2);
            }
            else
            {
                break;
            }
        }
        return completeresponses;
    }
    private string parseresponse(string response)
    {
        if (response.startswith("$"))
        {
            var lengthstr = response.substring(1, response.indexof('\r') - 1);
            if (int.tryparse(lengthstr, out int length))
            {
                if (length == -1)
                {
                    return null!;
                }
                string rawredisdata = response.substring(response.indexof('\n')   1);
                byte[] utf8bytes = encoding.utf8.getbytes(rawredisdata);
                string value = encoding.utf8.getstring(utf8bytes, 0, length);
                return value;
            }
        }
        else if (response.startswith(" "))
        {
            return response.substring(1, response.length - 3);
        }
        else if (response.startswith(":"))
        {
            var valuestr = response.substring(1, response.indexof('\r') - 1);
            if (int.tryparse(valuestr, out int value))
            {
                return value.tostring();
            }
        }
        throw new invalidoperationexception(response);
    }
    private async void heartbeatcallback(object state)
    {
        if (_isconnected)
        {
            var pingcommand = "ping\r\n";
            await sendcommandasync(pingcommand);
        }
    }
    public void dispose()
    {
        disposeasync().getawaiter().getresult();
    }
    public valuetask disposeasync()
    {
        _heartbeattimer.dispose();
        if (_socket != null)
        {
            _socket.shutdown(socketshutdown.both);
            _socket.close();
        }
        return valuetask.completedtask;
    }
}

简单使用redisclient

上面我们封装了redisclient类,也讲解了里面实现的几个简单的方法,接下来我们就简单的使用一下它,比较简单直接上代码

get/set

get/set是最基础和最简单的指令,没啥可说的直接上代码

using redisclient redisclient = new redisclient();
await redisclient.connectasync();
//切换db
await redisclient.selectasync(3);
bool setresult = await redisclient.setasync("key:foo", "are you ok,你好吗?", timespan.fromseconds(120));
string getresult = await redisclient.getasync("key:foo");
console.writeline("get key:foo:"   getresult);

setnx

setnx比较常用,很多时候用在做分布式锁的场景,判断资源存不存在的时候经常使用

//第一次setnx返回true
bool setnxresult = await redisclient.setnxasync("order:lock", "123_lock", timespan.fromseconds(120));
console.writeline("first setnx order:lock:"   setnxresult);
//第一次setnx返回false
setnxresult = await redisclient.setnxasync("order:lock", "123_lock", timespan.fromseconds(120));
console.writeline("second setnx aname:foo:"   setnxresult);

pub/sub

这里实现的subscribeasyncpublishasync需要使用两个redisclient实例,因为我上面封装的每个redisclient只包含一个socket实例所以receiveasync方法是阻塞的。如果同一个实例的话subscribeasync的时候,在使用publishasync方法的时候会被阻塞,所以演示的时候使用了两个redisclient实例

_ = redisclient.subscribeasync("order_msg_ch", (ch, msg) => { console.writeline($"接收消息:[{ch}]---[{msg}]"); });
thread.sleep(2000);
using redisclient redisclient2 = new redisclient();
await redisclient2.connectasync();
for (int i = 0; i < 5; i  )
{
    await redisclient2.publishasync("order_msg_ch", $"发送消息{i}");
    thread.sleep(2000);
}

executeluascriptasync

动态执行lua的功能还是比较强大的,在之前的项目中,我也使用类似的功能。我们是模拟抢单/完成的场景,比如业务人员需要自行抢单,每个人最多抢几单,超过阈值则抢单失败,你需要把抢到的完成了才能继续抢单,这种操作就需要借助lua进行操作

//抢单的lua
string takeorderluascript = @"
        local orderstaken = tonumber(redis.call('get', keys[1]) or '0')
        if orderstaken < tonumber(argv[1]) then
            redis.call('incr', keys[1])
            return 1
        else
            return 0
        end";
//完成你手里的订单操作
string completeorderluascript = @"
        local orderstaken = tonumber(redis.call('get', keys[1]) or '0')
        if orderstaken > 0 then
            redis.call('decr', keys[1])
            return 1
        else
            return 0
        end";
//模拟抢单,最多抢两单
string result = await redisclient.executeluascriptasync(takeorderluascript, new[] { "user:123" }, new[] { "2" });
result = await redisclient.executeluascriptasync(takeorderluascript, new[] { "user:123" }, new[] { "2" });
result = await redisclient.executeluascriptasync(takeorderluascript, new[] { "user:123" }, new[] { "2" });
result = await redisclient.executeluascriptasync(takeorderluascript, new[] { "user:123" }, new[] { "2" });
//完成订单
string anotherresult = await redisclient.executeluascriptasync(completeorderluascript, keys: new[] { "user:123" });
anotherresult = await redisclient.executeluascriptasync(completeorderluascript, keys: new[] { "user:123" });
anotherresult = await redisclient.executeluascriptasync(completeorderluascript, keys: new[] { "user:123" });
anotherresult = await redisclient.executeluascriptasync(completeorderluascript, keys: new[] { "user:123" });

还有一个功能也是我们之前遇到的,就是使用redis实现缓存最新的n条消息,旧的则被抛弃,实现这个功能也需要使用redis的list结构结合lua的方式

string luascript = @"
            local record_key = keys[1]
            local max_records = tonumber(argv[1])
            local new_record = argv[2]
            local current_count = redis.call('llen', record_key)
            if current_count >= max_records then
                redis.call('lpop', record_key)
            end
            redis.call('rpush', record_key, new_record)
        ";
//这里限制保存最新的50条数据,旧的数据则被抛弃
for (int i = 0; i < 60; i  )
{
    _ = await redisclient.executeluascriptasync(luascript, keys: new[] { "msg:list" }, new[] { "50", i.tostring() });
}

list

list很多时候会把它当做分布式队列来使用,它提供的操作也比较灵活,咱们这里只是封装了几个最简单的操作,大致的效果如下所示

//lis入队操作
var res = await redisclient.listpushasync("list:2", "123", timespan.fromhours(1));
res = await redisclient.listpushasync("list:2", "1234", timespan.fromhours(1));
res = await redisclient.listpushasync("list:2", "12345", timespan.fromhours(1));
//list出队操作
var str = await redisclient.listpopasync("list:2");
//list长度
var length = await redisclient.listlengthasync("list:2");
//list range操作
var list = await redisclient.listrangeasync("article:list", 0, 10);

总结

    本文我们通过理解redis命令resp协议来构建了一个简单redisclient的实现,方便我们更容易的理解redis客户端如何与redis服务器进行通信,这个实现也可以作为学习和理解·redis客户端·的一个很好的例子。当然我们的这个redisclient这是了解和学习使用,很多场景我们并没有展示,实际的项目我们还是尽量使用开源的redis sdk, .net中常用的有stackexchange.redisfreerediscsredisnewlife.redisservice.stack.redis,其中我经常使用的是stackexchange.redisfreeredis整体来说效果还是不错的。总结一下我们文章的主要内容

  • 首先我们讲解了redis命令的格式
  • 其次我们讲解了redis协议(resp)的主要格式以及如何解析
  • 然后我们基于上面的理论简单的封装了一个redisclient类来演示相关概念
  • 最后我们通过几个示例和我用过的两个lua来简单的演示redisclient类的使用

    作为新时代的职场人,我乐在探究自己感兴趣的领域,对未知的事物充满好奇,并渴望深入了解。对于常用的核心技术,我不仅要求自己能够熟练运用,更追求深入理解其实现原理。面对新的技术趋势,我决不会视而不见,而是在熟悉更多常用技术栈的同时,努力深入掌握一些重要的知识。我坚信,学无止境,每一步的进步都带来无比的喜悦与成就感。

👇欢迎扫码关注我的公众号👇

总结

以上是凯发k8官方网为你收集整理的基于c# socket实现的简单的redis客户端的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得凯发k8官方网网站内容还不错,欢迎将凯发k8官方网推荐给好友。

  • 上一篇:
  • 下一篇:
网站地图