基于c# socket实现的简单的redis客户端 -尊龙游戏旗舰厅官网
前言
redis
是一款强大的高性能键值存储数据库,也是目前nosql
中最流行比较流行的一款数据库,它在广泛的应用场景中扮演着至关重要的角色,包括但不限于缓存、消息队列、会话存储等。在本文中,我们将介绍如何基于c# socket
来实现一个简单的redis客户端类redisclient
,来演示构建请求和输出的相关通信机制。需要注意的是本文只是着重展示如何基于原生的socket
方式与redis server
进行通信,并不是构建一个强大的redis开发工具包
。
redis简介
redis(remote dictionary server)
是一个内存数据库,它支持了非常丰富的数据结构,包括字符串、列表、集合、散列、有序集合等。redis 提供了高性能的读写操作,可以用于缓存数据、消息队列、分布式锁、会话管理等多种用途。redis 通常以键值对的方式存储数据,每个键都与一个值相关联,值的类型可以是字符串、列表、散列等。redis
不仅提供了丰富的命令集,用于操作存储在数据库中的数据,还提供了redis serialization protocol (resp)
协议来解析redis server
返回的数据。相关的文档地址如下所示:
- redis尊龙游戏旗舰厅官网地址 https://redis.io/
- redis官方文档地址 https://redis.io/docs/
- redis命令文档地址 https://redis.io/commands/
- redis序列化协议规范文档地址 https://redis.io/docs/reference/protocol-spec/
redis 命令指南
redis命令
是与redis服务器进行通信的主要方式,通俗点就是发送指定格式的指令用于执行各种操作,包括数据存储、检索、修改和删除等。以下是一些日常使用过程中常见的redis命令及其用途:
-
get 和 set 命令
-
get key
: 用于获取指定键的值。 -
set key value
: 用于设置指定键的值.
-
-
del 命令
-
del key
: 用于删除指定键.
-
-
expire 和 ttl 命令
-
expire key seconds
: 用于为指定键设置过期时间(秒). -
ttl key
: 用于获取指定键的剩余过期时间(秒).
注意这里的时间单位是秒
-
-
incr 和 decr 命令
-
incr key
: 用于递增指定键的值. -
decr key
: 用于递减指定键的值.
-
-
rpush 和 lpop 命令
-
rpush key value
: 用于将值添加到列表的右侧. -
lpop key
: 用于从列表的左侧弹出一个值.
-
-
hset 和 hget 命令
-
hset key field value
: 用于设置哈希表中指定字段的值. -
hget key field
: 用于获取哈希表中指定字段的值.
-
-
publish 和 subscribe 命令
-
publish channel message
: 用于向指定频道发布消息. -
subscribe channel
: 用于订阅指定频道的消息.
-
当然 redis 支持的命令远不止这些,它还包括对集合、有序集合、位图、hyperloglog 等数据结构的操作,以及事务、lua 脚本执行等高级功能。我们接下来演示的时候也只是展示几个大家比较熟悉的指令,这也是我们学习新知识的时候经常使用的方式,先从最简单最容易的开始入手,循序渐进,这也是微精通
所提倡的方式。
redis协议(resp)
redis serialization protocol (resp)
是 redis 使用的二进制协议,用于客户端和服务器之间的通信。我们可以通过该协议解析redis服务器
返回的命令格式,解析我们想要的数据。resp具有简洁易解析的特点
-
简单字符串协议:
-
格式:
ok\r\n
- 第一个字节是" ”,后跟消息内容,以"\r\n"(回车和换行)结束。
- 示例:
ok\r\n
-
格式:
-
批量字符串协议:
-
格式:
$5\r\nhello\r\n
- 第一个字节是"$",后跟字符串的字节长度,然后是实际的字符串内容,最后以"\r\n"结束。
- 示例:
$5\r\nhello\r\n
-
格式:
-
整数协议:
-
格式:
:42\r\n
- 第一个字节是":",后跟整数的文本表示,以"\r\n"结束。
- 示例:
:42\r\n
-
格式:
-
数组协议:
-
格式:
*3\r\n:1\r\n:2\r\n:3\r\n
- 第一个字节是"*",后跟数组中元素的数量,然后是数组中每个元素的 resp 表示,以"\r\n"结束。
- 示例:
*3\r\n:1\r\n:2\r\n:3\r\n
-
格式:
-
错误协议:
-
格式:
-error message\r\n
- 第一个字节是"-",后跟错误消息内容,以"\r\n"结束。
- 示例:
-error message\r\n
-
格式:
需要注意的是字符串协议里面的长度不是具体字符的长度,而是对应的
utf8
对应的字节数组的长度,这一点对于我们解析返回的数据很重要,否则获取数据的时候会影响数据的完整性。
resp协议
是redis高效性能的关键之一,它相对比较加单,不需要解析各种头信息等,这使得redis能够在处理大规模数据和请求时表现出色。了解resp协议可以帮助您更好地理解redis客户端类 redisclient
的内部工作原理。可以理解为它属于一种应用层面的协议,通过给定的数据格式解析出想要的数据,这也对我们在实际编程过程中,解决类似的问题,提供了一个不错的思路。
实现redisclient
上面我们介绍了一些关于redis
的基础概念,重点介绍了一下关于redis
的命令和resp
,接下来我们就结合上面的理论,基于c# socket
来简单的模拟一下如何和redis server
进行数据交互。主要就是结合redis命令
和redis 协议(resp)
来简单的实现。
通信架子
首先来看一下类的结构
public class redisclient : idisposable, iasyncdisposable
{
//定义默认端口
private readonly int defaultport = 6379;
//定义默认地址
private readonly string host = "localhost";
//心跳间隔,单位为毫秒
private readonly int heartbeatinterval = 30000;
private bool _isconnected;
//心跳定时器
private timer _heartbeattimer;
private socket _socket;
public redisclient(string host = "localhost", int defaultport = 6379)
{
host = host;
defaultport = defaultport;
// 初始化心跳定时器
_heartbeattimer = new timer(heartbeatcallback, null, heartbeatinterval, heartbeatinterval);
}
//连接方法
public async task connectasync(int timeoutmilliseconds = 5000)
{
_socket = new socket(addressfamily.internetwork, sockettype.stream, protocoltype.tcp);
var cts = new cancellationtokensource(timeoutmilliseconds);
await _socket.connectasync(host, defaultport, cts.token);
_isconnected = true;
}
//心跳方法
private async void heartbeatcallback(object state)
{
if (_isconnected)
{
var pingcommand = "ping\r\n";
await sendcommandasync(pingcommand);
}
}
//释放逻辑
public void dispose()
{
disposeasync().getawaiter().getresult();
}
public valuetask disposeasync()
{
// 停止心跳定时器
_heartbeattimer.dispose();
if (_socket != null)
{
_socket.shutdown(socketshutdown.both);
_socket.close();
}
return valuetask.completedtask;
}
}
上面的类定义了实现的大致通信结构,结构中主要涉及到的是通信相关的功能实现,包含socket
的初始化信息、默认的连连接信息、心跳方法、释放逻辑等。首先,在构造函数中,指定了默认的redis端口(6379)、地址(localhost),并初始化了心跳定时器。连接方法connectasync
通过socket
建立与redis服务器
的tcp连接。心跳定时器heartbeatcallback
定期发送ping
命令,确保与服务器的连接保持活动。最后,dispose方法
用于释放资源,包括停止心跳定时器和关闭socket
连接,实现了idisposable
和iasyncdisposable
接口。这些功能为redisclient
类提供了基本的连接和资源管理能力。由于我对socket
编程也不是很熟悉,所以定义的可能不是很完善,有比较熟悉的同学,可以多多指导。
发送和解析
有了这个基础的架子之后,我们可以在里面填写具体的实现逻辑了。首先我们来定义发送redis
命令和解析resp
的逻辑
//发送命令
public async task sendcommandasync(string command)
{
// 发送命令的实现
if (!_isconnected)
{
// 如果连接已断开,可以进行重连
await connectasync();
}
//redis的命令是以\r\n为结尾的
var request = encoding.utf8.getbytes(command "\r\n");
//发送命令
await _socket.sendasync(new arraysegment(request), socketflags.none);
var response = new stringbuilder();
var remainingdata = string.empty;
//初始化响应字符串和剩余数据
byte[] receivebuffer = arraypool.shared.rent(1024);
try
{
while (true)
{
//读取返回信息
var bytesread = await _socket.receiveasync(new arraysegment(receivebuffer), socketflags.none);
//将接收到的数据添加到响应字符串
var responsedata = remainingdata encoding.utf8.getstring(receivebuffer, 0, bytesread);
//提取完整的响应并添加到响应字符串中
var completeresponses = extractcompleteresponses(ref responsedata);
foreach (var completeresponse in completeresponses)
{
response.append(completeresponse);
}
remainingdata = responsedata;
//结果为\r\n读取结束
if (response.tostring().endswith("\r\n"))
{
break;
}
}
}
finally
{
//释放缓冲区
arraypool.shared.return(receivebuffer);
}
//返回完整的响应字符串
return response.tostring();
}
private list extractcompleteresponses(ref string data)
{
var completeresponses = new list();
while (true)
{
var index = data.indexof("\r\n");
if (index >= 0)
{
// 提取一个完整的响应
var completeresponse = data.substring(0, index 2);
//将完整的响应添加到列表中
completeresponses.add(completeresponse);
data = data.substring(index 2);
}
else
{
break;
}
}
return completeresponses;
}
private string parseresponse(string response)
{
if (response.startswith("$"))
{
// 处理 bulk strings($)
var lengthstr = response.substring(1, response.indexof('\r') - 1);
if (int.tryparse(lengthstr, out int length))
{
if (length == -1)
{
return null!;
}
string rawredisdata = response.substring(response.indexof('\n') 1);
byte[] utf8bytes = encoding.utf8.getbytes(rawredisdata);
string value = encoding.utf8.getstring(utf8bytes, 0, length);
return value;
}
}
else if (response.startswith(" "))
{
// 处理 simple strings( )
return response.substring(1, response.length - 3);
}
else if (response.startswith(":"))
{
// 处理 integers(:)
var valuestr = response.substring(1, response.indexof('\r') - 1);
if (int.tryparse(valuestr, out int value))
{
return value.tostring();
}
}
// 如果响应格式不符合预期,抛出异常
throw new invalidoperationexception(response);
}
上面逻辑涉及到发送和接收redis消息的三个方法sendcommandasync
、extractcompleteresponses
、parseresponse
。虽然上面代码中有注释,但是咱们分别i简单的讲解一下这三个方法
-
sendcommandasync
该方法主要目的是向 redis 服务器发送命令并异步接收响应
- 连接检查:首先,检查连接状态 (_isconnected),如果连接已断开,则调用 connectasync 方法进行重连。
- 命令转换:将传入的命令字符串转换为 utf-8 编码的字节数组,附加回车换行符 ("\r\n")。
- 接收响应:使用异步循环接收来自服务器的响应。在每次接收之后,将接收到的数据添加到响应字符串中,并提取其中的完整响应。
- 缓冲区管理:为了有效地处理接收到的数据,使用了一个缓冲区 (receivebuffer),并在方法结束时通过 arraypool
.shared.return 进行释放。 - 提取完整响应:调用 extractcompleteresponses 方法,该方法从响应数据中提取出一个或多个完整的响应,将其从数据中移除,并返回一个列表。
-
extractcompleteresponses
该方法主要用于从接收到的数据中提取出一个或多个完整的响应。
- completeresponses 列表:用于存储提取出的完整响应的列表。
- while 循环:循环进行以下操作,直到数据中没有换行符为止。
- 提取完整响应:如果找到换行符,就提取从数据开头到换行符位置的子字符串,包括换行符本身,构成一个完整的响应。
- 添加到列表:将提取出的完整响应添加到 completeresponses 列表中。
-
parseresponse
该方法主要用于解析从 redis 服务器接收到的响应字符串。
- 如果响应以 $ 开头,表示这是一个 bulk string 类型的响应。
- 如果响应以 开头,表示这是一个 simple string 类型的响应。
- 如果响应以 : 开头,表示这是一个 integer 类型的响应。
简单操作方法
上面有了和redis通信
的基本方法,也有了解析resp
协议的基础方法,接下来咱们实现几个简单的redis操作指令
来展示一下redis客户端具体是如何工作的,简单的几个方法如下所示
//切换db操作
public async task selectasync(int dbindex)
{
var command = $"select {dbindex}";
await sendcommandasync(command);
}
//get操作
public async task getasync(string key)
{
var command = $"get {key}";
return parseresponse(await sendcommandasync(command));
}
//set操作
public async task setasync(string key, string value, timespan? expiry = null)
{
var command = $"set {key} '{value}'";
//判断会否追加过期时间
if (expiry.hasvalue)
{
command = $" ex {expiry.value.totalseconds}";
}
var response = parseresponse(await sendcommandasync(command));
return response == "ok";
}
//支持过期时间的setnx操作
public async task setnxasync(string key, string value, timespan? expiry = null)
{
//因为默认的setnx方法不支持添加过期时间,为了保证操作的原子性,使用了lua
var command = $"eval \"if redis.call('setnx', keys[1], argv[1]) == 1 then if argv[2] then redis.call('expire', keys[1], argv[2]) end return true else return false end\" 1 {key} '{value}'";
if (expiry.hasvalue)
{
command = $" {expiry.value.totalseconds}";
}
var response = parseresponse(await sendcommandasync(command));
return response == "1";
}
//添加支持函过期时间的list push操作
public async task listpushasync(string key, string value, timespan? expiry = null)
{
var script = @"local len = redis.call('lpush', keys[1], argv[1])
if tonumber(argv[2]) > 0 then
redis.call('expire', keys[1], argv[2])
end
return len";
var keys = new string[] { key };
var args = new string[] { value, (expiry?.totalseconds ?? 0).tostring() };
var response = await executeluascriptasync(script, keys, args);
return long.parse(response);
}
//list pop操作
public async task listpopasync(string key)
{
var command = $"lpop {key}";
return parseresponse(await sendcommandasync(command));
}
//listrange操作
public async task> listrangeasync(string key, int start, int end)
{
var command = $"lrange {key} {start} {end}";
var response = await sendcommandasync(command);
if (response.startswith("*0\r\n"))
{
return new list();
}
//由于list range返回了是一个数组,所以单独处理了一下,这里我使用了正则,解析字符串也可以,方法随意
var values = new list();
var pattern = @"\$\d \r\n(.*?)\r\n";
matchcollection matches = regex.matches(response, pattern);
foreach (match match in matches)
{
values.add(match.groups[1].value);
}
return values;
}
//执行lua脚本的方法
public async task executeluascriptasync(string script, string[]? keys = null, string[]? args = null)
{
//去除lua里的换行
script = regex.replace(script, @"[\r\n]", "");
// 构建eval命令,将lua脚本、keys和args发送到redis服务器
var command = $"eval \"{script}\" { keys?.length??0 } ";
//拼接key和value参数
if (keys != null && keys.length != 0)
{
command = string.join(" ", keys.select(key => $"{key}"));
}
if (args != null && args.length != 0)
{
command = " " string.join(" ", args.select(arg => $"{arg}"));
}
return parseresponse(await sendcommandasync(command));
}
//redis发布操作
public async task subscribeasync(string channel, action handler)
{
await sendcommandasync($"subscribe {channel}");
while (true)
{
var response = await sendcommandasync(string.empty);
string pattern = @"\*\d \r\n\$\d \r\n(.*?)\r\n\$\d \r\n(.*?)\r\n\$\d \r\n(.*?)\r\n";
match match = regex.match(response, pattern);
if (match.success)
{
string ch = match.groups[2].value;
string message = match.groups[3].value;
handler(ch, message);
}
}
}
//redis订阅操作
public async task publishasync(string channel, string message)
{
await sendcommandasync($"publish {channel} {message}");
}
上面方法中演示了几个比较常见的操作,很简单,主要是向大家展示redis
命令是如何发送的,从最简单的get
、set
、list
、发布订阅
、执行lua
操作方面着手,如果对redis命令
比较熟悉的话,操作起来还是比较简单的,这里给大家讲解几个比较有代表的方法
- 首先关于
setnx
方法,由于自带的setnx方法不支持添加过期时间,为了保证操作的原子性,使用了lua脚本的方式 - 自带的
lpush
也就是上面listpushasync
方法中封装的操作,自带的也是没办法给定过期时间的,为了保证操作的原子性,我在这里也是用lua进行封装 - 关于执行
lua脚本
的时候的时候需要注意lua脚本的格式eval script numkeys [key [key ...]] [arg [arg ...]]
脚本后面紧跟着的长度是key的个数
这个需要注意 - 最后,自行编写命令的时候需要注意
\r\n
的处理和引号
的转义问题,当然研究的越深,遇到的问题越多
相信大家也看到了,这里我封装的都是几个简单的操作,难度系数不大,因为主要是向大家演示redis客户端
的发送和接收操作是什么样的,甚至我都是直接返回的字符串,真实使用的时候我们使用都是需要封装序列化和反序列化操作的。
完整代码
上面分别对redisclient
类中的方法进行了讲解,接下来我把我封装的类完整的给大家贴出来,由于封装的只是几个简单的方法用于演示,所以也只有一个类,代码量也不多,主要是为了方便大家理解,有想试验的同学可以直接拿走
public class redisclient : idisposable, iasyncdisposable
{
private readonly int defaultport = 6379;
private readonly string host = "localhost";
private readonly int heartbeatinterval = 30000;
private bool _isconnected;
private timer _heartbeattimer;
private socket _socket;
public redisclient(string host = "localhost", int defaultport = 6379)
{
host = host;
defaultport = defaultport;
_heartbeattimer = new timer(heartbeatcallback, null, heartbeatinterval, heartbeatinterval);
}
public async task connectasync(int timeoutmilliseconds = 5000)
{
_socket = new socket(addressfamily.internetwork, sockettype.stream, protocoltype.tcp);
var cts = new cancellationtokensource(timeoutmilliseconds);
await _socket.connectasync(host, defaultport, cts.token);
_isconnected = true;
}
public async task selectasync(int dbindex)
{
var command = $"select {dbindex}";
await sendcommandasync(command);
}
public async task getasync(string key)
{
var command = $"get {key}";
return parseresponse(await sendcommandasync(command));
}
public async task setasync(string key, string value, timespan? expiry = null)
{
var command = $"set {key} '{value}'";
if (expiry.hasvalue)
{
command = $" ex {expiry.value.totalseconds}";
}
var response = parseresponse(await sendcommandasync(command));
return response == "ok";
}
public async task setnxasync(string key, string value, timespan? expiry = null)
{
var command = $"eval \"if redis.call('setnx', keys[1], argv[1]) == 1 then if argv[2] then redis.call('expire', keys[1], argv[2]) end return true else return false end\" 1 {key} '{value}'";
if (expiry.hasvalue)
{
command = $" {expiry.value.totalseconds}";
}
var response = parseresponse(await sendcommandasync(command));
return response == "1";
}
public async task listpushasync(string key, string value, timespan? expiry = null)
{
var script = @"local len = redis.call('lpush', keys[1], argv[1])
if tonumber(argv[2]) > 0 then
redis.call('expire', keys[1], argv[2])
end
return len";
var keys = new string[] { key };
var args = new string[] { value, (expiry?.totalseconds ?? 0).tostring() };
var response = await executeluascriptasync(script, keys, args);
return long.parse(response);
}
public async task listpopasync(string key)
{
var command = $"lpop {key}";
return parseresponse(await sendcommandasync(command));
}
public async task listlengthasync(string key)
{
var command = $"llen {key}";
return long.parse(parseresponse(await sendcommandasync(command)));
}
public async task> listrangeasync(string key, int start, int end)
{
var command = $"lrange {key} {start} {end}";
var response = await sendcommandasync(command);
if (response.startswith("*0\r\n"))
{
return new list();
}
var values = new list();
var pattern = @"\$\d \r\n(.*?)\r\n";
matchcollection matches = regex.matches(response, pattern);
foreach (match match in matches)
{
values.add(match.groups[1].value);
}
return values;
}
public async task executeluascriptasync(string script, string[]? keys = null, string[]? args = null)
{
script = regex.replace(script, @"[\r\n]", "");
var command = $"eval \"{script}\" { keys?.length??0 } ";
if (keys != null && keys.length != 0)
{
command = string.join(" ", keys.select(key => $"{key}"));
}
if (args != null && args.length != 0)
{
command = " " string.join(" ", args.select(arg => $"{arg}"));
}
return parseresponse(await sendcommandasync(command));
}
public async task subscribeasync(string channel, action handler)
{
await sendcommandasync($"subscribe {channel}");
while (true)
{
var response = await sendcommandasync(string.empty);
string pattern = @"\*\d \r\n\$\d \r\n(.*?)\r\n\$\d \r\n(.*?)\r\n\$\d \r\n(.*?)\r\n";
match match = regex.match(response, pattern);
if (match.success)
{
string ch = match.groups[2].value;
string message = match.groups[3].value;
handler(ch, message);
}
}
}
public async task publishasync(string channel, string message)
{
await sendcommandasync($"publish {channel} {message}");
}
public async task sendcommandasync(string command)
{
if (!_isconnected)
{
await connectasync();
}
var request = encoding.utf8.getbytes(command "\r\n");
await _socket.sendasync(new arraysegment(request), socketflags.none);
var response = new stringbuilder();
var remainingdata = string.empty;
byte[] receivebuffer = arraypool.shared.rent(1024);
try
{
while (true)
{
var bytesread = await _socket.receiveasync(new arraysegment(receivebuffer), socketflags.none);
var responsedata = remainingdata encoding.utf8.getstring(receivebuffer, 0, bytesread);
var completeresponses = extractcompleteresponses(ref responsedata);
foreach (var completeresponse in completeresponses)
{
response.append(completeresponse);
}
remainingdata = responsedata;
if (response.tostring().endswith("\r\n"))
{
break;
}
}
}
finally
{
arraypool.shared.return(receivebuffer);
}
return response.tostring();
}
private list extractcompleteresponses(ref string data)
{
var completeresponses = new list();
while (true)
{
var index = data.indexof("\r\n");
if (index >= 0)
{
var completeresponse = data.substring(0, index 2);
completeresponses.add(completeresponse);
data = data.substring(index 2);
}
else
{
break;
}
}
return completeresponses;
}
private string parseresponse(string response)
{
if (response.startswith("$"))
{
var lengthstr = response.substring(1, response.indexof('\r') - 1);
if (int.tryparse(lengthstr, out int length))
{
if (length == -1)
{
return null!;
}
string rawredisdata = response.substring(response.indexof('\n') 1);
byte[] utf8bytes = encoding.utf8.getbytes(rawredisdata);
string value = encoding.utf8.getstring(utf8bytes, 0, length);
return value;
}
}
else if (response.startswith(" "))
{
return response.substring(1, response.length - 3);
}
else if (response.startswith(":"))
{
var valuestr = response.substring(1, response.indexof('\r') - 1);
if (int.tryparse(valuestr, out int value))
{
return value.tostring();
}
}
throw new invalidoperationexception(response);
}
private async void heartbeatcallback(object state)
{
if (_isconnected)
{
var pingcommand = "ping\r\n";
await sendcommandasync(pingcommand);
}
}
public void dispose()
{
disposeasync().getawaiter().getresult();
}
public valuetask disposeasync()
{
_heartbeattimer.dispose();
if (_socket != null)
{
_socket.shutdown(socketshutdown.both);
_socket.close();
}
return valuetask.completedtask;
}
}
简单使用redisclient
上面我们封装了redisclient
类,也讲解了里面实现的几个简单的方法,接下来我们就简单的使用一下它,比较简单直接上代码
get/set
get/set
是最基础和最简单的指令,没啥可说的直接上代码
using redisclient redisclient = new redisclient();
await redisclient.connectasync();
//切换db
await redisclient.selectasync(3);
bool setresult = await redisclient.setasync("key:foo", "are you ok,你好吗?", timespan.fromseconds(120));
string getresult = await redisclient.getasync("key:foo");
console.writeline("get key:foo:" getresult);
setnx
setnx
比较常用,很多时候用在做分布式锁的场景,判断资源存不存在的时候经常使用
//第一次setnx返回true
bool setnxresult = await redisclient.setnxasync("order:lock", "123_lock", timespan.fromseconds(120));
console.writeline("first setnx order:lock:" setnxresult);
//第一次setnx返回false
setnxresult = await redisclient.setnxasync("order:lock", "123_lock", timespan.fromseconds(120));
console.writeline("second setnx aname:foo:" setnxresult);
pub/sub
这里实现的subscribeasync
和publishasync
需要使用两个redisclient
实例,因为我上面封装的每个redisclient
只包含一个socket
实例所以receiveasync
方法是阻塞的。如果同一个实例的话subscribeasync
的时候,在使用publishasync
方法的时候会被阻塞,所以演示的时候使用了两个redisclient
实例
_ = redisclient.subscribeasync("order_msg_ch", (ch, msg) => { console.writeline($"接收消息:[{ch}]---[{msg}]"); });
thread.sleep(2000);
using redisclient redisclient2 = new redisclient();
await redisclient2.connectasync();
for (int i = 0; i < 5; i )
{
await redisclient2.publishasync("order_msg_ch", $"发送消息{i}");
thread.sleep(2000);
}
executeluascriptasync
动态执行lua的功能还是比较强大的,在之前的项目中,我也使用类似的功能。我们是模拟抢单/完成
的场景,比如业务人员需要自行抢单,每个人最多抢几单,超过阈值则抢单失败,你需要把抢到的完成了才能继续抢单,这种操作就需要借助lua进行操作
//抢单的lua
string takeorderluascript = @"
local orderstaken = tonumber(redis.call('get', keys[1]) or '0')
if orderstaken < tonumber(argv[1]) then
redis.call('incr', keys[1])
return 1
else
return 0
end";
//完成你手里的订单操作
string completeorderluascript = @"
local orderstaken = tonumber(redis.call('get', keys[1]) or '0')
if orderstaken > 0 then
redis.call('decr', keys[1])
return 1
else
return 0
end";
//模拟抢单,最多抢两单
string result = await redisclient.executeluascriptasync(takeorderluascript, new[] { "user:123" }, new[] { "2" });
result = await redisclient.executeluascriptasync(takeorderluascript, new[] { "user:123" }, new[] { "2" });
result = await redisclient.executeluascriptasync(takeorderluascript, new[] { "user:123" }, new[] { "2" });
result = await redisclient.executeluascriptasync(takeorderluascript, new[] { "user:123" }, new[] { "2" });
//完成订单
string anotherresult = await redisclient.executeluascriptasync(completeorderluascript, keys: new[] { "user:123" });
anotherresult = await redisclient.executeluascriptasync(completeorderluascript, keys: new[] { "user:123" });
anotherresult = await redisclient.executeluascriptasync(completeorderluascript, keys: new[] { "user:123" });
anotherresult = await redisclient.executeluascriptasync(completeorderluascript, keys: new[] { "user:123" });
还有一个功能也是我们之前遇到的,就是使用redis
实现缓存最新的n条消息,旧的则被抛弃,实现这个功能也需要使用redis的list
结构结合lua的方式
string luascript = @"
local record_key = keys[1]
local max_records = tonumber(argv[1])
local new_record = argv[2]
local current_count = redis.call('llen', record_key)
if current_count >= max_records then
redis.call('lpop', record_key)
end
redis.call('rpush', record_key, new_record)
";
//这里限制保存最新的50条数据,旧的数据则被抛弃
for (int i = 0; i < 60; i )
{
_ = await redisclient.executeluascriptasync(luascript, keys: new[] { "msg:list" }, new[] { "50", i.tostring() });
}
list
list
很多时候会把它当做分布式队列来使用,它提供的操作也比较灵活,咱们这里只是封装了几个最简单的操作,大致的效果如下所示
//lis入队操作
var res = await redisclient.listpushasync("list:2", "123", timespan.fromhours(1));
res = await redisclient.listpushasync("list:2", "1234", timespan.fromhours(1));
res = await redisclient.listpushasync("list:2", "12345", timespan.fromhours(1));
//list出队操作
var str = await redisclient.listpopasync("list:2");
//list长度
var length = await redisclient.listlengthasync("list:2");
//list range操作
var list = await redisclient.listrangeasync("article:list", 0, 10);
总结
本文我们通过理解redis命令
和resp协议
来构建了一个简单redisclient
的实现,方便我们更容易的理解redis客户端
如何与redis服务器
进行通信,这个实现也可以作为学习和理解·redis客户端·的一个很好的例子。当然我们的这个redisclient
这是了解和学习使用,很多场景我们并没有展示,实际的项目我们还是尽量使用开源的redis sdk
, .net
中常用的有stackexchange.redis
、freeredis
、csredis
、newlife.redis
、service.stack.redis
,其中我经常使用的是stackexchange.redis
和freeredis
整体来说效果还是不错的。总结一下我们文章的主要内容
- 首先我们讲解了
redis命令
的格式 - 其次我们讲解了
redis协议(resp)
的主要格式以及如何解析 - 然后我们基于上面的理论简单的封装了一个
redisclient
类来演示相关概念 - 最后我们通过几个示例和我用过的两个
lua
来简单的演示redisclient
类的使用
作为新时代的职场人,我乐在探究自己感兴趣的领域,对未知的事物充满好奇,并渴望深入了解。对于常用的核心技术,我不仅要求自己能够熟练运用,更追求深入理解其实现原理。面对新的技术趋势,我决不会视而不见,而是在熟悉更多常用技术栈的同时,努力深入掌握一些重要的知识。我坚信,学无止境,每一步的进步都带来无比的喜悦与成就感。
总结
以上是尊龙游戏旗舰厅官网为你收集整理的基于c# socket实现的简单的redis客户端的全部内容,希望文章能够帮你解决所遇到的问题。
- 上一篇:
- 下一篇: