`
halloffame
  • 浏览: 54570 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

Apache Thrift 初学小讲(九)【测试接口的QPS】

阅读更多

QPS是接口每秒处理成功的调用次数,RT是处理一次请求所需要的平均时间,并发量是系统能同时处理的请求数。公式:并发量 = QPS * 平均响应时间

 

这3个是比较重要的性能指标,如何测试这几个指标呢,可以用工具jmeter,jmeter原生支持http,但像thrift这种rpc接口就需要自己写jmeter的扩展了,可以参考https://www.jianshu.com/p/455e57ab329a。下面不用工具,用java写了一个,主要思路是启动n个线程并发的不断的调用接口一段时间,然后统计计算成功次数,失败次数和平均响应时间,主要用了并发包里的以下几个工具类:

 

CyclicBarrier:n个线程都到达await时才开始调用接口,模拟并发的感觉。

CountDownLatch:n个线程都运行结束后,主线程才开始统计计算。

LongAdder:计数器,线程安全,AtomicLong的升级版本,类似HashMap和ConcurrentHashMap的区别。

 

package com.halloffame.thriftjsoa;
   
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.LongAdder;

import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFastFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

import thrift.test.ThriftTest;
import thrift.test.User;

public class TestQpsClient {
    private static LongAdder successCount = new LongAdder(); //成功次数
    private static LongAdder failCount = new LongAdder(); //失败次数
    private static LongAdder reponseTimeSum = new LongAdder(); //总响应时间,单位:ms

    private static int threadNum = 12; //线程数
    private static int runTime = 60; //线程运行时间,单位:s

    //cyclicBarrier目的是让所有线程同时运行,模拟并发请求
    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(threadNum + 1);
    private static volatile boolean isFinish = false; //结束线程运行标志变量
    //countDownLatch目的是所有线程都结束运行后,主线程才统计
    private static CountDownLatch countDownLatch = new CountDownLatch(threadNum);

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < threadNum; i++) {
            ReqThread reqThread = new ReqThread(i);
            reqThread.start();
        }

        cyclicBarrier.await();
        Thread.sleep(runTime * 1000);

        isFinish = true;
        countDownLatch.await();

        long successLongCount = successCount.longValue();
        long failLongCount = failCount.longValue();
        long reponseTimeLongSum = reponseTimeSum.longValue();
        
        long totalCount = successLongCount + failLongCount;
        double avgRepTime = reponseTimeLongSum / totalCount;
        double qps = successLongCount / runTime;
        double concurrency = qps * (avgRepTime / 1000);
        
        System.out.println("成功调用次数:" + successLongCount);
        System.out.println("失败调用次数:" + failLongCount);
        System.out.println("平均响应时间:" + avgRepTime + "毫秒");
        System.out.println("QPS:" + qps);
        System.out.println("并发:" + concurrency);
    }

    static class ReqThread extends Thread {
        private int i;

        public ReqThread(int i){
            this.i = i;
        }

        @Override
        public void run() {
            try {
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
            
            while ( !isFinish ) {
            	long startTime = System.currentTimeMillis();
            	
            	User user = null;
            	TTransport transport = null;
                try {
                	TSocket socket = new TSocket("localhost", 4567);
                    transport = new TFastFramedTransport(socket);
        			transport.open();
                    TProtocol tProtocol = new TCompactProtocol(transport);
                    ThriftTest.Client testClient = new ThriftTest.Client(tProtocol);
                    
					user = testClient.getUser(2);
				} catch (Exception e) {
					user = null;
					e.printStackTrace();
				} finally {
					if (transport != null && transport.isOpen()) {
						transport.close();
					}
				}
                
                long endTime = System.currentTimeMillis();  
                System.out.println("线程" + i + "调用结果:" + user);
                reponseTimeSum.add(endTime - startTime);
                
                if (user != null) {
                	successCount.increment();
                } else {
                	failCount.increment();
                } 
            }

            countDownLatch.countDown();
        }
    }

}

 

在我的机器运行的后阶段出现了异常:java.net.BindException: Address already in use: connect,原因是端口被消耗完,即使socket.close(),端口也不是立即关闭,而是TIME_WAIT状态。可以参考https://blog.csdn.net/ceo158/article/details/9311065增大可分配的tcp连接端口数,减小处于TIME_WAIT状态的连接的生存时间。

 

代码已经放在GitHub:https://github.com/halloffamezwx/thriftjsoa

 

推荐一些文章:

线程安全实现与CLH队列:https://www.jianshu.com/p/0f6d3530d46b

ThreadLocalMap里弱引用:https://blog.csdn.net/vicoqi/article/details/79743112

 

感谢阅读!!!

0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics