`
kevin.xqw
  • 浏览: 9988 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

高并发消息处理器

阅读更多

来源:

工作需要实现消息路由的中间层模块

测试结果:

 

2W客户, 每客户10 Request -> MQEngine -> 本地tomcat

消息转发并发量: 6.7K 零丢包

设计图



1. MQEngine

package lightmq;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MQEngine<E, P extends MessageHandler<E>> {
	
	/**
     * 消息队列
     * TODO 优化改为 LightQueue(内部实现为queue组)
     */
    final Queue<E> queue = new ConcurrentLinkedQueue<E>();
    //final Queue<E> queue1 = new LightQueue<E>(10);
    
    /**
     * handler class
     */
    private Class<? extends MessageHandler<E>> handlerClass;
    
    /**
     * 消费者线程池
     */
    ExecutorService consumerES;    
    
    /**
     * 消费者数量
     */
    private int consumerSize = 1;
    
    private Runnable[] consumers;
    
    /**
     * 构造函数
     * 
     * @param c 处理器类
     */
    public MQEngine(Class<? extends MessageHandler<E>> c){
        this(1, 1, c);
    }
    
    /**
     * 构�?函数 
     * 
     * @param threadPoolSize 线程池大�?
     * @param consumerSize 消息者数�?
     * @param c 处理器类�?
     */
    public MQEngine(int threadPoolSize, int consumerSize, Class<? extends MessageHandler<E>> c){
        consumerES = Executors.newFixedThreadPool(threadPoolSize);
        this.handlerClass = c;
        this.consumerSize = consumerSize;        
    }
    
    /**
     * 启动消费者线�?
     * @param consumerSize
     * @param c
     */
    public void start() {
        final Class<? extends MessageHandler<E>> c = this.handlerClass;
        
        class ConsumerTask implements Runnable{            
            
            @Override
            public void run() {
                MessageHandler<E> p = null;
                try {
                    p = c.newInstance();
                } catch (InstantiationException e1) {
                    // TODO Auto-generated catch block
                    e1.printStackTrace();
                } catch (IllegalAccessException e1) {
                    // TODO Auto-generated catch block
                    e1.printStackTrace();
                }
                // 
                int i = 0;
                while(true){
                    try {
                        if (!queue.isEmpty()) {
                            p.consume(queue.poll());
                        }
                        i++;
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    // 每执行100次
                    if(10==i){
                        synchronized (this) {
                            try {
                                i = 0;
                                wait(100);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }                            
                    }
                }
            }
        }
        
        this.consumers = new Runnable[this.consumerSize];
        for (int i = 0; i < this.consumers.length; i++) {
            consumers[i] = new ConsumerTask();
        }
        
        for (int i = 0; i < consumers.length; i++) {
            consumerES.execute(consumers[i]);
        }
    }

    /**
     * 
     * @param e
     */
    public void push(E e){
        queue.add(e);
        for (int i = 0; i < this.consumers.length; i++) {
            synchronized (consumers[i]) {
                consumers[i].notify();
            }            
        }        
    }
    
    /**
     * 
     */
    public void destory(){
        this.queue.clear();
        this.consumerES.shutdown();
    }
	
}

 

2. MessageHandler

 

 

package lightmq;

/**
 * 消息处理器
 * 由子类派生
 * @author kevin
 *
 * @param <E>
 */
public abstract class MessageHandler<E> {
	
	public abstract void consume(E e);

}

 

3.MyMessageHandler

 

package lightmq;

import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.nio.conn.NHttpClientConnectionManager;

/**
 * 业务相关消息处理器
 * @author kevin
 *
 */
public class MyMessageHandler extends MessageHandler<String>{
	
	static AtomicLong sentCount = new AtomicLong(0);
    
    static NHttpClientConnectionManager connMgr;
    
    @Override
    public void consume(String e) {
        sendToTomcat(e);
    }
    
    private CloseableHttpAsyncClient httpclient = null;
    
    public MyMessageHandler(){
        
        try {
            connMgr = new PoolingNHttpClientConnectionManager(new DefaultConnectingIOReactor());
            httpclient = HttpAsyncClients.createMinimal(connMgr);
            httpclient.start();
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    
    /**
     * 发给sc
     * @param message
     */
    private void sendToTomcat(String message){
        long startTime = System.currentTimeMillis();        
        
        try {
            
            
            // http[GET]请求, 
            final HttpGet request1 = new HttpGet("http://localhost");
            Future<HttpResponse> future = httpclient.execute(request1, null);
            
            // and wait until a response is received
            HttpResponse response1;
            response1 = future.get();
            System.out.println("message " + message + ":" + request1.getRequestLine() + "->" + response1.getStatusLine());
            System.out.println(message + " Sent; Cost:" + (System.currentTimeMillis() - startTime) + "; Succeed  Sent: " + sentCount.incrementAndGet());
        } catch (Exception e1) {
            System.err.println(e1.getMessage());
        } finally{
            // 关闭链接
            if(null!=httpclient){
                try {
                    //httpclient.close();
                } catch (Exception e1) {
                    //e1.printStackTrace();
                    System.err.println(e1.getMessage());
                }
            }
        }
    }
}

 4.M2Queue

 

package lightmq;

import java.util.Collection;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * 
 * @author kevin.xu
 *
 * @param <V>
 */
public class M2Queue<V> implements Queue<V>{
	/**
     * 队列数组
     */
    private Queue<V> queues[];
    
    /**
     * 
     * @param initQueueSize
     */
    public M2Queue(int initQueueSize) {
        queues = new Queue[initQueueSize];
        for (int i = 0; i < queues.length; i++) {
            queues[i] = new ConcurrentLinkedQueue<V>();
        }
    }

    @Override
    public int size() {
        return 0;
    }

    @Override
    public boolean isEmpty() {
        // TODO Auto-generated method stub
        return false;
    }

    @Override
    public boolean contains(Object o) {
        // TODO Auto-generated method stub
        return false;
    }

    @Override
    public Iterator<V> iterator() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public Object[] toArray() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public <T> T[] toArray(T[] a) {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public boolean remove(Object o) {
        // TODO Auto-generated method stub
        return false;
    }

    @Override
    public boolean containsAll(Collection<?> c) {
        // TODO Auto-generated method stub
        return false;
    }

    @Override
    public boolean addAll(Collection<? extends V> c) {
        // TODO Auto-generated method stub
        return false;
    }

    @Override
    public boolean removeAll(Collection<?> c) {
        // TODO Auto-generated method stub
        return false;
    }

    @Override
    public boolean retainAll(Collection<?> c) {
        // TODO Auto-generated method stub
        return false;
    }

    @Override
    public void clear() {
        // TODO Auto-generated method stub
        
    }

    @Override
    public boolean add(V e) {
        return offer(e);
    }

    /**
     * 添加到元素最少的队列中
     */
    @Override
    public boolean offer(V e) {
        return queues[getSmallestQueueIndex()].offer(e);
    }

    /**
     * 从元素最大的队列中remove
     */
    @Override
    public V remove() {
        return queues[getLargestQueueIndex()].remove();
    }

    /**
     * 从元素最大的队列中poll
     */
    @Override
    public V poll() {
        return queues[getLargestQueueIndex()].poll();
    }

    /**
     * 从元素最大的队列中element
     */
    @Override
    public V element() {
        return queues[getLargestQueueIndex()].element();
    }

    /**
     * 先从记录数最多的queue里peek
     */
    @Override
    public V peek() {
        return queues[getLargestQueueIndex()].peek();
    }
    
    /**
     * 返回最少记录数的queue
     * 
     * @return
     */
    private int getSmallestQueueIndex(){
        int index = 0;
        if (queues.length > 1) {
            for (int i = index; i < queues.length; i++) {
                if(queues[i].size() > queues[i+1].size()){
                    index = i+1;
                }
            }
        }
        return index;
    }
    
    /**
     * 返回最多记录数的queue
     * 
     * @return
     */
    private int getLargestQueueIndex(){
        int index = 0;
        if (queues.length > 1) {
            for (int i = index; i < queues.length; i++) {
                if(queues[i].size() < queues[i+1].size()){
                    index = i+1;
                }
            }
        }
        return index;
    }
}

 5.TestMQ

 

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

import lightmq.MQEngine;
import lightmq.MyMessageHandler;

/**
 * MQEngine测试类
 * @author kevin
 *
 */
public class TestMQ {
	public static void main(String[] args) {	
		final AtomicLong l = new AtomicLong(0);
		// 
		final MQEngine<String, MyMessageHandler> mq = new MQEngine<String, MyMessageHandler>(10, 50, MyMessageHandler.class);		
		mq.start();
		// 模拟客户并发数
		final int PRODUCER_SIZE = 200000;
		// 模拟每个客户平均请求次数
		final int REQUEST_TIME = 10;
		
		ExecutorService es = Executors.newFixedThreadPool(10);
		for (int i = 0; i < PRODUCER_SIZE; i++) {
			es.execute(new Runnable() {
				@Override
				public void run() {
					for (int i = 0; i < REQUEST_TIME; i++) {						
						mq.push(String.valueOf(l.incrementAndGet()));
					}
				}
			});
		}
		
		
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		
		System.out.println(mq.size());
		
	}
}

 

13
6
分享到:
评论
5 楼 kevin.xqw 2014-01-10  
iablee 写道
TestMQ 中 17行 mq是不是要调用start()方法?


是的,应该加上mq.start();
谢谢指正
4 楼 iablee 2013-12-31  
TestMQ 中 17行 mq是不是要调用start()方法?
3 楼 qingling600 2013-12-31  
MyMessageHandler
56行有问题,没有办法编译过
2 楼 kevin.xqw 2013-12-31  
linde 写道
MQEngine
110行有问题吗?


谢谢linde指正,是要改为 consumerES.execute(consumers[i]);
1 楼 linde 2013-12-30  
MQEngine
110行有问题吗?

相关推荐

    ACE 服务器源码,高并发的高性能处理器和源码

    ACE 服务器源码,高并发的高性能处理器和源码

    基于众核处理器的高并发视频转码与分发系统.pdf

    基于众核处理器的高并发视频转码与分发系统.pdf

    高并发解决策略.mp4

    所谓并发编程是指在一台处理器上“同时”处理多个任务。并发是在同一实体上的多个事件。多个事件在同一时间间隔发生。

    高并发线程配置建议-合理配置

    线程池 - 合理配置 1. CPU密集型任务,就需要尽量压榨CPU,参考可以设置为NCPU+1 2. IO密集型任务,参考可以设置为2*N...&gt; 如果希望处理器达到理想的使用率,那么线程池的最优大小为: &gt; 线程池大小=NCPU *UCPU(1+W/C)

    多处理器编程的艺术 中文

    多处理器编程的艺术,高并发高性能服务端开发必备书籍

    《多处理器编程艺术》课后答案

    《多处理器编程艺术》课后答案

    golang爬虫框架,golang高并发实战

    golang爬虫框架,适用于刚学习golang语言,想要学习golang开发高并发程序、网络爬虫相关知识。该爬虫框架有三个处理模块:下载器、分析器和条目处理器,再加上调度和协调这些处理模块运行的控制模块,我们可以明晰该...

    93个netty高并发教学视频下载.txt

    93个netty高并发全面的教学视频下载,每个视频在400-700M,一到两个小时时长的视频,无机器码和解压密码,下载下来的就是MP4格式视频。点击即可观看学习。下载txt文档,里面有永久分享的连接。包括01_学习的要义;02...

    Java秒杀系统方案优化高性能高并发学习实战源代码以及笔记..zip

    Java秒杀系统方案优化高性能高并发学习实战源代码以及笔记..zip 章节笔记 第1章-课程介绍及项目框架搭建 知识点 使用spring boot 搭建项目基础框架 使用Thymeleaf做页面展示,封装Result统一结果 集成 mybatis + ...

    大型高并发高负载网站的系统架构分析

    大型高并发高负载网站的系统架构分析!简单的来说,如果一个系统可扩展,那么你可以通过扩展来提供系统的性能。这代表着系统能够容纳更高的负载、更大的数据集,并且系统是可维护的。扩展和语言、某项具体的技术都是...

    LMAX.Disruptor,一个无锁高并发框架,中文文档

    LMAX是一种新型零售金融交 易平台,它能够以很低的延迟产生大量交易。这个系统是建立在JVM平台上,其核心是一个业务逻辑...业务逻辑处理器完全是运行在内存中,使用事件源驱动方 式。业务逻辑处理器的核心是Disruptor。

    JAVA并发编程实践

    《JAVA并发编程实践》随着多核处理器的普及,使用并发成为构建高性能应用程序的关键。Java 5以及6在开发并发程序中取得了显著的进步,提高了Java虚拟机的性能以及并发类的可伸缩性,并加入了丰富的新并发构建块。在...

    JAVA并发编程实践高清中文带书签

    《JAVA并发编程实践》随着多核处理器的普及,使用并发成为构建高性能应用程序的关键。Java 5以及6在开发并发程序中取得了显著的进步,提高了Java虚拟机的性能以及并发类的可伸缩性,并加入了丰富的新并发构建块。在...

    java并发编程实践高清中文版+源码

    《JAVA并发编程实践》随着多核处理器的普及,使用并发成为构建高性能应用程序的关键。Java 5以及6在开发并发程序中取得了显著的进步,提高了Java虚拟机的性能以及并发类的可伸缩性,并加入了丰富的新并发构建块。在...

    JAVA并发编程实践(英文版)

    《JAVA并发编程实践》随着多核处理器的普及,使用并发成为构建高性能应用程序的关键。Java 5以及6在开发并发程序中取得了显著的进步,提高了Java虚拟机的性能以及并发类的可伸缩性,并加入了丰富的新并发构建块。在...

Global site tag (gtag.js) - Google Analytics