厂商分配问题
# 问题描述
这里有一道java多线程的场景问题。现在mysql数据库中有10000条消息,有A、B、C厂商,现在要求将这数据库中的10000条消息以最快的速度发送给A、B、C,其中A每秒最多收到15条消息,B最多收到10条消息,C最多收到8条消息,怎么设计系统让10000条消息最快发送到3个厂商
# 解决方案
要实现这个场景,你可以使用Java多线程编程来提高消息发送的效率。以下是一个简单的设计方案:
- 创建一个线程池:首先,创建一个固定大小的线程池,用于处理消息的发送。可以使用
ExecutorService
来实现线程池。 - 从数据库读取消息:编写一个方法从MySQL数据库中读取消息,并将这些消息放入一个共享的消息队列中。消息队列可以使用
BlockingQueue
来实现,这样可以确保线程之间的同步。 - 创建三个消费者线程:分别创建三个消费者线程,分别代表A、B、C厂商,用于从消息队列中获取消息,并进行发送。在每个消费者线程中,使用适当的逻辑来控制每个厂商接收消息的速率。
- 控制消息发送速率:为了控制每个厂商接收消息的速率,可以使用
Thread.sleep()
方法在每次发送消息之间添加适当的延迟。通过计算发送消息的频率与规定的速率之间的差异,来决定是否需要添加延迟。
# 示例代码
import java.util.concurrent.*;
public class MessageSender {
private static final int TOTAL_MESSAGES = 10000;
private static final int A_MAX_PER_SECOND = 15;
private static final int B_MAX_PER_SECOND = 10;
private static final int C_MAX_PER_SECOND = 8;
private static final int THREAD_POOL_SIZE = 5; // Adjust the size based on your system's capabilities
private static BlockingQueue<String> messageQueue = new LinkedBlockingQueue<>();
private static ExecutorService executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
public static void main(String[] args) {
// Read messages from the database and put them in the message queue
readMessagesFromDatabase();
// Start three consumer threads for A, B, and C
executorService.execute(() -> sendMessageTo("A", A_MAX_PER_SECOND));
executorService.execute(() -> sendMessageTo("B", B_MAX_PER_SECOND));
executorService.execute(() -> sendMessageTo("C", C_MAX_PER_SECOND));
// Shutdown the executor service once all messages are sent
executorService.shutdown();
}
private static void readMessagesFromDatabase() {
// Simulate reading messages from the database and adding them to the queue
for (int i = 1; i <= TOTAL_MESSAGES; i++) {
messageQueue.add("Message " + i);
}
}
private static void sendMessageTo(String vendor, int maxPerSecond) {
while (!messageQueue.isEmpty()) {
String message = messageQueue.poll();
if (message != null) {
// Simulate sending the message to the vendor
System.out.println("Sending " + message + " to Vendor " + vendor);
// Calculate the delay based on the vendor's max messages per second
long delay = 1000L / maxPerSecond;
try {
// Add delay to control the message sending rate
Thread.sleep(delay);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
编辑 (opens new window)
上次更新: 2024/02/22, 14:03:19