来源:
工作需要实现消息路由的中间层模块
测试结果:
2W客户, 每客户10 Request -> MQEngine -> 本地tomcat
消息转发并发量: 6.7K 零丢包
设计图
1. MQEngine
package lightmq; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class MQEngine<E, P extends MessageHandler<E>> { /** * 消息队列 * TODO 优化改为 LightQueue(内部实现为queue组) */ final Queue<E> queue = new ConcurrentLinkedQueue<E>(); //final Queue<E> queue1 = new LightQueue<E>(10); /** * handler class */ private Class<? extends MessageHandler<E>> handlerClass; /** * 消费者线程池 */ ExecutorService consumerES; /** * 消费者数量 */ private int consumerSize = 1; private Runnable[] consumers; /** * 构造函数 * * @param c 处理器类 */ public MQEngine(Class<? extends MessageHandler<E>> c){ this(1, 1, c); } /** * 构�?函数 * * @param threadPoolSize 线程池大�? * @param consumerSize 消息者数�? * @param c 处理器类�? */ public MQEngine(int threadPoolSize, int consumerSize, Class<? extends MessageHandler<E>> c){ consumerES = Executors.newFixedThreadPool(threadPoolSize); this.handlerClass = c; this.consumerSize = consumerSize; } /** * 启动消费者线�? * @param consumerSize * @param c */ public void start() { final Class<? extends MessageHandler<E>> c = this.handlerClass; class ConsumerTask implements Runnable{ @Override public void run() { MessageHandler<E> p = null; try { p = c.newInstance(); } catch (InstantiationException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } catch (IllegalAccessException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } // int i = 0; while(true){ try { if (!queue.isEmpty()) { p.consume(queue.poll()); } i++; } catch (Exception e) { e.printStackTrace(); } // 每执行100次 if(10==i){ synchronized (this) { try { i = 0; wait(100); } catch (InterruptedException e) { e.printStackTrace(); } } } } } } this.consumers = new Runnable[this.consumerSize]; for (int i = 0; i < this.consumers.length; i++) { consumers[i] = new ConsumerTask(); } for (int i = 0; i < consumers.length; i++) { consumerES.execute(consumers[i]); } } /** * * @param e */ public void push(E e){ queue.add(e); for (int i = 0; i < this.consumers.length; i++) { synchronized (consumers[i]) { consumers[i].notify(); } } } /** * */ public void destory(){ this.queue.clear(); this.consumerES.shutdown(); } }
2. MessageHandler
package lightmq; /** * 消息处理器 * 由子类派生 * @author kevin * * @param <E> */ public abstract class MessageHandler<E> { public abstract void consume(E e); }
3.MyMessageHandler
package lightmq; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicLong; import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; import org.apache.http.impl.nio.client.HttpAsyncClients; import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager; import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor; import org.apache.http.nio.conn.NHttpClientConnectionManager; /** * 业务相关消息处理器 * @author kevin * */ public class MyMessageHandler extends MessageHandler<String>{ static AtomicLong sentCount = new AtomicLong(0); static NHttpClientConnectionManager connMgr; @Override public void consume(String e) { sendToTomcat(e); } private CloseableHttpAsyncClient httpclient = null; public MyMessageHandler(){ try { connMgr = new PoolingNHttpClientConnectionManager(new DefaultConnectingIOReactor()); httpclient = HttpAsyncClients.createMinimal(connMgr); httpclient.start(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * 发给sc * @param message */ private void sendToTomcat(String message){ long startTime = System.currentTimeMillis(); try { // http[GET]请求, final HttpGet request1 = new HttpGet("http://localhost"); Future<HttpResponse> future = httpclient.execute(request1, null); // and wait until a response is received HttpResponse response1; response1 = future.get(); System.out.println("message " + message + ":" + request1.getRequestLine() + "->" + response1.getStatusLine()); System.out.println(message + " Sent; Cost:" + (System.currentTimeMillis() - startTime) + "; Succeed Sent: " + sentCount.incrementAndGet()); } catch (Exception e1) { System.err.println(e1.getMessage()); } finally{ // 关闭链接 if(null!=httpclient){ try { //httpclient.close(); } catch (Exception e1) { //e1.printStackTrace(); System.err.println(e1.getMessage()); } } } } }
4.M2Queue
package lightmq; import java.util.Collection; import java.util.Iterator; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; /** * * @author kevin.xu * * @param <V> */ public class M2Queue<V> implements Queue<V>{ /** * 队列数组 */ private Queue<V> queues[]; /** * * @param initQueueSize */ public M2Queue(int initQueueSize) { queues = new Queue[initQueueSize]; for (int i = 0; i < queues.length; i++) { queues[i] = new ConcurrentLinkedQueue<V>(); } } @Override public int size() { return 0; } @Override public boolean isEmpty() { // TODO Auto-generated method stub return false; } @Override public boolean contains(Object o) { // TODO Auto-generated method stub return false; } @Override public Iterator<V> iterator() { // TODO Auto-generated method stub return null; } @Override public Object[] toArray() { // TODO Auto-generated method stub return null; } @Override public <T> T[] toArray(T[] a) { // TODO Auto-generated method stub return null; } @Override public boolean remove(Object o) { // TODO Auto-generated method stub return false; } @Override public boolean containsAll(Collection<?> c) { // TODO Auto-generated method stub return false; } @Override public boolean addAll(Collection<? extends V> c) { // TODO Auto-generated method stub return false; } @Override public boolean removeAll(Collection<?> c) { // TODO Auto-generated method stub return false; } @Override public boolean retainAll(Collection<?> c) { // TODO Auto-generated method stub return false; } @Override public void clear() { // TODO Auto-generated method stub } @Override public boolean add(V e) { return offer(e); } /** * 添加到元素最少的队列中 */ @Override public boolean offer(V e) { return queues[getSmallestQueueIndex()].offer(e); } /** * 从元素最大的队列中remove */ @Override public V remove() { return queues[getLargestQueueIndex()].remove(); } /** * 从元素最大的队列中poll */ @Override public V poll() { return queues[getLargestQueueIndex()].poll(); } /** * 从元素最大的队列中element */ @Override public V element() { return queues[getLargestQueueIndex()].element(); } /** * 先从记录数最多的queue里peek */ @Override public V peek() { return queues[getLargestQueueIndex()].peek(); } /** * 返回最少记录数的queue * * @return */ private int getSmallestQueueIndex(){ int index = 0; if (queues.length > 1) { for (int i = index; i < queues.length; i++) { if(queues[i].size() > queues[i+1].size()){ index = i+1; } } } return index; } /** * 返回最多记录数的queue * * @return */ private int getLargestQueueIndex(){ int index = 0; if (queues.length > 1) { for (int i = index; i < queues.length; i++) { if(queues[i].size() < queues[i+1].size()){ index = i+1; } } } return index; } }
5.TestMQ
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; import lightmq.MQEngine; import lightmq.MyMessageHandler; /** * MQEngine测试类 * @author kevin * */ public class TestMQ { public static void main(String[] args) { final AtomicLong l = new AtomicLong(0); // final MQEngine<String, MyMessageHandler> mq = new MQEngine<String, MyMessageHandler>(10, 50, MyMessageHandler.class); mq.start(); // 模拟客户并发数 final int PRODUCER_SIZE = 200000; // 模拟每个客户平均请求次数 final int REQUEST_TIME = 10; ExecutorService es = Executors.newFixedThreadPool(10); for (int i = 0; i < PRODUCER_SIZE; i++) { es.execute(new Runnable() { @Override public void run() { for (int i = 0; i < REQUEST_TIME; i++) { mq.push(String.valueOf(l.incrementAndGet())); } } }); } try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println(mq.size()); } }
相关推荐
ACE 服务器源码,高并发的高性能处理器和源码
基于众核处理器的高并发视频转码与分发系统.pdf
所谓并发编程是指在一台处理器上“同时”处理多个任务。并发是在同一实体上的多个事件。多个事件在同一时间间隔发生。
线程池 - 合理配置 1. CPU密集型任务,就需要尽量压榨CPU,参考可以设置为NCPU+1 2. IO密集型任务,参考可以设置为2*N...> 如果希望处理器达到理想的使用率,那么线程池的最优大小为: > 线程池大小=NCPU *UCPU(1+W/C)
多处理器编程的艺术,高并发高性能服务端开发必备书籍
《多处理器编程艺术》课后答案
golang爬虫框架,适用于刚学习golang语言,想要学习golang开发高并发程序、网络爬虫相关知识。该爬虫框架有三个处理模块:下载器、分析器和条目处理器,再加上调度和协调这些处理模块运行的控制模块,我们可以明晰该...
93个netty高并发全面的教学视频下载,每个视频在400-700M,一到两个小时时长的视频,无机器码和解压密码,下载下来的就是MP4格式视频。点击即可观看学习。下载txt文档,里面有永久分享的连接。包括01_学习的要义;02...
Java秒杀系统方案优化高性能高并发学习实战源代码以及笔记..zip 章节笔记 第1章-课程介绍及项目框架搭建 知识点 使用spring boot 搭建项目基础框架 使用Thymeleaf做页面展示,封装Result统一结果 集成 mybatis + ...
大型高并发高负载网站的系统架构分析!简单的来说,如果一个系统可扩展,那么你可以通过扩展来提供系统的性能。这代表着系统能够容纳更高的负载、更大的数据集,并且系统是可维护的。扩展和语言、某项具体的技术都是...
LMAX是一种新型零售金融交 易平台,它能够以很低的延迟产生大量交易。这个系统是建立在JVM平台上,其核心是一个业务逻辑...业务逻辑处理器完全是运行在内存中,使用事件源驱动方 式。业务逻辑处理器的核心是Disruptor。
《JAVA并发编程实践》随着多核处理器的普及,使用并发成为构建高性能应用程序的关键。Java 5以及6在开发并发程序中取得了显著的进步,提高了Java虚拟机的性能以及并发类的可伸缩性,并加入了丰富的新并发构建块。在...
《JAVA并发编程实践》随着多核处理器的普及,使用并发成为构建高性能应用程序的关键。Java 5以及6在开发并发程序中取得了显著的进步,提高了Java虚拟机的性能以及并发类的可伸缩性,并加入了丰富的新并发构建块。在...
《JAVA并发编程实践》随着多核处理器的普及,使用并发成为构建高性能应用程序的关键。Java 5以及6在开发并发程序中取得了显著的进步,提高了Java虚拟机的性能以及并发类的可伸缩性,并加入了丰富的新并发构建块。在...
《JAVA并发编程实践》随着多核处理器的普及,使用并发成为构建高性能应用程序的关键。Java 5以及6在开发并发程序中取得了显著的进步,提高了Java虚拟机的性能以及并发类的可伸缩性,并加入了丰富的新并发构建块。在...