NiFi 관련 사내 세미나도 마무리되었고, 이제 NiFi로 유연하게 문제를 해결해야할 때인 것 같다
그래서 이번에는 NiFi에서 처리한 데이터를 Enrich하는 방법으로 DistributedMapCache Processor를 사용해서 해결하고자한다
매번 DB가서 해당 메타정보를 가져온다던지, 아니면 배치로 테이블로 Join해서 enrich하는 방법도 있겠지만, Data Flow을 한 곳에서 처리하고 싶어서 !
DistributedMapCache Processor
DistributedMapCache를 사용하기 위해 필요한 Processor 및 Controller Services
Processor
PutDistributedMapCache
FetchDistributedMapCache
Controller Service
DistributedMapCacheClientService
DistributedMapCacheServer
DistributedMapCacheServer와 통신하는 Client
Cluster끼리 Cache된 Map 공유하기 위함
설정값은 DistributedMapCacheServer의 hostname과 port만 설정함
Cluster 환경에서는 hostname을 localhost로 설정
github
모 여기까지는 사용법이고, 내가 재밌게 봤던 것은 Cache하는 자료구조는 무엇이고, Eviction 전략을 어떻게 구현했는가 궁금했음
캐시에 사용되는 Map은 HashMap을 사용하고, Eviction을 처리하기 위해서는 ConcurrentSkipListMap 사용함
ConcurrentSkipListMap에 Eviction 전략에 따라서 각 Comparator를 구현해놓았음!
간단하게 Map 만들고, Evict하는 함수만 보면 다음과 같다
생각보다 소스코드가 간결해서 좋았음!
public class SimpleMapCache implements MapCache {
private static final Logger logger = LoggerFactory.getLogger(SimpleMapCache.class);
private final Map<ByteBuffer, MapCacheRecord> cache = new HashMap<>();
private final SortedMap<MapCacheRecord, ByteBuffer> inverseCacheMap;
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock();
private final String serviceIdentifier;
private final int maxSize;
public SimpleMapCache(final String serviceIdentifier, final int maxSize, final EvictionPolicy evictionPolicy) {
// need to change to ConcurrentMap as this is modified when only the readLock is held
inverseCacheMap = new ConcurrentSkipListMap<>(evictionPolicy.getComparator());
this.serviceIdentifier = serviceIdentifier;
this.maxSize = maxSize;
@Override
public String toString() {
return "SimpleMapCache[service id=" + serviceIdentifier + "]";
// don't need synchronized because this method is only called when the writeLock is held, and all
// public methods obtain either the read or write lock
private MapCacheRecord evict() {
if (cache.size() < maxSize) {
return null;
final MapCacheRecord recordToEvict = inverseCacheMap.firstKey();
final ByteBuffer valueToEvict = inverseCacheMap.remove(recordToEvict);
cache.remove(valueToEvict);
if (logger.isDebugEnabled()) {
logger.debug("Evicting value {} from cache", new String(valueToEvict.array(), StandardCharsets.UTF_8));
return recordToEvict;
//.....
//....