添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

NiFi 관련 사내 세미나도 마무리되었고, 이제 NiFi로 유연하게 문제를 해결해야할 때인 것 같다

그래서 이번에는 NiFi에서 처리한 데이터를 Enrich하는 방법으로 DistributedMapCache Processor를 사용해서 해결하고자한다

매번 DB가서 해당 메타정보를 가져온다던지, 아니면 배치로 테이블로 Join해서 enrich하는 방법도 있겠지만, Data Flow을 한 곳에서 처리하고 싶어서 !

DistributedMapCache Processor

DistributedMapCache를 사용하기 위해 필요한 Processor 및 Controller Services

Processor

  • PutDistributedMapCache
  • FetchDistributedMapCache
  • Controller Service

  • DistributedMapCacheClientService
  • DistributedMapCacheServer
  • DistributedMapCacheServer와 통신하는 Client
  • Cluster끼리 Cache된 Map 공유하기 위함
  • 설정값은 DistributedMapCacheServer의 hostname과 port만 설정함
  • Cluster 환경에서는 hostname을 localhost로 설정
  • github
  • 모 여기까지는 사용법이고, 내가 재밌게 봤던 것은 Cache하는 자료구조는 무엇이고, Eviction 전략을 어떻게 구현했는가 궁금했음

    캐시에 사용되는 Map은 HashMap을 사용하고, Eviction을 처리하기 위해서는 ConcurrentSkipListMap 사용함

  • ConcurrentSkipListMap에 Eviction 전략에 따라서 각 Comparator를 구현해놓았음!
  • 간단하게 Map 만들고, Evict하는 함수만 보면 다음과 같다

  • 생각보다 소스코드가 간결해서 좋았음!
  • public class SimpleMapCache implements MapCache {
        private static final Logger logger = LoggerFactory.getLogger(SimpleMapCache.class);
        private final Map<ByteBuffer, MapCacheRecord> cache = new HashMap<>();
        private final SortedMap<MapCacheRecord, ByteBuffer> inverseCacheMap;
        private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
        private final Lock readLock = rwLock.readLock();
        private final Lock writeLock = rwLock.writeLock();
        private final String serviceIdentifier;
        private final int maxSize;
        public SimpleMapCache(final String serviceIdentifier, final int maxSize, final EvictionPolicy evictionPolicy) {
            // need to change to ConcurrentMap as this is modified when only the readLock is held
            inverseCacheMap = new ConcurrentSkipListMap<>(evictionPolicy.getComparator());
            this.serviceIdentifier = serviceIdentifier;
            this.maxSize = maxSize;
        @Override
        public String toString() {
            return "SimpleMapCache[service id=" + serviceIdentifier + "]";
        // don't need synchronized because this method is only called when the writeLock is held, and all
        // public methods obtain either the read or write lock
        private MapCacheRecord evict() {
            if (cache.size() < maxSize) {
                return null;
            final MapCacheRecord recordToEvict = inverseCacheMap.firstKey();
            final ByteBuffer valueToEvict = inverseCacheMap.remove(recordToEvict);
            cache.remove(valueToEvict);
            if (logger.isDebugEnabled()) {
                logger.debug("Evicting value {} from cache", new String(valueToEvict.array(), StandardCharsets.UTF_8));
            return recordToEvict;
        //.....
        //....