如何使用java语言实现memcache服务器
小标 2018-07-17 来源 : 阅读 1050 评论 0

摘要:本文主要向大家介绍了如何使用java语言实现memcache服务器,通过具体的内容向大家展示,希望对大家学习JAVA语言有所帮助。

本文主要向大家介绍了如何使用java语言实现memcache服务器,通过具体的内容向大家展示,希望对大家学习JAVA语言有所帮助。

这篇文章将会涉及以下内容:

· Java Socket多线程服务器

· Java IO

· Concurrency

· Memcache特性和协议

Memcache

Memcache is an in-memory key-value store for small chunks of arbitrary data (strings, objects)from results of database calls, API calls, or page rendering.

即内存缓存数据库,是一个键值对数据库。该数据库的存在是为了将从其他服务中获取的数据暂存在内存中,在重复访问时可以直接从命中的缓存中返回。既加快了访问速率,也减少了其他服务的负载。这里将实现一个单服务器版本的Memcache,并且支持多个客户端的同时连接。

客户端将与服务器建立telnet连接,然后按照Memcache协议与服务器缓存进行交互。这里实现的指令为get,set和del。先来看一下各个指令的格式

set

set属于存储指令,存储指令的特点时,第一行输入基本信息,第二行输入其对应的value值。

set <key> <flags> <exptime> <bytes> [noreply]\r\n<value>\r\n

如果存储成功,将会返回STORED,如果指令中包含noreply属性,则服务器将不会返回信息。
该指令中每个域的内容如下:

key: 键flags: 16位无符号整数,会在get时随键值对返回exptime: 过期时间,以秒为单位

bytes:即将发送的value的长度

noreply:是否需要服务器响应,为可选属性

如果指令不符合标准,服务器将会返回ERROR。

get

get属于获取指令,该指令特点如下:

get <key>*\r\n

它支持传入多个key的值,如果缓存命中了一个或者多个key,则会返回相应的数据,并以END作为结尾。如果没有命中,则返回的消息中不包含该key对应的值。格式如下:

VALUE <key> <flags> <bytes>\r\n<data block>\r\n

VALUE <key> <flags> <bytes>\r\n<data block>\r\n

END

del

删除指令,该指令格式如下:

del <key> [noreply]\r\n

如果删除成功,则返回DELETED\r\n,否则返回NOT_FOUND。如果有noreply参数,则服务器不会返回响应。

JAVA SOCKET

JAVA SOCKET需要了解的只是包括TCP协议,套接字,以及IO流。这里就不详细赘述,可以参考我的这系列文章,也建议去阅读JAVA Network Programming。一书。

代码实现

这里贴图功能出了点问题,可以去文末我的项目地址查看类图。

这里采用了指令模式和工厂模式实现指令的创建和执行的解耦。指令工厂将会接收commandLine并且返回一个Command实例。每一个Command都拥有execute方法用来执行各自独特的操作。这里只贴上del指令的特殊实现。

**

 * 各种指令

 * 目前支持get,set,delete

 *

 * 以及自定义的

 * error,end

 */public interface Command {

 

    /**

     * 执行指令

     * @param reader

     * @param writer

     */

    void execute(Reader reader, Writer writer);

 

    /**

     * 获取指令的类型

     * @return

     */

    CommandType getType();

}

/**

 * 指令工厂 单一实例

 */public class CommandFactory {

 

    private static CommandFactory commandFactory;

    private static Cache<Item> memcache;

    private CommandFactory(){}

 

    public static CommandFactory getInstance(Cache<Item> cache) {

        if (commandFactory == null) {

            commandFactory = new CommandFactory();

            memcache = cache;

        }

        return commandFactory;

    }

 

    /**

     * 根据指令的类型获取Command

     * @param commandLine

     * @return

     */

    public Command getCommand(String commandLine){

        if (commandLine.matches("^set .*$")){

            return new SetCommand(commandLine, memcache);

        }else if (commandLine.matches("^get .*$")){

            return new GetCommand(commandLine, memcache);

        }else if (commandLine.matches("^del .*$")){

            return new DeleteCommand(commandLine, memcache);

        }else if (commandLine.matches("^end$")){

            return new EndCommand(commandLine);

        }else{

            return new ErrorCommand(commandLine, ErrorCommand.ErrorType.ERROR);

        }

    }

}

/**

 * 删除缓存指令

 */public class DeleteCommand implements Command{

 

    private final String command;

    private final Cache<Item> cache;

 

    private String key;

    private boolean noReply;

    public DeleteCommand(final String command, final Cache<Item> cache){

        this.command = command;

        this.cache = cache;

        initCommand();

    }

 

    private void initCommand(){

        if (this.command.contains("noreply")){

            noReply = true;

        }

        String[] info = command.split(" ");

        key = info[1];

    }

 

    @Override

    public void execute(Reader reader, Writer writer) {

        BufferedWriter bfw = (BufferedWriter) writer;

        Item item = cache.delete(key);

        if (!noReply){

            try {

                if (item == null){

                    bfw.write("NOT_FOUND\r\n");

                }else {

                    bfw.write("DELETED\r\n");

                }

                bfw.flush();

            } catch (IOException e) {

                try {

                    bfw.write("ERROR\r\n");

                    bfw.flush();

                } catch (IOException e1) {

                    e1.printStackTrace();

                }

                e.printStackTrace();

            }

        }

 

 

    }

 

    @Override

    public CommandType getType() {

        return CommandType.SEARCH;

    }

}

然后是实现内存服务器,为了支持先进先出功能,这里使用了LinkedTreeMap作为底层实现,并且重写了removeOldest方法。同时还使用CacheManager的后台线程及时清除过期的缓存条目。

public class Memcache implements Cache<Item>{

    private Logger logger = Logger.getLogger(Memcache.class.getName());

 

    //利用LinkedHashMap实现LRU

    private static LinkedHashMap<String, Item> cache;

 

 

    private final int maxSize;

 

    //负载因子

    private final float DEFAULT_LOAD_FACTOR = 0.75f;

 

    public Memcache(final int maxSize){

        this.maxSize = maxSize;

 

        //确保cache不会在达到maxSize之后自动扩容

        int capacity = (int) Math.ceil(maxSize / DEFAULT_LOAD_FACTOR) + 1;

 

        this.cache = new LinkedHashMap<String, Item>(capacity, DEFAULT_LOAD_FACTOR, true){

            @Override

            protected boolean removeEldestEntry(Map.Entry<String,Item> eldest) {

                if (size() > maxSize){

                    logger.info("缓存数量已经达到上限,会删除最近最少使用的条目");

                }

                return size() > maxSize;

            }

        };

 

        //实现同步访问

        Collections.synchronizedMap(cache);

    }

 

 

 

 

    public synchronized boolean isFull(){

        return cache.size() >= maxSize;

    }

 

    @Override

    public Item get(String key) {

        Item item = cache.get(key);

 

        if (item == null){

            logger.info("缓存中key:" + key + "不存在");

            return null;

        }else if(item!=null && item.isExpired()){ //如果缓存过期则删除并返回null

            logger.info("从缓存中读取key:" + key + " value:" + item.getValue() + "已经失效");

            cache.remove(key);

            return null;

        }

 

        logger.info("从缓存中读取key:" + key + " value:" + item.getValue() + " 剩余有效时间" + item.remainTime());

        return item;

    }

 

    @Override

    public void set(String key, Item value) {

        logger.info("向缓存中写入key:" + key + " value:" + value);

        cache.put(key, value);

    }

 

    @Override

    public Item delete(String key) {

        logger.info("从缓存中删除key:" + key);

        return cache.remove(key);

    }

 

    @Override

    public int size(){

        return cache.size();

    }

 

    @Override

    public int capacity() {

        return maxSize;

    }

 

    @Override

    public Iterator<Map.Entry<String, Item>> iterator() {

        return cache.entrySet().iterator();

    }

}

/**

 * 缓存管理器

 * 后台线程

 * 将cache中过期的缓存删除

 */public class CacheManager implements Runnable {

 

    private Logger logger = Logger.getLogger(CacheManager.class.getName());

 

    //缓存

    public Cache<Item> cache;

 

    public CacheManager(Cache<Item> cache){

        this.cache = cache;

    }

 

 

    @Override

    public void run() {

        while (true){

            Iterator<Map.Entry<String, Item>> itemIterator = cache.iterator();

            while (itemIterator.hasNext()){

                Map.Entry<String, Item> entry = itemIterator.next();

                Item item = entry.getValue();

                if(item.isExpired()){

                    logger.info("key:" + entry.getKey() + " value" + item.getValue() + " 已经过期,从数据库中删除");

                    itemIterator.remove();

                }

            }

 

            try {

                //每隔5秒钟再运行该后台程序

                TimeUnit.SECONDS.sleep(5);

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

 

        }

    }

}

最后是实现一个多线程的Socket服务器,这里就是将ServerSocket绑定到一个接口,并且将accept到的Socket交给额外的线程处理。

/**

 * 服务器

 */public class IOServer implements Server {

 

    private boolean stop;

 

    //端口号

    private final int port;

 

    //服务器线程

    private ServerSocket serverSocket;

 

    private final Logger logger = Logger.getLogger(IOServer.class.getName());

 

    //线程池,线程容量为maxConnection

    private final ExecutorService executorService;

 

    private final Cache<Item> cache;

 

    public IOServer(int port, int maxConnection, Cache<Item> cache){

        if (maxConnection<=0) throw new IllegalArgumentException("支持的最大连接数量必须为正整数");

        this.port = port;

        executorService = Executors.newFixedThreadPool(maxConnection);

        this.cache = cache;

    }

 

    @Override

    public void start() {

        try {

            serverSocket = new ServerSocket(port);

            logger.info("服务器在端口"+port+"上启动");

            while (true){

                try {

                    Socket socket = serverSocket.accept();

                    logger.info("收到"+socket.getLocalAddress()+"的连接");

                    executorService.submit(new SocketHandler(socket, cache));

                } catch (IOException e) {

                    e.printStackTrace();

                }

            }

        } catch (IOException e) {

            logger.log(Level.WARNING, "服务器即将关闭...");

            e.printStackTrace();

        } finally {

            executorService.shutdown();

            shutDown();

        }

 

 

    }

 

    /**

     * 服务器是否仍在运行

     * @return

     */

    public boolean isRunning() {

        return !serverSocket.isClosed();

    }

 

    /**

     * 停止服务器

     */

    public void shutDown(){

        try {

            if (serverSocket!=null){

                serverSocket.close();

            }

        } catch (IOException e) {

            e.printStackTrace();

        }

    }

 

}

/**

 * 处理各个客户端的连接

 * 在获得end指令后关闭连接s

 */public class SocketHandler implements Runnable{

 

    private static Logger logger = Logger.getLogger(SocketHandler.class.getName());

 

    private final Socket socket;

 

    private final Cache<Item> cache;

 

    private boolean finish;

 

 

    public SocketHandler(Socket s, Cache<Item> cache){

        this.socket = s;

        this.cache = cache;

    }

 

    @Override

    public void run() {

        try {

            //获取socket输入流

            final BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));

            //获取socket输出流

            final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));

 

            CommandFactory commandFactory = CommandFactory.getInstance(cache);

 

            while (!finish){

                final String commandLine = reader.readLine();

                logger.info("ip:" + socket.getLocalAddress() + " 指令:" + commandLine);

 

                if (commandLine == null || commandLine.trim().isEmpty()) {

                    continue;

                }

 

                //使用指令工厂获取指令实例

                final Command command = commandFactory.getCommand(commandLine);

                command.execute(reader, writer);

 

                if (command.getType() == CommandType.END){

                    logger.info("请求关闭连接");

                    finish = true;

                }

            }

        } catch (IOException e) {

            e.printStackTrace();

            logger.info("关闭来自" + socket.getLocalAddress() + "的连接");

        } finally {

            try {

                if (socket != null){

                    socket.close();

                }

            } catch (IOException e) {

                e.printStackTrace();

            }

        }

    }

}

以上就是职坐标整理发布关于JAVA的介绍,先祝大家对它有了一定的了解吧,了解更多内容,请关注职坐标编程语言JAVA频道!

本文由 @小标 发布于职坐标。未经许可,禁止转载。
喜欢 | 1 不喜欢 | 0
看完这篇文章有何感觉?已经有1人表态,100%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程