最近给客户准备培训,看到Coherence可以通过三种方式批量加载数据,分别是:
- Custom application
- InvocableMap - PreloadRequest
- Invocation Service
Custom application的方式简单易懂,基本就是通过put和putAll方法实现,就不再纠结了。但问题是无论是put还是putAll
都是一个串行过程,如果装载大量数据的话,就需要有一种并行机制实现并行装载。
本文对第二种方式InvocableMap做一些研究,PreloadRequest主要是基于一个entry的集合通过Cache Loader进行装载,
其命令主要是:
包含如下特征:
- 装载前必须知道要装载的所有的key值。
- 本身装载的动作通过CacheLoader来实现。
- 装载是并行过程,每个存储节点负责把分布在自己Cache的内容按照key值,从数据库中装载
代码:
Person.java
package dataload; import java.io.Serializable; public class Person implements Serializable { private String Id; private String Firstname;public void setId(String Id) { this.Id = Id; }public String getId() { return Id; }public void setFirstname(String Firstname) { this.Firstname = Firstname; }public String getFirstname() { return Firstname; }public void setLastname(String Lastname) { this.Lastname = Lastname; }public String getLastname() { return Lastname; }public void setAddress(String Address) { this.Address = Address; }public String getAddress() { return Address; } private String Lastname; private String Address; public Person() { super(); } public Person(String sId,String sFirstname,String sLastname,String sAddress) { Id=sId; Firstname=sFirstname; Lastname=sLastname; Address=sAddress; }} |
实现CacheLoader的DBCacheStore.java,比较核心的是看load方法
package dataload; import com.tangosol.net.CacheFactory; import com.tangosol.net.NamedCache;import com.tangosol.net.cache.CacheStore;import com.tangosol.util.Base;import com.tangosol.util.InvocableMap; import java.sql.DriverManager; import java.sql.Connection;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.sql.SQLException;import java.util.Collection; import java.util.Hashtable;import java.util.Iterator;import java.util.LinkedList;import java.util.List;import java.util.Map;import javax.naming.Context; import javax.naming.InitialContext;import java.sql.ResultSet; import java.sql.Statement;import java.util.Collections; import java.util.HashMap;import java.util.Set;import javax.naming.NamingException; /** * An example implementation of CacheStore * interface. * * @author erm 2003.05.01 */public class DBCacheStore extends Base implements CacheStore { // ----- constructors --------------------------------------------------- /** * Constructs DBCacheStore for a given database table. * * @param sTableName the db table name */ public DBCacheStore(String sTableName) { m_sTableName = sTableName; cache = CacheFactory.getCache("SampleCache"); }// ---- accessors ------------------------------------------------------- /** * Obtain the name of the table this CacheStore is persisting to. * * @return the name of the table this CacheStore is persisting to */ public String getTableName() { return m_sTableName; }/** * Obtain the connection being used to connect to the database. * * @return the connection used to connect to the database */ public Connection getConnection() { try { Context ctx = null; Hashtable<String,String> ht = new Hashtable<String,String>(); ht.put(Context.INITIAL_CONTEXT_FACTORY,"weblogic.jndi.WLInitialContextFactory"); ht.put(Context.PROVIDER_URL,"t3://localhost:7001"); ctx = new InitialContext(ht); javax.sql.DataSource ds= (javax.sql.DataSource) ctx.lookup("ds"); m_con = ds.getConnection(); } catch (Exception e) { System.out.println(e.getMessage()); } return m_con; } // ----- CacheStore Interface --------------------------------------------/** * Return the value associated with the specified key, or null if the * key does not have an associated value in the underlying store. * * @param oKey key whose associated value is to be returned * * @return the value associated with the specified key, or * <tt>null</tt> if no value is available for that key */ public Object load(Object oKey) { Object oValue = null; Person person = null; Connection con = getConnection(); String sSQL = "SELECT id, firstname,lastname,address FROM " + getTableName() + " WHERE id = ?"; System.out.println("Enter load= "+sSQL); try { PreparedStatement stmt = con.prepareStatement(sSQL);stmt.setString(1, String.valueOf(oKey)); System.out.println("key="+String.valueOf(oKey)); ResultSet rslt = stmt.executeQuery(); if (rslt.next()) { person = new Person(rslt.getString("id"),rslt.getString("firstname"),rslt.getString("lastname"),rslt.getString("address")); oValue = person; if (rslt.next()) { throw new SQLException("Not a unique key: " + oKey); } } stmt.close(); } catch (SQLException e) { System.out.println("=============="+e.getMessage()); //throw ensureRuntimeException(e, "Load failed: key=" + oKey); } return oValue; }/** * Store the specified value under the specific key in the underlying * store. This method is intended to support both key/value creation * and value update for a specific key. * * @param oKey key to store the value under * @param oValue value to be stored * * @throws UnsupportedOperationException if this implementation or the * underlying store is read-only */ public void store(Object oKey, Object oValue) { /* Connection con = getConnection(); String sTable = getTableName(); String sSQL; if (load(oKey) != null) { sSQL = "UPDATE " + sTable + " SET value = ? where id = ?"; } else { sSQL = "INSERT INTO " + sTable + " (value, id) VALUES (?,?)"; } try { PreparedStatement stmt = con.prepareStatement(sSQL); int i = 0; stmt.setString(++i, String.valueOf(oValue)); stmt.setString(++i, String.valueOf(oKey)); stmt.executeUpdate(); stmt.close(); } catch (SQLException e) { throw ensureRuntimeException(e, "Store failed: key=" + oKey); } */ }/** * Remove the specified key from the underlying store if present. * * @param oKey key whose mapping is to be removed from the map * * @throws UnsupportedOperationException if this implementation or the * underlying store is read-only */ public void erase(Object oKey) { /* Connection con = getConnection(); String sSQL = "DELETE FROM " + getTableName() + " WHERE id=?"; try { PreparedStatement stmt = con.prepareStatement(sSQL);stmt.setString(1, String.valueOf(oKey)); stmt.executeUpdate(); stmt.close(); } catch (SQLException e) { throw ensureRuntimeException(e, "Erase failed: key=" + oKey); }*/ }/** * Remove the specified keys from the underlying store if present. * * @param colKeys keys whose mappings are being removed from the cache * * @throws UnsupportedOperationException if this implementation or the * underlying store is read-only */ public void eraseAll(Collection colKeys) { throw new UnsupportedOperationException(); }/** * Return the values associated with each the specified keys in the * passed collection. If a key does not have an associated value in * the underlying store, then the return map will not have an entry * for that key. * * @param colKeys a collection of keys to load * * @return a Map of keys to associated values for the specified keys */ public Map loadAll(Collection colKeys) { /* System.out.println("Enter LoadAll Map"); Map mapResults = new HashMap(); for (Object entry : (Set<Object>) colKeys) { System.out.println(entry); mapResults.put(entry, load(entry)); } return mapResults; */ return Collections.emptyMap(); //throw new UnsupportedOperationException(); }/** * Store the specified values under the specified keys in the underlying * store. This method is intended to support both key/value creation * and value update for the specified keys. * * @param mapEntries a Map of any number of keys and values to store * * @throws UnsupportedOperationException if this implementation or the * underlying store is read-only */ public void storeAll(Map mapEntries) { throw new UnsupportedOperationException(); }/** * Iterate all keys in the underlying store. * * @return a read-only iterator of the keys in the underlying store */ public Iterator keys() { Connection con = getConnection(); String sSQL = "SELECT id FROM " + getTableName(); List list = new LinkedList(); try { PreparedStatement stmt = con.prepareStatement(sSQL); ResultSet rslt = stmt.executeQuery(); while (rslt.next()) { Object oKey = rslt.getString(1); list.add(oKey); } stmt.close(); } catch (SQLException e) { throw ensureRuntimeException(e, "Iterator failed"); }return list.iterator(); }// ----- data members --------------------------------------------------- /** * The connection. */ protected Connection m_con;/** * The db table name. */ protected String m_sTableName;protected NamedCache cache; } |
CoherencePreLoad.java程序
package dataload; import java.sql.ResultSet; import java.sql.Statement;import com.tangosol.net.CacheFactory; import com.tangosol.net.NamedCache;import com.tangosol.util.InvocableMap; import com.tangosol.util.processor.PreloadRequest;import java.sql.Connection; import java.util.Collection; import java.util.Collections;import java.util.HashSet;import java.util.Hashtable;import javax.naming.Context; import javax.naming.InitialContext; public class CoherencePreLoad { public CoherencePreLoad() { super(); }public static void main(String[] args) { CoherencePreLoad coherencePreLoad = new CoherencePreLoad(); NamedCache cache = CacheFactory.getCache("SampleCache"); //cache.put("1","eric"); String sql = "select id from persons order by id"; Connection con = null; Statement s = null; ResultSet rs = null; int count =0; Collection keys = new HashSet();; String key = null; try{Context ctx = null; Hashtable<String,String> ht = new Hashtable<String,String>(); ht.put(Context.INITIAL_CONTEXT_FACTORY,"weblogic.jndi.WLInitialContextFactory"); ht.put(Context.PROVIDER_URL,"t3://localhost:7001"); ctx = new InitialContext(ht); javax.sql.DataSource ds= (javax.sql.DataSource) ctx.lookup("ds"); con = ds.getConnection(); s = con.createStatement(); rs = s.executeQuery(sql); System.out.println("Loading with SQL "); while (rs.next()) { key = rs.getString(1); System.out.println(key); keys.add(key); count++; // this loads 1000 items at a time into the cache if ((count++ % 1000) == 0) { cache.invokeAll(keys, new PreloadRequest() ); keys.clear(); } } if (!keys.isEmpty()) { System.out.println("Enter"); //InvocableMap.EntryProcessor preloadrequest = new PreloadRequest(); cache.invokeAll(keys, new PreloadRequest() ); System.out.println("finish"); } }catch (Exception e) { System.out.println("============"+e.getStackTrace()); System.out.println(e.getMessage()); } }} |
然后需要在缓存的配置中进行设置
<?xml version="1.0"?> <!DOCTYPE cache-config SYSTEM "cache-config.dtd"><cache-config> <caching-scheme-mapping> <!-- Caches with names that start with 'DBBacked' will be created as distributed-db-backed. --> <cache-mapping> <cache-name>SampleCache</cache-name> <scheme-name>distributed-pof</scheme-name> </cache-mapping> </caching-scheme-mapping> <caching-schemes> <!-- DB Backed Distributed caching scheme. --> <distributed-scheme> <scheme-name>distributed-pof</scheme-name> <service-name>DistributedCache</service-name> <backing-map-scheme> <read-write-backing-map-scheme><internal-cache-scheme> <local-scheme/> </internal-cache-scheme> <cachestore-scheme> <class-scheme> <class-name>dataload.DBCacheStore</class-name> <init-params> <init-param> <param-type>java.lang.String</param-type> <param-value>persons</param-value> </init-param> </init-params> </class-scheme> </cachestore-scheme> </read-write-backing-map-scheme> </backing-map-scheme><listener/> <autostart>true</autostart> <local-storage>true</local-storage> </distributed-scheme></caching-schemes> </cache-config> |
需要注意的是,必须在启动Cache-server的时候加入weblogic.jar以及dataload的包,因为在DBCacheStore中用到了weblogic JNDI去寻找数据源。
输出结果如下:
在jdeveloper端的Coherence节点
在coherence server端的存储节点
通过visualVM监控是否已经写入缓存