0%

FastDFS简介

fastDFS 是以C语言开发的一项开源轻量级分布式文件系统,他对文件进行管理,主要功能有:文件存储,文件同步,文件访问(文件上传/下载),特别适合以文件为载体的在线服务,如图片网站,视频网站等

分布式文件系统:

基于客户端/服务器的文件存储系统对等特性允许一些系统扮演客户端和服务器的双重角色,可供多个用户访问的服务器,比如,用户可以“发表”一个允许其他客户机访问的目录,一旦被访问,这个目录对客户机来说就像使用本地驱动器一样

FastDFS由跟踪服务器(Tracker Server)、存储服务器(Storage Server)和客户端(Client)构成。

Tracker server 追踪服务器

追踪服务器负责接收客户端的请求,选择合适的组合storage server ,tracker server 与 storage server之间也会用心跳机制来检测对方是否活着。
Tracker需要管理的信息也都放在内存中,并且里面所有的Tracker都是对等的(每个节点地位相等),很容易扩展
客户端访问集群的时候会随机分配一个Tracker来和客户端交互。

Storage server 储存服务器

实际存储数据,分成若干个组(group),实际traker就是管理的storage中的组,而组内机器中则存储数据,group可以隔离不同应用的数据,不同的应用的数据放在不同group里面,

优点:
海量的存储:主从型分布式存储,存储空间方便拓展,
fastDFS对文件内容做hash处理,避免出现重复文件
然后fastDFS结合Nginx集成, 提供网站效率

linux-centos7-install-fastdfs-2020415143340

读写操作

写入数据

写操作的时候,storage会将他所挂载的所有数据存储目录的底下都创建2级子目录,每一级256个总共65536个,新写的文件会以hash的方式被路由到其中某个子目录下,然后将文件数据作为本地文件存储到该目录中。

linux-centos7-install-fastdfs-202041514373

下载文件

当客户端向Tracker发起下载请求时,并不会直接下载,而是先查询storage server(检测同步状态),返回storage server的ip和端口,
然后客户端会带着文件信息(组名,路径,文件名),去访问相关的storage,然后下载文件。

linux-centos7-install-fastdfs-2020415143642

源代码方式安装

首先下载源代码:

libfastcommon

fastdfs

fastdfs-nginx-module

nginx

1.把需要的安装包上传到服务器

mkdir 目录名

2.安装所需依赖

yum -y install zlib zlib-devel pcre pcre-devel gcc gcc-c++ openssl openssl-devel libevent libevent-devel perl unzip net-tools wget && yum install lrzsz -y

3.进入程序所在的目录

a.安装libfastcommon

1
2
3
4
5
6
7
8
9
unzip libfastcommon.zip
cd libfastcommon
./make.sh
./make.sh install
#如果没有报错,执行建立软件链接的命令:
ln -s /usr/lib64/libfastcommon.so /usr/local/lib/libfastcommon.so
ln -s /usr/lib64/libfastcommon.so /usr/lib/libfastcommon.so
ln -s /usr/lib64/libfdfsclient.so /usr/local/lib/libfdfsclient.so
ln -s /usr/lib64/libfdfsclient.so /usr/lib/libfdfsclient.so

b.安装FastDfs:

1
2
3
4
unzip fastdfs-5.11.zip
cd fastdfs-5.11
./make.sh
./make.sh install

查看目录:

ll /etc/fdfs

有3个模板文件,各复制一份重命名为对应的conf文件

1
2
3
cp client.conf.sample client.conf
cp storage.conf.sample storage.conf
cp tracker.conf.sample tracker.conf

c.安装tracker:

创建tarcker工作目录,根据服务器硬盘分布创建工作目录:

mkdir -p /lvdata/fastdfs/fastdfs_tracker

配置tracker

vim /etc/fdfs/tracker.conf

disabled=false #默认开启 
port=22122 #默认端口号 
base_path=/lvdata/fastdfs/fastdfs_tracker #刚刚创建的目录 
http.server_port=6666 #默认端口是8080

:wq!

保存修改后,启动tracker:
service fdfs_trackerd start
设置开机启动:
systemctl enable fdfs_trackerd

ll /lvdata/fastdfs/fastdfs_tracker,多了data,logs目录

查看tracker运行情况:
netstat -unltp|grep fdfs
可以看到在监听22122端口

d.安装storage

创建数据目录:

mkdir -p /lvdata/fastdfs/fastdfs_storage
mkdir -p /lvdata/fastdfs/fastdfs_storage_data

修改配置文件:

vim /etc/fdfs/storage.conf

1.disabled=false 
2.group_name=group1 #组名,根据实际情况修改 
3.port=23000  #设置storage的端口号,默认是23000,同一个组的storage端口号必须一致 
4.base_path=/lvdata/fastdfs/fastdfs_storage  #设置storage数据文件和日志目录 
5.store_path_count=1 #存储路径个数,需要和store_path个数匹配 
6.base_path0=/lvdata/fastdfs/fastdfs_storage_data #实际文件存储路径 注意检查
7.tracker_server=192.168.150.132:22122 #我CentOS7的ip地址 
8.http.server_port=8888 #设置 http 端口号
:wq!

修改后,保存退出。创建软链接:

ln -s /usr/bin/fdfs_storaged /usr/local/bin

启动storage:

service fdfs_storaged start

设置开机启动:

systemctl enable fdfs_storaged

查看服务运行情况:

netstat -unltp |grep fdfs

可以看到在监听23000端口。

确认下storage是否注册到了tracker:

/usr/bin/fdfs_monitor /etc/fdfs/storage.conf

成功后可以看到:

ip_addr = 192.168.150.132 (localhost.localdomain) ACTIVE 的字样

修改客户端配置文件:

vim /etc/fdfs/client.conf

base_path=/lvdata/fastdfs/fastdfs_tracker #tracker服务器文件路径
tracker_server=192.168.150.132:22122 #tracker服务器IP地址和端口号
http.tracker_server_port=6666 # tracker 服务器的 http端口号,必须和tracker的设置对应起来

:wq!

修改后,保存。

测试上传一张照片:

/usr/bin/fdfs_upload_file  /etc/fdfs/client.conf  /root/test.png

成功后,会返回图片的路径如:

group1/M00/00/00/wKiWhFrdeCeAC_vCAABqgowGIFg399.png

查看是否上传成功:

cd /usr/testdfsdev/fastdfs/fastdfs_storage_data/data
cd 00/00;ls
wKiWhFrdeCeAC_vCAABqgowGIFg399.png

表明上传成功。

data下有256个1级目录,每级目录下又有256个2级子目录,总共65536个文件,新写的文件会以hash的方式被路由到其中某个子目录下,然后将文件数据直接作为一个本地文件存储到该目录中。

在4.05版本的时候已经移除http支持,需要http支持,安装nginx及相关模块:
安装依赖包:

yum -y install pcre pcre-devel && yum -y install zlib zlib-devel && yum -y install openssl openssl-devel

安装nginx并添加fastdfs-nginx-module:

解压nginx和fastdfs-nginx-module:

tar xzvf nginx-1.12.0.tar.gz unzip fastdfs-nginx-module.zip vim /root/fastdfs-nginx-module/src/config,修改后的内容如下:

ngx_module_incs="/usr/include/fastdfs /usr/include/fastcommon/"
CORE_INCS="$CORE_INCS /usr/include/fastdfs /usr/include/fastcommon/"

:wq!


cd nginx-1.12.0
./configure --prefix=/usr/local/nginx --add-module=/data/fdfs.install/fastdfs-nginx-module-1.22/src/

ng
回车,如果没有错误信息,直接安装:

make && make install

修改nginx配置:

vim /usr/local/nginx/conf/nginx.conf
server {
        listen       9999;
        server_name  localhost;

        location / {
            root   html;
            index  index.html index.htm;
        }

        location /group1/M00 {
            root /lvdata/fastdfs/fastdfs_storage_data/data;
            ngx_fastdfs_module;
        }
:wq!

保存修改。

进入fastdfs安装时解压的目录下的conf,将http.conf,mime.types复制到/etc/fdfs目录下:

cp http.conf /etc/fdfs/
cp mime.types /etc/fdfs/

将fastdfs-nginx-module安装目录中src目录下的mod_fastdfs.conf也拷贝到/etc/fdfs目录下:

cp /lvdata/nginx/fastdfs-nginx-module/src/mod_fastdfs.conf /etc/fdfs/

vim /etc/fdfs/mod_fastdfs.conf
base_path=/lvdata/fastdfs/fastdfs_storage #保存日志目录
tracker_server=192.168.150.132:22122 #tracker服务器的IP地址以及端口号
storage_server_port=23000 #storage服务器的端口号
url_have_group_name = true #文件 url 中是否有 group 名
store_path0=/lvdata/fastdfs/fastdfs_storage_data #存储路径
group_count = 3 #设置组的个数,事实上这次只使用了group1

在文件的最后设置group:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
[group1]
group_name=group1
storage_server_port=23000
store_path_count=1
store_path0=/lvdata/fastdfs/fastdfs_storage_data
store_path1=/lvdata/fastdfs/fastdfs_storage_data

# group settings for group #2
# since v1.14
# when support multi-group, uncomment following section as neccessary
[group2]
group_name=group2
storage_server_port=23000
store_path_count=1
store_path0=/lvdata/fastdfs/fastdfs_storage_data

[group3]
group_name=group3
storage_server_port=23000
store_path_count=1
store_path0=/lvdata/fastdfs/fastdfs_storage_data

创建M00至storage存储目录的符号链接:

ln  -s  /lvdata/fastdfs/fastdfs_storage_data/data/ /lvdata/fastdfs/fastdfs_storage_data/data/M00

启动nginx:

/usr/local/nginx/sbin/nginx

成功启动:

ngx_http_fastdfs_set pid=1231

使用刚才返回的图片路径,使用浏览器测试:

X.X.X.X:9999/group1/M00/00/00/wKiWhFrdeCeAC_vCAABqgowGIFg399.png

如果可以看到刚才的图片,说明nginx配置正常。

docker 方式安装

待补充

什么是RabbitMQ

 RabbitMQ是一个开源的,基于AMQP(Advanced Message Queuing Protocol)协议的完整,可复用的企业级消息队列(Message Queue 一种应用程序与应用程序之间的一种通信方法)系统,RabbitMQ可以实现点对点,发布订阅等消息处理模式

使用 RabbitMQ

RabbitMQ从信息接收者角度可以看做三种模式:

  1. 一对一(简单队列模式)
  2. 一对多(是Worker模式 此一对多并不是发布订阅,而是每条信息只有一个接收者)
  3. 发布订阅(包括发布订阅模式,路由模式和通配符模式,可以总结为都是使用只是交换机(Exchange)类型不一致)

代码示例

简单模式

  1. 首先,我们需要创建两个控制台项目.Send(发送者)和Receive(接收者),然后为两个项目安装RabbitMQ.Client驱动

install-package rabbitmq.client

  1. 然后在Send和Receive项目中编写我们的消息队列代码
  • 发送者代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    using RabbitMQ.Client;
    using System;
    using System.Text;

    namespace HelloWorldRabbitMQ.Send
    {
    class Program
    {
    static void Main(string[] args)
    {
    Console.WriteLine("Start");
    IConnectionFactory conFactory = new ConnectionFactory//创建连接工厂对象
    {
    HostName = "192.168.1.37",//IP地址
    Port = 5672,//端口号
    UserName = "sa",//用户账号
    Password = "Sa123456"//用户密码
    };
    using (IConnection con = conFactory.CreateConnection())// 创建连接对象
    {
    using (IModel channel = con.CreateModel())// 创建连接会话对象
    {
    String queryName = "queue1";
    channel.QueueDeclare(
    queue: queryName,
    durable: false,
    exclusive: false,
    autoDelete: false,
    arguments: null
    );
    while (true)
    {
    Console.WriteLine("消息内容");
    string message = Console.ReadLine();
    // 消息内容
    byte[] body = Encoding.UTF8.GetBytes(message);
    // 发送消息
    channel.BasicPublish(exchange: "", routingKey: queryName, basicProperties: null, body: body);
    Console.WriteLine("成功发送消息:" + message);
    }
    }
    }
    }
    }
    }

可以看到RabbitMQ使用了IConnectionFactory,IConnection和IModel来创建链接和通信管道,IConnection实例对象只负责与Rabbit的连接,而发送接收这些实际操作全部由会话通道进行,而后使用QueneDeclare方法进行创建消息队列,创建完成后可以在RabbitMQ的管理工具中看到此队列,QueneDelare方法需要一个消息队列名称的必须参数.后面那些参数则代表缓存,参数等信息.最后使用BasicPublish来发送消息,在一对一中routingKey必须和 queueName一致

  • 接收者代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace HelloWorldRabbitMQ.Receive
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Start");
IConnectionFactory connFactory = new ConnectionFactory//创建连接工厂对象
{
HostName = "192.168.1.37",//IP地址
Port = 5672,//端口号
UserName = "sa",//用户账号
Password = "Sa123456"//用户密码
};
using (IConnection conn = connFactory.CreateConnection())
{
using (IModel channel = conn.CreateModel())
{
String queueName = String.Empty;
if (args.Length > 0)
queueName = args[0];
else
queueName = "queue1";
//声明一个队列
channel.QueueDeclare(
queue: queueName,//消息队列名称
durable: false,//是否缓存
exclusive: false,
autoDelete: false,
arguments: null
);
//创建消费者对象
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
byte[] message = ea.Body.ToArray();//接收到的消息
Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message));
};
//消费者开启监听
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
Console.ReadKey();
}
}
}

}
}

在接收者中是定义一个EventingBasicConsumer对象的消费者(接收者),这个消费者与会话对象关联,

  然后定义接收事件,输出从消息队列中接收的数据,

  最后使用会话对象的BasicConsume方法来启动消费者监听.消费者的定义也是如此简单.

  不过注意一点,可以看到在接收者代码中也有声明队列的方法,其实这句代码可以去掉,但是如果去掉的话接收者在程序启动时监听队列,而此时这个队列还未存在,所以会出异常,所以往往会在消费者中也添加一个声明队列方法

  此时,简单消息队列传输就算写好了,我们可以运行代码就行测试

csharp-dotnet-core-rabbitmq-2020416191154

Worker模式

Worker模式其实是一对多的模式,但是这个一对多并不是像发布订阅那种,而是信息以顺序的传输给每个接收者,我们可以使用上个例子来运行worker模式甚至,只需要运行多个接收者即可
csharp-dotnet-core-rabbitmq-202041619152

可以看到运行两个接收者,然后发送者发送了1-5这五个消息,第一个接收者接收的是奇数,而第二个接收者接收的是偶数,但是现在的worker存在这很大的问题,

  1. 丢失数据:一旦其中一个宕机,那么另外接收者的无法接收原本这个接收者所要接收的数据

  2. 无法实现能者多劳:如果其中的接收者接收的较慢,那么便会极大的浪费性能,所以需要实现接收快的多接收

下面针对上面的两个问题进行处理

首先我们先来看一下所说的宕机丢失数据一说,我们在上个例子Receive接收事件中添加线程等待

1
2
3
4
5
6
consumer.Received += (model, ea) =>
{
Thread.Sleep(1000);//等待1秒,
byte[] message = ea.Body;//接收到的消息
Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message));
};

然后再次启动三个接收者进行测试

csharp-dotnet-core-rabbitmq-202041795658

可以看到发送者发送了1-9的数字,第二个接收者在接收数据途中宕机,第一个接收者也并没有去接收第二个接收者宕机后的数据,有的时候我们会有当接收者宕机后,其余数据交给其它接收者进行消费,那么该怎么进行处理呢,解决这个问题得方法就是改变其消息确认模式

在Rabbit中存在两种消息确认模式

  • 自动确认:只要消息从队列获取,无论消费者获取到消息后是否成功消费,都认为是消息成功消费,也就是说上面第二个接收者其实已经消费了它所接收的数据

  • 手动确认:消费从队列中获取消息后,服务器会将该消息处于不可用状态,等待消费者反馈

  也就是说我们只要将消息确认模式改为手动即可,改为手动确认方式只需改两处,1.开启监听时将autoAck参数改为false,2.消息消费成功后返回确认

1
2
3
4
5
6
7
8
9
10
11
consumer.Received += (model, ea) =>
{
Thread.Sleep(1000);//等待1秒,
byte[] message = ea.Body;//接收到的消息
Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message));
//返回消息确认
channel.BasicAck(ea.DeliveryTag, true);
};
//消费者开启监听
//将autoAck设置false 关闭自动确认
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

然后再次测试便会出现完全接收的结果

能者多劳是建立在手动确认基础上,下面修改一下代码中等待的时间

1
2
3
4
5
6
7
8
consumer.Received += (model, ea) =>
{
Thread.Sleep((new Random().Next(1,6))*1000);//随机等待,实现能者多劳,
byte[] message = ea.Body;//接收到的消息
Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message));
//返回消息确认
channel.BasicAck(ea.DeliveryTag, true);
};

然后只需要再添加BasicQos方法即可

1
2
3
4
5
6
7
8
9
10
//声明一个队列
channel.QueueDeclare(
queue: queueName,//消息队列名称
durable: false,//是否缓存
exclusive: false,
autoDelete: false,
arguments: null
);
//告诉Rabbit每次只能向消费者发送一条信息,再消费者未确认之前,不再向他发送信息
channel.BasicQos(0, 1, false);

运行看效果 可以看到此时已实现能者多劳

worker模式接收者完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;

namespace HelloWorldRabbitMQ.Receive
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Start");
IConnectionFactory connFactory = new ConnectionFactory//创建连接工厂对象
{
HostName = "192.168.1.37",//IP地址
Port = 5672,//端口号
UserName = "sa",//用户账号
Password = "Sa123456"//用户密码
};
using (IConnection conn = connFactory.CreateConnection())
{
using (IModel channel = conn.CreateModel())
{
String queueName = String.Empty;
if (args.Length > 0)
queueName = args[0];
else
queueName = "queue1";
//声明一个队列
channel.QueueDeclare(
queue: queueName,//消息队列名称
durable: false,//是否缓存
exclusive: false,
autoDelete: false,
arguments: null
);
//告诉Rabbit每次只能向消费者发送一条信息,再消费者未确认之前,不再向他发送信息
channel.BasicQos(0, 1, false);
//创建消费者对象
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
Thread.Sleep((new Random().Next(1, 6)) * 1000);//随机等待,实现能者多劳,
byte[] message = ea.Body.ToArray();//接收到的消息
Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message));
//返回消息确认
channel.BasicAck(ea.DeliveryTag, true);
};
//消费者开启监听
//将autoAck设置false 关闭自动确认
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
Console.ReadKey();
}
}
}

}
}

Exchange模式(发布订阅模式,路由模式,通配符模式)

前面说过发布,路由,通配符这三种模式其实可以算为一种模式,区别仅仅是交互机类型不同.在这里出现了一个交换机的东西,发送者将消息发送发送到交换机,接收者创建各自的消息队列绑定到交换机

csharp-dotnet-core-rabbitmq-202041795659

通过上面三幅图可以看出这三种模式本质就是一种订阅模式,路由,通配符模式只是订阅模式的变种模式。使其可以选择发送订阅者中的接收者。

注意:交换机本身并不存储数据,数据存储在消息队列中,所以如果向没有绑定消息队列的交换机中发送信息,那么信息将会丢失

下面依次来看一下这三种模式

发布订阅模式(fanout)

  发送者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
using RabbitMQ.Client;
using System;
using System.Text;

namespace HelloWorldRabbitMQ.fanoutSend
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Start");
IConnectionFactory connFactory = new ConnectionFactory//创建连接工厂对象
{
HostName = "192.168.1.37",//IP地址
Port = 5672,//端口号
UserName = "sa",//用户账号
Password = "Sa123456"//用户密码
};
using (IConnection conn = connFactory.CreateConnection())
{
using (IModel channel = conn.CreateModel())
{
//交换机名称
String exchangeName = String.Empty;
if (args.Length > 0)
exchangeName = args[0];
else
exchangeName = "exchange1";
//声明交换机
channel.ExchangeDeclare(exchange: exchangeName, type: "fanout");
while (true)
{
Console.WriteLine("消息内容:");
String message = Console.ReadLine();
//消息内容
byte[] body = Encoding.UTF8.GetBytes(message);
//发送消息
channel.BasicPublish(exchange: exchangeName, routingKey: "", basicProperties: null, body: body);
Console.WriteLine("成功发送消息:" + message);
}
}
}
}
}

}

发送者代码与上面没有什么差异,只是由上面的消息队列声明变成了交换机声明(交换机类型为fanout),也就说发送者发送消息从原来的直接发送消息队列变成了发送到交换机

接收者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace HelloWorldRabbitMQ.fanoutReceive
{
class Program
{
static void Main(string[] args)
{
//创建一个随机数,以创建不同的消息队列
int random = new Random().Next(1, 1000);
Console.WriteLine("Start" + random.ToString());
IConnectionFactory connFactory = new ConnectionFactory//创建连接工厂对象
{
HostName = "192.168.1.37",//IP地址
Port = 5672,//端口号
UserName = "sa",//用户账号
Password = "Sa123456"//用户密码
};
using (IConnection conn = connFactory.CreateConnection())
{
using (IModel channel = conn.CreateModel())
{
//交换机名称
String exchangeName = String.Empty;
if (args.Length > 0)
exchangeName = args[0];
else
exchangeName = "exchange1";
//声明交换机
channel.ExchangeDeclare(exchange: exchangeName, type: "fanout");
//消息队列名称
String queueName = exchangeName + "_" + random.ToString();
//声明队列
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
//将队列与交换机进行绑定
channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: "");
//声明为手动确认
channel.BasicQos(0, 1, false);
//定义消费者
var consumer = new EventingBasicConsumer(channel);
//接收事件
consumer.Received += (model, ea) =>
{
byte[] message = ea.Body.ToArray();//接收到的消息
Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message));
//返回消息确认
channel.BasicAck(ea.DeliveryTag, true);
};
//开启监听
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
Console.ReadKey();
}
}
}
}
}

可以看到接收者代码与上面有些差异

  • 首先是声明交换机(同上面一样,为了防止异常)

  • 然后声明消息队列并对交换机进行绑定,在这里使用了随机数,目的是声明不重复的消息队列,如果是同一个消息队列,则就变成worker模式,也就是说对于发布订阅模式有多少接收者就有多少个消息队列,而这些消息队列共同从一个交换机中获取数据

  • 然后同时开两个接收者,结果就如下
    csharp-dotnet-core-rabbitmq-2020417102839

路由模式(direct)

上面说过路由模式是订阅模式的一个变种模式,以路由进行匹配发送,例如将消息1发送给A,B两个消息队列,或者将消息2发送给B,C两个消息队列,路由模式的交换机是direct

发送者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
using RabbitMQ.Client;
using System;
using System.Text;

namespace HelloWorldRabbitMQ.directSend
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Start");
IConnectionFactory connFactory = new ConnectionFactory//创建连接工厂对象
{
HostName = "192.168.1.37",//IP地址
Port = 5672,//端口号
UserName = "sa",//用户账号
Password = "Sa123456"//用户密码
};
using (IConnection conn = connFactory.CreateConnection())
{
using (IModel channel = conn.CreateModel())
{
//交换机名称
String exchangeName = "exchange2";
//路由名称
String routeKey = args[0];
//声明交换机 路由交换机类型direct
channel.ExchangeDeclare(exchange: exchangeName, type: "direct");
while (true)
{
Console.WriteLine("消息内容:");
String message = Console.ReadLine();
//消息内容
byte[] body = Encoding.UTF8.GetBytes(message);
//发送消息 发送到路由匹配的消息队列中
channel.BasicPublish(exchange: exchangeName, routingKey: routeKey, basicProperties: null, body: body);
Console.WriteLine("成功发送消息:" + message);
}
}
}
}
}
}

发送者代码相比上面只改了两处

  1. 将交换机类型改为了direct类型

  2. 将运行时的第一个参数改成了路由名称,然后发送数据时由指定路由的消息队列进行获取数据

 接收者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace HelloWorldRabbitMQ.directReceive
{
class Program
{
static void Main(string[] args)
{
if (args.Length == 0) throw new ArgumentException("args");
//创建一个随机数,以创建不同的消息队列
int random = new Random().Next(1, 1000);
Console.WriteLine("Start" + random.ToString());
IConnectionFactory connFactory = new ConnectionFactory//创建连接工厂对象
{
HostName = "47.104.206.56",//IP地址
Port = 5672,//端口号
UserName = "yan",//用户账号
Password = "yan"//用户密码
};
using (IConnection conn = connFactory.CreateConnection())
{
using (IModel channel = conn.CreateModel())
{
//交换机名称
String exchangeName = "exchange2";
//声明交换机
channel.ExchangeDeclare(exchange: exchangeName, type: "direct");
//消息队列名称
String queueName = exchangeName + "_" + random.ToString();
//声明队列
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
//将队列与交换机进行绑定
foreach (var routeKey in args)
{//匹配多个路由
channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routeKey);
}
//声明为手动确认
channel.BasicQos(0, 1, false);
//定义消费者
var consumer = new EventingBasicConsumer(channel);
//接收事件
consumer.Received += (model, ea) =>
{
byte[] message = ea.Body.ToArray();//接收到的消息
Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message));
//返回消息确认
channel.BasicAck(ea.DeliveryTag, true);
};
//开启监听
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
Console.ReadKey();
}
}
}
}
}

在接收者代码中的改动点也是与发送者一致,但是一个接收者消息队列可以声明多个路由与交换机进行绑定
运行情况如下



通配符模式(topic)

通配符模式与路由模式一致,只不过通配符模式中的路由可以声明为模糊查询,RabbitMQ拥有两个通配符

  • #:匹配0-n个字符语句

  • *:匹配一个字符语句

注意:RabbitMQ中通配符并不像正则中的单个字符,而是一个以“.”分割的字符串,如 ”topic1.*“匹配的规则以topic1开始并且”.”后只有一段语句的路由 例:“topic1.aaa”,“topic1.bb”

发送者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
using RabbitMQ.Client;
using System;
using System.Text;

namespace HelloWorldRabbitMQ.topicSend
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Start");
IConnectionFactory connFactory = new ConnectionFactory//创建连接工厂对象
{
HostName = "192.168.1.37",//IP地址
Port = 5672,//端口号
UserName = "sa",//用户账号
Password = "Sa123456"//用户密码
};
using (IConnection conn = connFactory.CreateConnection())
{
using (IModel channel = conn.CreateModel())
{
//交换机名称
String exchangeName = "exchange3";
//路由名称
String routeKey = args[0];
//声明交换机 通配符类型为topic
channel.ExchangeDeclare(exchange: exchangeName, type: "topic");
while (true)
{
Console.WriteLine("消息内容:");
String message = Console.ReadLine();
//消息内容
byte[] body = Encoding.UTF8.GetBytes(message);
//发送消息 发送到路由匹配的消息队列中
channel.BasicPublish(exchange: exchangeName, routingKey: routeKey, basicProperties: null, body: body);
Console.WriteLine("成功发送消息:" + message);
}
}
}
}
}
}

修改了两点:交换机名称(每个交换机只能声明一种类型,如果还用exchang2的话就会出异常),交换机类型改为topic

接收者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace HelloWorldRabbitMQ.topicRecoive
{
class Program
{
static void Main(string[] args)
{
if (args.Length == 0) throw new ArgumentException("args");
//创建一个随机数,以创建不同的消息队列
int random = new Random().Next(1, 1000);
Console.WriteLine("Start" + random.ToString());
IConnectionFactory connFactory = new ConnectionFactory//创建连接工厂对象
{
HostName = "192.168.1.37",//IP地址
Port = 5672,//端口号
UserName = "sa",//用户账号
Password = "Sa123456"//用户密码
};
using (IConnection conn = connFactory.CreateConnection())
{
using (IModel channel = conn.CreateModel())
{
//交换机名称
String exchangeName = "exchange3";
//声明交换机 通配符类型为topic
channel.ExchangeDeclare(exchange: exchangeName, type: "topic");
//消息队列名称
String queueName = exchangeName + "_" + random.ToString();
//声明队列
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
//将队列与交换机进行绑定
foreach (var routeKey in args)
{//匹配多个路由
channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routeKey);
}
//声明为手动确认
channel.BasicQos(0, 1, false);
//定义消费者
var consumer = new EventingBasicConsumer(channel);
//接收事件
consumer.Received += (model, ea) =>
{
byte[] message = ea.Body.ToArray();//接收到的消息
Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message));
//返回消息确认
channel.BasicAck(ea.DeliveryTag, true);
};
//开启监听
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
Console.ReadKey();
}
}
}
}
}

接收者修改与发送者一致

  运行结果如下


参考:

https://www.cnblogs.com/yan7/p/9498685.html

各种系统中会经常用到支付功能,需要对接各种第三方支付系统(微信、支付宝、网银等),这种系统模块如何架构呢,我这里提供一种架构思路。

阅读全文 »

一般情况下不建议修改系统的内核版本,但是当要使用一些需要高版本内核才能支持的功能的时候,必须要升级当前系统的内核版本。下面我们来简述升级过程。

阅读全文 »

前端利器VS Code,不只是可以开发DIV/CSS还可以开发nodejs typescript,这次我们介绍下如何配置使用vscode调试 typescript代码

全局安装ts-node

1
2
# 在终端执行
sudo npm i -g ts-node typescript # 已经安装过可以忽略

开启vscode的 自动附加模式

在编辑器主界面,执行快捷键 ctrl+shift+p,输入 Toggle Auto Attach 回车

开始调试

方式一

在 package.json 中增加 debug 启动脚本 node –inspect-brk -r ts-node/register src/main.ts

注意,最后的参数 src/main.ts 是程序的入口,可自行修改适配到自己的项目中

方式二

直接修改 .vscode 下的 launch.json文件 需改为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
{
// 使用 IntelliSense 了解相关属性。
// 悬停以查看现有属性的描述。
// 欲了解更多信息,请访问: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"type": "node",
"request": "launch",
"name": "Koa Program",
"runtimeArgs": [
"--nolazy",
"-r",
"ts-node/register",
],
"args": [
"${workspaceFolder}/app/app.ts"
],
"env": {
"DEBUG": "yuedun:*,-not_this",
"NODE_ENV": "development"
},
"sourceMaps": true,
"cwd": "${workspaceFolder}",
"protocol": "inspector",
"console": "integratedTerminal",
"internalConsoleOptions": "neverOpen"
}
]
}

OpenVZ 更新glibc到2.15

  • 方法一:rpm安装

先下载以下几个文件:

wget http://ftp.redsleeve.org/pub/steam/glibc-2.15-60.el6.x86_64.rpm \
http://ftp.redsleeve.org/pub/steam/glibc-common-2.15-60.el6.x86_64.rpm \
http://ftp.redsleeve.org/pub/steam/glibc-devel-2.15-60.el6.x86_64.rpm \
http://ftp.redsleeve.org/pub/steam/glibc-headers-2.15-60.el6.x86_64.rpm \
http://ftp.redsleeve.org/pub/steam/nscd-2.15-60.el6.x86_64.rpm

然后安装

rpm -Uvh glibc-2.15-60.el6.x86_64.rpm \
glibc-common-2.15-60.el6.x86_64.rpm \
glibc-devel-2.15-60.el6.x86_64.rpm \
glibc-headers-2.15-60.el6.x86_64.rpm \
nscd-2.15-60.el6.x86_64.rpm
  • 方法二:编译安装(上面的方式行不通,再用编译的方式,编译的方式相对时长会久很多)

    wget http://ftp.gnu.org/gnu/glibc/glibc-2.15.tar.gz
    wget http://ftp.gnu.org/gnu/glibc/glibc-ports-2.15.tar.gz
    tar -zxf glibc-2.15.tar.gz
    tar -zxf glibc-ports-2.15.tar.gz
    mv glibc-ports-2.15 glibc-2.15/ports
    mkdir glibc-build-2.15
    cd glibc-build-2.15
    ../glibc-2.15/configure –prefix=/usr –disable-profile –enable-add-ons –with-headers=/usr/include –with-binutils=/usr/bin
    make all && make install
    检查是否成功安装
    ldd –version
    成功安装的输入如下
    Copyright (C) 2012 Free Software Foundation, Inc.
    This is free software; see the source for copying conditions. There is NO
    warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
    Written by Roland McGrath and Ulrich Drepper.

创建型

单例模式(Singleton Pattern)

工厂方法模式(Factory Pattern)

附加资料:

抽象工厂模式(Abstract Pattern)

建造者模式(Builder Pattern)

原型模式(Prototype Pattern)

结构型

适配器模式(Adapter Pattern)

桥接模式(Bridge Pattern)

装饰者模式(Decorator Pattern)

附加资料:

组合模式(Composite Pattern)

外观模式(Facade Pattern)

享元模式(Flyweight Pattern)

附加资料:

代理模式(Proxy Pattern)

行为型

模板方法模式(Template Method)

命令模式(Command Pattern)

迭代器模式(Iterator Pattern)

附加资料:

观察者模式(Observer Pattern)

中介者模式(Mediator Pattern)

状态模式(State Pattern)

策略模式(Stragety Pattern)

责任链模式(Chain of Responsibility Pattern)

访问者模式(Vistor Pattern)

备忘录模式(Memento Pattern)

解释器模式(Interpreter Pattern)

tail 命令可用于查看文件的内容,有一个常用的参数 -f 常用于查阅正在改变的日志文件。

tail -f filename 会把 filename 文件里的最尾部的内容显示在屏幕上,并且不断刷新,只要 filename 更新就可以看到最新的文件内容。

命令格式:

tail [参数] [文件]  

参数:

-f 循环读取
-q 不显示处理信息
-v 显示详细的处理信息
-c<数目> 显示的字节数
-n<行数> 显示行数
--pid=PID 与-f合用,表示在进程ID,PID死掉之后结束.
-q, --quiet, --silent 从不输出给出文件名的首部
-s, --sleep-interval=S 与-f合用,表示在每次反复的间隔休眠S秒

实例

要显示 notes.log 文件的最后 10 行,请输入以下命令:

tail notes.log

要跟踪名为 notes.log 的文件的增长情况,请输入以下命令:

tail -f notes.log

此命令显示 notes.log 文件的最后 10 行。当将某些行添加至 notes.log 文件时,tail 命令会继续显示这些行。 显示一直继续,直到您按下(Ctrl-C)组合键停止显示。

显示文件 notes.log 的内容,从第 20 行至文件末尾:

tail +20 notes.log

显示文件 notes.log 的最后 10 个字符:

tail -c 10 notes.log