设为首页 收藏本站
云服务器等爆品抢先购,低至4.2元/月
查看: 841|回复: 0

[经验分享] 模块简单设计——用Jetty实现一个client adapter

[复制链接]

尚未签到

发表于 2017-2-27 09:32:49 | 显示全部楼层 |阅读模式
  首先看看下面的应用场景
DSC0000.jpg

  传统的服务器端为若干个客户端提供服务,一般需要开启多个服务器端进程。为了进一步提升服务器端的处理能力,可以如图所示将服务解耦为两部分(adapter与workers),它们之间通过消息队列传输数据,其中workers处理具体业务,adapter负责接入请求以及反馈结果,具体包含下面两个工作。
  1,将所有客户端的请求发送到消息队列(进而传给后台处理)
  2,后台处理完毕后将结果返回响应队列,client adapter获取到结果后返回给相应客户端。
  我们选择用Jetty(8),redis以及php简单实现这个场景,主要用到jetty的continuation机制和redis的list数据结构
  A,先配置一个服务器如下,同时开启一个守护线程阻塞监听response queue(用到json lib库以及jedis库)

package test;
import java.util.HashMap;
import java.util.List;
import org.eclipse.jetty.continuation.Continuation;
import org.eclipse.jetty.server.*;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.json.simple.*;
import redis.clients.jedis.Jedis;
public class PJetty{
public static HashMap<String,Continuation>globalMap = new HashMap<String,Continuation>();
// 用一个守护线程阻塞等待结果队列返回数据
public static class DaemonThread extends Thread{
private JSONObject obj = new JSONObject();
private Jedis jedis = new Jedis("127.0.0.1",6379);
private List<String> res;
public void run(){
while(true){
// 阻塞等待响应队列
res = jedis.brpop(0, "response_queue");
// 获取结果信息
String response = res.get(1);
obj=(JSONObject) JSONValue.parse(response);
String request_sid = obj.get("request_sid").toString();
String result = obj.get("results").toString();
if(request_sid == null){
continue;
}
// 通过消息中的连接的sessonid获取到响应的continuation实例,然后设置结果信息再唤醒实例
Continuation con = globalMap.get(request_sid);
if(con == null){continue;}
globalMap.remove(request_sid);
//客户端异常断开这里会抛错
try{
con.setAttribute("results", result);
con.resume();
} catch(Exception e){
continue;
}
}
}
}
public static void main(String[] args) throws Exception {
//开启守护线程去阻塞等待响应结果队列,唤醒请求
DaemonThread dt = new DaemonThread();
dt.start();
//设置connectors
SelectChannelConnector connector1 = new SelectChannelConnector();
connector1.setPort(1987);
connector1.setThreadPool(new QueuedThreadPool(5));
Server server = new Server();
server.setConnectors(new Connector[]{connector1});
//使用servlet处理请求
ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
context.setContextPath("/");
context.addServlet(new ServletHolder(new NonBlockingServlet()), "/fetch");
server.setHandler(context);
server.start();
server.join();
}
}
  B,实现自定义的servlets接受前端client连接,将请求信息传入队列request queue

package test;
import java.io.IOException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;
import org.eclipse.jetty.continuation.Continuation;
import org.eclipse.jetty.continuation.ContinuationSupport;
import org.json.simple.JSONObject;
import redis.clients.jedis.Jedis;
public class NonBlockingServlet extends HttpServlet {
/**
* generated serialize number
*/
private static final long serialVersionUID = 3313258432391586994L;

protected void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException
{
// 用sleeptime来模拟后台工作量
String sleepTime = request.getParameter("st");
if(sleepTime == null){
sleepTime = "0";
}
// 查看结果队列是否返回此连接的请求结果
Object results = request.getAttribute("results");
if (results==null) // 如果异步处理尚未返回结果
{
Continuation continuation = ContinuationSupport.getContinuation(request);
if(continuation.isInitial()){
// 设置连接超时时间
continuation.setTimeout(10000);
response.setContentType("text/plain");
response.getWriter().flush();
HttpSession session=request.getSession();
String sid = session.getId();
Jedis jedis = new Jedis("127.0.0.1",6379);
//将请求连接sessionid以及请求内容encode后传到处理队列中
JSONObject obj=new JSONObject();
obj.put("request_sid",sid);
obj.put("params",sleepTime);
jedis.lpush("request_queue", obj.toJSONString());
//将连接和continuation实例做个映射关系存到全局hashmap中,不确定这里是否应该加锁
PJetty.globalMap.put(sid, continuation);
}
// 判断是否超时
if (continuation.isExpired())
{
// 返回超时Response
response.getWriter().println("timeout");
response.getWriter().flush();  
return;
}
// 挂起HTTP连接
continuation.suspend();
return; // or continuation.undispatch();
}
// 连接恢复后返回结果
response.getWriter().println("Got Result:\t" + results);
response.getWriter().flush();  
}
}
  C,实现后端worker.php(可以自定义worker进程数,进程数多能获取更好的并发)(用到predis库)

#!/root/bin/php
<?php
require_once("lib/Predis/Autoloader.php");
function worker_thread()
{
Predis\Autoloader::register();
$redis = new Predis\Client('tcp://127.0.0.1:6379');
while(true){
try{
$request = $redis->brpop("request_queue", 0);
} catch(Exception $e){
continue;
}
/** demo
array(2) {
[0]=>
string(13) "request_queue"
[1]=>
string(55) "{"request_sid":"q0muxazo8k1h1k3uw85wuayh","params":"4"}"
}
*/
$request = json_decode($request[1], true);
// sleep represents work loads
sleep(intval($request["params"]));
$results = array("request_sid"=>$request["request_sid"], "results"=>$request["params"]);
$response = json_encode($results);
$redis->lpush("response_queue",$response);
}
}
//开启多个worker进程提供服务
for ($worker_nbr = 0; $worker_nbr < 5; $worker_nbr++) {
$pid = pcntl_fork();
if ($pid == 0) {
worker_thread();
return;
}
}
?>
  运行效果如下:

root # for((i=10;i>=1;i--)) ; do lynx -dump http://127.0.0.1:1987/fetch?st=$i & done
[1] 14112
[2] 14113
[3] 14114
[4] 14115
[5] 14116
[6] 14117
[7] 14118
[8] 14119
[9] 14120
[10] 14121
root # Got Result:     3
Got Result:     4
Got Result:     2
Got Result:     7
Got Result:     1
Got Result:     9
Got Result:     6
timeout
timeout
timeout
[1]   Done                    lynx -dump http://127.0.0.1:1987/fetch?st=$i
[2]   Done                    lynx -dump http://127.0.0.1:1987/fetch?st=$i
[3]   Done                    lynx -dump http://127.0.0.1:1987/fetch?st=$i
[4]   Done                    lynx -dump http://127.0.0.1:1987/fetch?st=$i
[5]   Done                    lynx -dump http://127.0.0.1:1987/fetch?st=$i
[6]   Done                    lynx -dump http://127.0.0.1:1987/fetch?st=$i
[7]   Done                    lynx -dump http://127.0.0.1:1987/fetch?st=$i
[8]   Done                    lynx -dump http://127.0.0.1:1987/fetch?st=$i
[9]-  Done                    lynx -dump http://127.0.0.1:1987/fetch?st=$i
[10]+  Done                    lynx -dump http://127.0.0.1:1987/fetch?st=$i
  
 这只是一个简单的demo,为了防止redis,workers进程挂掉或者客户端异常断开,还需要做些异常处理,比如设置请求超时,捕获一些空指针等,超时需要将continuation从globalMap中剔除。
  redis数据库中存储的内容如下,可以看出虽然经后台处理后顺序变化了,但是对应关系正确:

redis 127.0.0.1:6379> lrange request_queue 0 15
1) "{\"request_sid\":\"igiwkwnb715aphw8uvtfa6rj\",\"params\":\"3\"}"
2) "{\"request_sid\":\"wsrglxa3h6ef19ik5i0nbiiys\",\"params\":\"2\"}"
3) "{\"request_sid\":\"tyiqoj6awj5t16ddpqusftwc8\",\"params\":\"6\"}"
4) "{\"request_sid\":\"1052tgkiyy7c31bmxjtbom7ca\",\"params\":\"5\"}"
5) "{\"request_sid\":\"17jo1xwnnkd3h1mhcqcfplrl5k\",\"params\":\"8\"}"
6) "{\"request_sid\":\"1xk521sq6vmmf6enxauwzduj9\",\"params\":\"4\"}"
7) "{\"request_sid\":\"1cxnir1slgjiq1o2n3xwznh0kk\",\"params\":\"9\"}"
8) "{\"request_sid\":\"961vf8hao3stsv4vt1qif3ws\",\"params\":\"7\"}"
9) "{\"request_sid\":\"35pfn5au6p8qdbri17p636si\",\"params\":\"10\"}"
10) "{\"request_sid\":\"1ca4wy8qsfr7av0hwk8xtlqhp\",\"params\":\"1\"}"
redis 127.0.0.1:6379> lrange response_queue 0 15
1) "{\"request_sid\":\"tyiqoj6awj5t16ddpqusftwc8\",\"results\":\"6\"}"
2) "{\"request_sid\":\"igiwkwnb715aphw8uvtfa6rj\",\"results\":\"3\"}"
3) "{\"request_sid\":\"wsrglxa3h6ef19ik5i0nbiiys\",\"results\":\"2\"}"
4) "{\"request_sid\":\"35pfn5au6p8qdbri17p636si\",\"results\":\"10\"}"
5) "{\"request_sid\":\"1052tgkiyy7c31bmxjtbom7ca\",\"results\":\"5\"}"
6) "{\"request_sid\":\"1cxnir1slgjiq1o2n3xwznh0kk\",\"results\":\"9\"}"
7) "{\"request_sid\":\"17jo1xwnnkd3h1mhcqcfplrl5k\",\"results\":\"8\"}"
8) "{\"request_sid\":\"961vf8hao3stsv4vt1qif3ws\",\"results\":\"7\"}"
9) "{\"request_sid\":\"1xk521sq6vmmf6enxauwzduj9\",\"results\":\"4\"}"
10) "{\"request_sid\":\"1ca4wy8qsfr7av0hwk8xtlqhp\",\"results\":\"1\"}"

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-347731-1-1.html 上篇帖子: groovy 和jetty 搭建小webapp应用(转) 下篇帖子: jetty 配置jndi数据源(Postgres连接池),Spring中使用
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表