千家信息网

spring aop + xmemcached 配置service层缓存策略

发表于:2024-11-28 作者:千家信息网编辑
千家信息网最后更新 2024年11月28日,Memcached 作用与使用 基本介绍1,对于缓存的存取方式,简言之,就是以键值对的形式将数据保存在内存中。在日常业务中涉及的操作无非就是增删改查。加入缓存机制后,查询的时候,对数据进行缓存,增删改
千家信息网最后更新 2024年11月28日spring aop + xmemcached 配置service层缓存策略

Memcached 作用与使用 基本介绍

1,对于缓存的存取方式,简言之,就是以键值对的形式将数据保存在内存中。在日常业务中涉及的操作无非就是增删改查。加入缓存机制后,查询的时候,对数据进行缓存,增删改的时候,清除缓存即可。这其中对于缓存的闭合就非常重要,如果缓存没有及时得到更新,那用户就会获取到过期数据,就会产生问题。

2,对于单一业务的缓存管理(数据库中只操作单表),只需生成一个key,查询时,使用key,置入缓存;增删改时,使用key,清除缓存。将key与表绑定,操作相对简单。

3,但是在现实业务中,更多的是对关联表的增删改查(数据库多表操作),业务之间互相关联,数据库中的某张表不止一个业务再进行操作,将缓存拦截在service层,对业务进行缓存,对多表进行缓存。

4,业务层缓存实现策略:

  4.1,在缓存中建立一个key为"union_query",value为"hashmap('简称uqmap')"的缓存,prefix保存的是当前业务操作涉及到的数据库表名的组合(数据库表名的唯一性),使用'|'分隔(例 prefix="A|B",此次业务将操作A表与B表),uqversion是业务版本号,从0开始递增。

  4.2,调用一个查询业务时,对数据进行缓存,设置operation为1,告诉cache对象,这是一个缓存操作,例如调用 queryAB(args[])方法时,cache对象切入,将prefix(即"A|B")与uqversion(初始化为0),存入uqmap中进行缓存。

  4.3,将prefix,uqversion,方法明+参数,进行拼接,使用md5进行加密后作为一个key,将方法的结果集作为value,进行缓存。至此缓存成功。

  4.4,当第二个请求来调用queryAB时,cache对象切入,首先,查询uqmap对象,使用prefix找到对应的uqversion,然后,通过拼接加密获取key,最后取得结果集进行返回。

  4.5,当有一个updateA方法被调用时,设置operation为4,告诉cache对象,这是一个删除缓存的操作,此时prefix的值为"A",cache对象切入,获取全局的uqmap,遍历其中的prefix,是否包含了表A的名称:如果包含,则更新此prefix的uqversion进行自增,uqversion一旦发生变化,4.3中组合的key将不复存在,业务缓存也就消失了。(对于复杂的updateAB方法,遍历prefix要复杂一点,可以实现)

  4.6,当第三个请求来调用queryAB时,可以获取到uqversion,组合成key后,但是没有对应的value。此时确定缓存不存在时,继续正常执行方法,获取结果集,返回给客户的同时,将结果集进行缓存。

5,对于缓存的操作,网上有三种api可以选择(memcached client forjava、spymemcached、xmemcached),具体的好坏,本人在这就不做分析。本人使用的是XMemcached api。

具体实现细节:

1,新建 @interface Annotation{ } 定义一个注解 @Annotation,一个注解是一个类。定义缓存策略。

import java.lang.annotation.Documented;import java.lang.annotation.ElementType;import java.lang.annotation.Inherited;import java.lang.annotation.Retention;import java.lang.annotation.RetentionPolicy;import java.lang.annotation.Target;/** * 用于查找的时候,放置缓存信息 * @author shufeng */@Target(ElementType.METHOD)@Retention(RetentionPolicy.RUNTIME)@Documented@Inheritedpublic @interface XmemCache{        /**         * 值为当前操作的表名,表名唯一         * 涉及到多表操作,使用|分隔         */        String prefix() default "";                /*         *    缓存有效期 设置,单位为秒         *    指定间隔时间,默认值为3600秒(1小时)         * */        int interval() default 3600;                 /**         *     1 从cache里取值,如果未置入cache,则置入         *   2 replace cache value                                         未扩展         *   3 replace cache value,并返回旧值                     未扩展         *   4 remove cache key 从cache里删除对应的缓存                 *   5 remove cache key 从cache里删除对应的缓存,并返回未删除之前的值               未扩展         **/        int operation() default 1; }

2,memcache基础操作类,对一些常用方法进行封装,对memcachedclient进行配置

import java.io.IOException;import java.io.InputStream;import java.util.HashMap;import java.util.Iterator;import java.util.Map;import java.util.Properties;import java.util.Set;import java.util.concurrent.TimeoutException;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.DisposableBean;import com.node.hlhw.rbac.api.constant.Constant;import net.rubyeye.xmemcached.GetsResponse;import net.rubyeye.xmemcached.MemcachedClient;import net.rubyeye.xmemcached.MemcachedClientBuilder;import net.rubyeye.xmemcached.XMemcachedClientBuilder;import net.rubyeye.xmemcached.command.BinaryCommandFactory;import net.rubyeye.xmemcached.exception.MemcachedException;import net.rubyeye.xmemcached.impl.KetamaMemcachedSessionLocator;import net.rubyeye.xmemcached.transcoders.SerializingTranscoder;import net.rubyeye.xmemcached.utils.AddrUtil;/** * @author Melody shufeng * 对memcachedclient进行封装,添加一下常用方法 */public class MemcachedOperate implements DisposableBean {        /*         * timeout - Operation timeout,if the method is not returned in this         * time,throw TimeoutException timeout - operation timeout,in milliseconds         * exp - An expiration time, in seconds. Can be up to 30 days. After 30         * days, is treated as a unix timestamp of an exact date. value - stored         * data         */        private static final Logger logger = LoggerFactory.getLogger(MemcachedOperate.class);                private static Properties PROPERTIES = new Properties();                private static String MEMCACHED_SETTING = "memcached.properties";                private static MemcachedClient memcachedClient;                public static MemcachedClient getClient(){                return memcachedClient;        }                        /**         * 静态代码块,类加载时,初始化缓存客户端         * 确保只创建一个client实例         * author shufeng          */        static {                InputStream in = Object.class.getResourceAsStream("/" + MEMCACHED_SETTING);                try {                        PROPERTIES.load(in);                } catch (IOException e) {                        e.printStackTrace();                }                String servers = PROPERTIES.getProperty("memcached.servers", "");                if (null != servers && !"".equals(servers)) {                        try {                                logger.debug("启动memcached连接");                                MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses(servers));                                builder.setConnectionPoolSize(100);                                builder.setFailureMode(true);                                builder.setCommandFactory(new BinaryCommandFactory());                                builder.setSessionLocator(new KetamaMemcachedSessionLocator());                                builder.setTranscoder(new SerializingTranscoder());                                memcachedClient = builder.build();                                memcachedClient.setEnableHeartBeat(false); // 关闭心跳                                memcachedClient.flushAll(); // 清空缓存                        } catch (IOException e) {                                e.printStackTrace();                        } catch (TimeoutException e) {                                e.printStackTrace();                        } catch (InterruptedException e) {                                e.printStackTrace();                        } catch (MemcachedException e) {                                e.printStackTrace();                        } catch (Exception e) {                                e.printStackTrace();                        }                }        }                /**         * @param key         * @return 获取value         */        public static Object get(String key) {                Object object = null;                try {                        object = memcachedClient.get(key);                } catch (TimeoutException e) {                        e.printStackTrace();                } catch (InterruptedException e) {                        e.printStackTrace();                } catch (MemcachedException e) {                        e.printStackTrace();                }                return object;        }        public static void setWithNoReply(String key, int exp, Object value) {                try {                        memcachedClient.setWithNoReply(key, exp, value);                } catch (InterruptedException e) {                        e.printStackTrace();                } catch (MemcachedException e) {                        e.printStackTrace();                }        }        /**         * 查询联表的业务版本号 如果为空,则初始化         *          * @param prefix         * @return         */        @SuppressWarnings("unchecked")        public static Long getUnionQueryVersion(String prefix) {                try {                        Map uqmap = null;                        GetsResponse getsresponse = memcachedClient.gets(Constant.UNION_QUERY);                        if (getsresponse == null) {                                uqmap = new HashMap();                                Long uqversion = new Long(1); // 初始化版本号                                uqmap.put(prefix, uqversion);                                if (memcachedClient.cas(Constant.UNION_QUERY, 0, uqmap, 0)) { // 检测插入之前是否被修改过                                        return uqversion; // 插入成功                                } else { // 插入失败,说明在代码运行期间,已经有其他线程去修改了unionquery的缓存,重新进行查询                                        return getUnionQueryVersion(prefix);                                }                        } else {                                long cas = getsresponse.getCas();                                Object uqobj = getsresponse.getValue();                                if (uqobj == null) { // 不存在对象                                        uqmap = new HashMap();                                        Long uqversion = new Long(1); // 初始化版本号                                        uqmap.put(prefix, uqversion);                                        if (memcachedClient.cas(Constant.UNION_QUERY, 0, uqmap, cas)) { // 检测插入之前是否被修改过                                                return uqversion; // 插入成功                                        } else { // 插入失败,说明在代码运行期间,已经有其他线程去修改了unionquery的缓存,重新进行查询                                                return getUnionQueryVersion(prefix);                                        }                                } else {                                        uqmap = (Map) uqobj;                                        Long uqversion = uqmap.get(prefix);                                        if (uqversion == null) { // 不存在此业务版本                                                uqversion = new Long(1); // 初始化版本号                                                uqmap.put(prefix, uqversion);                                                if (memcachedClient.cas(Constant.UNION_QUERY, 0, uqmap, cas)) { // 检测插入之前是否被修改过                                                        return uqversion; // 插入成功                                                } else { // 插入失败,说明在代码运行期间,已经有其他线程去修改了unionquery的缓存,重新进行查询                                                        return getUnionQueryVersion(prefix);                                                }                                        } else {                                                return uqversion;                                        }                                }                        }                } catch (TimeoutException | InterruptedException | MemcachedException e) {                        e.printStackTrace();                        System.err.println("getUnionQueryVersion---Exception");                }                return 1L;        }        /**         * 查询单表的业务版本号 如果为空,则初始化         *          * @return         */        public static Long getVersion(String prefix) {                try {                        GetsResponse getsresponse = memcachedClient.gets(prefix);                        if (getsresponse == null) {                                Long pfversion = new Long(1);                                if (memcachedClient.cas(prefix, 0, pfversion, 0)) {                                        return pfversion;                                } else {                                        return getVersion(prefix);                                }                        } else {                                Object pfobj = getsresponse.getValue();                                long cas = getsresponse.getCas();                                if (pfobj == null) {                                        Long pfversion = new Long(1);                                        if (memcachedClient.cas(prefix, 0, pfversion, cas)) {                                                return pfversion;                                        } else {                                                return getVersion(prefix);                                        }                                } else {                                        return (Long) pfobj;                                }                        }                } catch (TimeoutException | InterruptedException | MemcachedException e) {                        e.printStackTrace();                        System.err.println("getVersion---Exception");                }                return 1L;        }        /**         * shufeng 更新 多表版本号          * 由于存在线程安全问题 ,会覆盖uqmap,更新unionquery业务版本号         * 使用cas方法解决线程安全问题         * 更新unionquery中key包含p1或p2或p3的version         * @param prefix         */        @SuppressWarnings("unchecked")        public static void updateUnionQueryVersion(String prefix) {                try {                        Map uqmap = null;                        GetsResponse getsresponse = memcachedClient.gets(Constant.UNION_QUERY);                        if (getsresponse == null) {                                return;                        } else {                                Object uqobj = getsresponse.getValue();                                long cas = getsresponse.getCas();                                if (uqobj == null) {                                        return;                                } else {                                        uqmap = (HashMap) uqobj;                                        Set uqset = uqmap.keySet(); // 遍历unionquery中的key                                        Iterator quit = uqset.iterator();                                        String uqkey = "";                                        boolean uqflag = false;                                        while (quit.hasNext()) {                                                uqkey = quit.next();                                                if (("|" + uqkey + "|").contains("|" + prefix + "|")) { // key中包含prefix                                                        uqmap.put(uqkey, uqmap.get(uqkey) + 1);          // 更新map                                                        uqflag = true;                                                }                                        }                                        if (uqflag) {                                                if (!memcachedClient.cas(Constant.UNION_QUERY, 0, uqmap, cas)) {                                                        updateUnionQueryVersion(prefix);                                                }                                        }                                }                        }                } catch (TimeoutException | InterruptedException | MemcachedException e) {                        e.printStackTrace();                        System.err.println("updateUnionQueryVersion---Exception");                }        }        /**         * 更新单表版本号         *          * @param prefix         * @return         */        public static void updateVersion(String prefix) {                try {                        GetsResponse getsresponse;                        getsresponse = memcachedClient.gets(prefix);                        if (getsresponse == null) {                                return ;                        } else {                                Object pfobj = getsresponse.getValue();                                long cas = getsresponse.getCas();                                if (pfobj == null) {                                        return ;                                } else {                                        Long pfversion = (Long) pfobj;                                        pfversion += 1;                                        if (!memcachedClient.cas(prefix, 0, pfversion, cas)) {                                                updateVersion(prefix);                                        }                                }                        }                } catch (TimeoutException | InterruptedException | MemcachedException e) {                        e.printStackTrace();                        System.err.println("updateVersion---Exception");                }        }        public void shutdown() {                try {                        memcachedClient.shutdown();                } catch (IOException e) {                        e.printStackTrace();                }        }        @Override        public void destroy() throws Exception {                shutdown();        }}

3,结合spring aop 配置缓存,使用spring aop来切入业务层加入缓存,与业务进行解耦。使用注解进行方便配置。

import java.lang.reflect.Method;import org.aspectj.lang.ProceedingJoinPoint;import org.aspectj.lang.Signature;import org.aspectj.lang.annotation.Around;import org.aspectj.lang.annotation.Aspect;import org.aspectj.lang.annotation.Before;import org.aspectj.lang.annotation.Pointcut;import org.aspectj.lang.reflect.MethodSignature;import org.springframework.stereotype.Component;import com.alibaba.fastjson.JSON;import com.node.hlhw.common.cache.XmemCache;import com.node.hlhw.common.digest.Md5Utils;@Component@Aspectpublic class MemcachedAop {        @Pointcut("execution (* com.node.hlhw.*.service.impl.*.*(..))")        public void pointcut() {        }        // 方法执行前调用        @Before("pointcut()")        public void before() {        }        // 方法执行的前后调用        /**         *          *  改进建议:使用uuid作为版本号,减少版本号的读取,直接生成uuid,进行缓存         *  线程安全问题:存在线程安全问题,但是针对于缓存,问题不大。         *  多线程同一时间重复覆盖一个业务id,还是可以更新缓存         *           * @param call         * @throws Throwable         */        @Around("pointcut()")        public Object doAround(ProceedingJoinPoint call) throws Throwable {                Object result = null;                // 检测是否存在memcached客户端实例                if (MemcachedOperate.getClient() == null) {                        System.err.println("memcached client not exist");                        result = call.proceed();                        return result;                }                Signature signature = call.getSignature();                MethodSignature methodSignature = (MethodSignature) signature;                Method method = methodSignature.getMethod();                                if(!method.isAnnotationPresent(XmemCache.class)){                        result = call.proceed();                        return result;                }                XmemCache xmemcache = method.getAnnotation(XmemCache.class);                                // 获取操作方法                int operation = xmemcache.operation();                // 获取注解前缀,实际使用是为各个业务包名称,一般为表名                String prefix = xmemcache.prefix();                // 无前缀                if(prefix==null||"".equals(prefix)){                        result = call.proceed();                        return result;                }                // 获取注解配置memcached死亡时间 秒单位                int interval = xmemcache.interval();                switch (operation) {                case 1: // 1 从cache里取值,如果未置入cache,则置入                        // 判断prefix是否涉及多表,查看是否包含|                        if (prefix.contains("|")) {                                Long uqversion = MemcachedOperate.getUnionQueryVersion(prefix);                                String combinedkey = generCombinedKey(prefix, uqversion, method, call.getArgs());                                Object resultobj = MemcachedOperate.get(combinedkey);                                if(resultobj == null){                                        result = call.proceed();                                        MemcachedOperate.setWithNoReply(combinedkey, interval, JSON.toJSONString(result));// 缓存数据                                }else{                                        Class returnType = ((MethodSignature) signature).getReturnType();                                         result = JSON.parseObject(resultobj.toString(), returnType);                                }                                                        } else { // 单表操作                                                                Long pfversion = MemcachedOperate.getVersion(prefix);                                String combinedkey = generCombinedKey(prefix, pfversion, method, call.getArgs());                                Object resultobj  = MemcachedOperate.get(combinedkey);                                if(resultobj == null){                                        result = call.proceed();                                        MemcachedOperate.setWithNoReply(combinedkey, interval, JSON.toJSONString(result));// 缓存数据                                }else{                                        Class returnType = ((MethodSignature) signature).getReturnType();                                         result = JSON.parseObject(resultobj.toString(), returnType);                                }                        }                        break;                case 2: // 2 replace cache value                        break;                case 3:                        break;                case 4: // 4 remove cache key 从cache里删除对应 业务版本的缓存                        /*                         * 更新unionquery业务版本号                         * 0,切割 prefix为p1、p2、p3                         * 1,更新prefix为p1或p2或p3的version                         * 2,更新unionquery中key包含p1或p2或p3的version                         */                        if (prefix.contains("|")) {  // 表示涉及到多表,需要清除 单表的缓存,与联表中 包含 当前部分的 缓存                                String[] prefixs = prefix.split("\\|");   // 0.切割 prefix为p1、p2、p3                                for(String pf : prefixs){                                                        MemcachedOperate.updateVersion(pf);  // 1,更新prefix为p1或p2或p3的version                                        MemcachedOperate.updateUnionQueryVersion(pf);                                }                        }else{  //  没有涉及到多表的时候                                MemcachedOperate.updateVersion(prefix);                                MemcachedOperate.updateUnionQueryVersion(prefix);                        }                        result = call.proceed();                        break;                default:                        result = call.proceed();                        break;                }                return result;        }        /**         * 组装key值         * @param key         * @param version         * @param method         * @param args         * @return         */        private String generCombinedKey(String key, Long version, Method method, Object[] args) {                StringBuffer sb = new StringBuffer();                // 获取方法名                String methodName = method.getName();                // 获取参数类型                Object[] classTemps = method.getParameterTypes();                // 存入方法名                sb.append(methodName);                for (int i = 0; i < args.length; i++) {                        sb.append(classTemps[i] + "&");                        if (null == args[i]) {                                sb.append("null");                        } else if ("".equals(args[i])) {                                sb.append("*");                        } else {                                String tt = JSON.toJSONString(args[i]);                                sb.append(tt);                        }                }                sb.append(key);                sb.append(version.toString());                String temp = Md5Utils.getMD5(sb.toString());                return temp;        }}

4,properties文件中配置memcached服务器地址

#host1:port1,host2:port2memcached.servers=192.168.1.1:11211,192.168.1.2:11211

5,修改spring配置文件,声明自动为spring容器中那些配置@aspectJ切面的bean创建代理,织入切面。

6,service层使用注解方式切入缓存

import java.util.Arrays;import java.util.Date;import java.util.List;import java.util.Map;import org.apache.ibatis.session.RowBounds;import org.apache.log4j.Logger;import org.springframework.beans.factory.annotation.Autowired;import com.alibaba.dubbo.common.utils.StringUtils;import com.alibaba.dubbo.config.annotation.Service;import com.node.hlhw.common.cache.XmemCache;import com.node.hlhw.common.digest.ApplicationUtils;import com.node.hlhw.core.service.BaseService;import com.node.hlhw.core.store.IBaseStore;import com.node.hlhw.core.store.PageParam;import com.node.hlhw.rbac.api.dao.UserRoleDao;import com.node.hlhw.rbac.api.entity.UserRole;import com.node.hlhw.rbac.api.service.UserRoleService;/** * @author Melody * 处理用户角色 */@Service(version = "1.0.0")public class UserRoleServiceImpl extends BaseService implements                UserRoleService {        private static final Logger logger = Logger                        .getLogger(UserRoleServiceImpl.class);        @Autowired        public UserRoleDao userRoleDao;        @Override        protected IBaseStore getBaseDao() {                return userRoleDao;        }                /*          * 单表操作,prefix为表名,operation为4,只进行缓存的删除操作         */        @XmemCache(prefix="userrole",operation=4)        public void insertUserRole(UserRole userRole) throws Exception {                userRoleDao.insertUserRole(userRole);                logger.info("插入用户角色数据");        }        /* (non-Javadoc)         * 此方法操作了两个表,role与userole,使用'|'进行分隔         * operation为1,表示缓存操作,对结果集进行缓存         * interval表示缓存时间默认不填为3600秒,也可指定具体时长         */        @Override        @XmemCache(prefix="role|userrole",interval=3600 , operation=1)        public List> selectUserRoleList(UserRole userrole, PageParam pageParam) throws Exception {                RowBounds rowBounds = new RowBounds(pageParam.getOffset(),pageParam.getLimit());                 List>  list = userRoleDao.selectUserRoleList(userrole,rowBounds);                return  list ;        }        @Override        @XmemCache(prefix="userrole" , operation=4)        public void modifyUserRole(UserRole userrole, String[] roleids)throws Exception {                                //删除所包含的角色                userRoleDao.deleteByUserRole(userrole);                for(String roleid : roleids){                        if(!StringUtils.isEmpty(roleid)){                                userrole.setCreatetime(new Date());                                userrole.setRoleid(roleid);                                userrole.setUuid(ApplicationUtils.getUUID());                                userRoleDao.insertUserRole(userrole);                        }                }                        }        @Override        @XmemCache(prefix="userrole" , operation=1)        public boolean existsRef(String roleids)throws Exception {                String [] roleid = roleids.split(",");                List roleidlist = Arrays.asList(roleid);                return userRoleDao.existsRef(roleidlist)>0?true:false;        }}


0