
nProbe skeleton code

sunshout 2014. 7. 21. 19:17


The basic skeleton structure that nProbe runs on.


Hash Table

definition: 

HashBucket **theHash[MAX_NUM_PCAP_THREADS], **thePrevHash[MAX_NUM_PCAP_THREADS];


#define HASH_SIZE       4096 /* buckets */


void allocateHash(int idx) {
  /* hashSize is a runtime variable, presumably initialized from HASH_SIZE above */
  u_int mallocSize = sizeof(HashBucket*)*hashSize;

  theHash[idx] = (HashBucket**)calloc(1, mallocSize);
  if(theHash[idx] == NULL) {
    traceEvent(TRACE_ERROR, "Not enough memory");
    exit(-1);
  }
}

...

  /* pcap-based sniffing */
  for(idx=0; idx<numPcapThreads; idx++)
    allocateHash(idx);


Code in addPktToHash() that builds the hash key:

  idx = vlanId+proto+srcHost+dstHost+sport+dport;
  queue_idx = idx % numPcapThreads;
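
Since the key is a plain sum of header fields, it is symmetric: swapping source and destination (and the ports) yields the same value, so both directions of a flow are dispatched to the same pcap thread queue and, after the later idx %= hashSize step, to the same bucket slot. A small self-contained sketch of this dispatch; the struct and function names here are made up for illustration and are not nProbe's:

#include <stdio.h>
#include <stdint.h>

#define HASH_SIZE 4096   /* buckets, as in the #define above */

/* Illustrative flow-key fields (nProbe passes these as separate arguments). */
struct flow_key {
  uint16_t vlanId;
  uint8_t  proto;
  uint32_t srcHost, dstHost;   /* IPv4 addresses */
  uint16_t sport, dport;
};

/* Sum-based key: commutative in (srcHost, dstHost) and (sport, dport),
   so the A->B and B->A directions hash identically. */
static uint32_t flowKey(const struct flow_key *k) {
  return k->vlanId + k->proto + k->srcHost + k->dstHost + k->sport + k->dport;
}

int main(void) {
  struct flow_key ab = { 0, 6, 0x0a000001, 0x0a000002, 12345, 80 };
  struct flow_key ba = { 0, 6, 0x0a000002, 0x0a000001, 80, 12345 };
  unsigned numPcapThreads = 4;

  printf("queue  A->B=%u  B->A=%u\n",
         flowKey(&ab) % numPcapThreads, flowKey(&ba) % numPcapThreads);
  printf("bucket A->B=%u  B->A=%u\n",
         flowKey(&ab) % HASH_SIZE, flowKey(&ba) % HASH_SIZE);
  return 0;
}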


About the HashTable structure:

typedef struct hashBucket {
#ifdef ENABLE_MAGIC
  u_char magic;
#endif
  u_char bucket_expired;  /* Force bucket to expire */
  u_short proto;          /* protocol (e.g. UDP/TCP..) */
  u_char  srcMacAddress[6];
  IpAddress src;
  u_short sport;
  IpAddress dst;
  u_char  dstMacAddress[6];
  u_short dport;
  u_char src2dstTos, dst2srcTos;
  u_short vlanId;
  unsigned short src2dstTcpFlags, dst2srcTcpFlags;
  u_char src2dstFingerprint[FINGERPRINT_LEN], dst2srcFingerprint[FINGERPRINT_LEN];
  struct mpls_labels *mplsInfo;
 
  /* **************** */
 
  u_long bytesSent, pktSent;
  struct timeval firstSeenSent, lastSeenSent;
  u_long bytesRcvd, pktRcvd;
  struct timeval firstSeenRcvd, lastSeenRcvd;
  struct hashBucket *next;
  u_char src2dstPayloadLen;   /* # of bytes stored on the payload */
  unsigned char *src2dstPayload;
  u_char dst2srcPayloadLen;   /* # of bytes stored on the payload */
  unsigned char *dst2srcPayload;
  u_int32_t flags;               /* bitmask (internal) */
  struct timeval nwLatency;      /* network Latency (3-way handshake) */
  struct timeval src2dstApplLatency, dst2srcApplLatency; /* Application Latency */
  u_int32_t src2dstIcmpFlags, dst2srcIcmpFlags;  /* ICMP bitmask */
  u_int16_t src2dstIcmpType, dst2srcIcmpType;    /* ICMP type */
  PluginInformation *plugin;
} HashBucket;
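
The struct uses the IpAddress type, and the lookup code further down compares addresses with cmpIpAddress(); neither definition appears in this post. A rough, assumed sketch of what they could look like, based only on how the fields are accessed (bkt->src.ipType.ipv4), not on the actual nProbe sources:

#include <string.h>
#include <sys/types.h>
#include <netinet/in.h>

/* Assumed shape of IpAddress: an IP-version tag plus a v4/v6 union. */
typedef struct ipAddress {
  u_int8_t ipVersion;              /* 4 or 6 */
  union {
    u_int32_t ipv4;
    struct in6_addr ipv6;
  } ipType;
} IpAddress;

/* Hypothetical cmpIpAddress(): same version and same address bytes. */
static int cmpIpAddress(IpAddress a, IpAddress b) {
  if(a.ipVersion != b.ipVersion) return(0);
  if(a.ipVersion == 4) return(a.ipType.ipv4 == b.ipType.ipv4);
  return(memcmp(&a.ipType.ipv6, &b.ipType.ipv6, sizeof(struct in6_addr)) == 0);
}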


processQueuedPacket

- Use idx to find the bucket's position in the hash table.


idx %= hashSize;

bkt = theHash[hash_idx][idx];


- If bkt exists and its source IP, destination IP, proto, sport, and dport match the packet's values (in either direction), the statistics are updated.

    if((bkt->proto == proto)
       && (bkt->vlanId == vlanId)
       && ((cmpIpAddress(bkt->src, src)
            && cmpIpAddress(bkt->dst, dst)
            && (bkt->sport == sport)
            && (bkt->dport == dport))
           || (cmpIpAddress(bkt->src, dst)
               && cmpIpAddress(bkt->dst, src)
               && (bkt->sport == dport)
               && (bkt->dport == sport)))) {
      if(bkt->src.ipType.ipv4 == src.ipType.ipv4) {
        bkt->bytesSent += len, bkt->pktSent += numPkts;
        bkt->lastSeenSent.tv_sec = h->ts.tv_sec, bkt->lastSeenSent.tv_usec = h->ts.tv_usec;
        if(isFragment) NPROBE_FD_SET(FLAG_FRAGMENTED_PACKET_SRC2DST, &(bkt->flags));
        if(proto == IPPROTO_TCP)
          updateTcpFlags(bkt, 0, &h->ts, flags, fingerprint, tos);
        else if((proto == IPPROTO_UDP) || (proto == IPPROTO_ICMP))
          updateApplLatency(proto, bkt, 0, &h->ts, icmpType, icmpCode);

        setPayload(bkt, h, payload, payloadLen, 0);
        bkt->src2dstTcpFlags |= flags; /* Do not move this line before updateTcpFlags(...) */
      } else {
        bkt->bytesRcvd += len, bkt->pktRcvd += numPkts;
        if((bkt->firstSeenRcvd.tv_sec == 0) && (bkt->firstSeenRcvd.tv_usec == 0))
          bkt->firstSeenRcvd.tv_sec = h->ts.tv_sec, bkt->firstSeenRcvd.tv_usec = h->ts.tv_usec;
        bkt->lastSeenRcvd.tv_sec = h->ts.tv_sec, bkt->lastSeenRcvd.tv_usec = h->ts.tv_usec;
        if(isFragment) NPROBE_FD_SET(FLAG_FRAGMENTED_PACKET_DST2SRC, &(bkt->flags));
        if(proto == IPPROTO_TCP)
          updateTcpFlags(bkt, 1, &h->ts, flags, fingerprint, tos);
        else if((proto == IPPROTO_UDP) || (proto == IPPROTO_ICMP))
          updateApplLatency(proto, bkt, 1, &h->ts, icmpType, icmpCode);

        setPayload(bkt, h, payload, payloadLen, 1);
        bkt->dst2srcTcpFlags |= flags; /* Do not move this line before updateTcpFlags(...) */
      }

      pluginCallback(PACKET_CALLBACK, bkt,
                     proto, isFragment, numPkts, tos,
                     vlanId, ehdr, &src, sport,
                     &dst, dport, len,
                     flags, icmpType, numMplsLabels,
                     mplsLabels, fingerprint,
                     h, p, payload, payloadLen);

      pthread_spin_unlock(&hashMutex[hash_idx][mutex_idx]);
      return;



- If bkt's values do not match the packet's (a collision or a NULL bucket), a new bucket is allocated.

 . In other words, the entries of each bucket slot are managed as a linked list (see the sketch after addToList() below).

bkt = (HashBucket*)malloc(sizeof(HashBucket));


/* Put the bucket on top of the list */
  addToList(bkt, &theHash[hash_idx][idx]);


void addToList(HashBucket *bkt, HashBucket **list) {
  bkt->next = *list;
  (*list) = bkt;
}
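
The excerpt above jumps from the hash lookup straight to the malloc. In between, processQueuedPacket() walks the collision chain via bkt->next until it either finds a matching flow or reaches NULL; only then is a new bucket allocated, zeroed, filled with the packet's 5-tuple, and prepended with addToList(). A simplified sketch of that lookup-or-insert step (flowMatches() is a hypothetical stand-in for the 5-tuple comparison shown earlier; the initialization details are assumptions, not the actual nProbe code):

  /* Sketch only: walk the chain for this slot, then insert on miss. */
  for(bkt = theHash[hash_idx][idx]; bkt != NULL; bkt = bkt->next)
    if(flowMatches(bkt, proto, vlanId, &src, &dst, sport, dport))
      break;                              /* existing flow: update stats as above */

  if(bkt == NULL) {
    bkt = (HashBucket*)malloc(sizeof(HashBucket));
    if(bkt == NULL) return;               /* out of memory */
    memset(bkt, 0, sizeof(HashBucket));   /* clear counters, flags, next pointer */

    /* Record the flow identity and the first/last timestamps */
    bkt->proto = proto, bkt->vlanId = vlanId;
    bkt->src   = src,   bkt->dst    = dst;
    bkt->sport = sport, bkt->dport  = dport;
    bkt->firstSeenSent = h->ts, bkt->lastSeenSent = h->ts;

    /* Put the bucket on top of the list */
    addToList(bkt, &theHash[hash_idx][idx]);
  }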


Expire

- When is NetFlow information exported from the hash table?

- The hash table walker is a separate thread (one walker per pcap thread):


    for(i=0; i<numPcapThreads; i++)
      pthread_create(&walkHashThread[i], NULL, hashWalker, (void*)i);


hashWalker() -> walkHash() -> queueBucketToExport()

- The walker scans the hash table periodically; if a bucket has expired, it is moved to the exportQueue (a sketch of walkHash() is given after the hashWalker() code below).

/*
  From the tests carried on, the very best approach
  is to have a periodic thread that scans for expired
  flows.
*/
void* hashWalker(void* _idx) {
  u_int numSlots = 0;
  u_char first_run = 1;
  u_short sleep_time;
  int idx = (int)_idx;

  /* Wait until all the data structures have been allocated */
  while(theHash[idx] == NULL) ntop_sleep(1);

  sleep_time = 60 - (time(NULL) % 60); /* Align to the minute */
  traceEvent(TRACE_INFO, "Sleeping %d sec before walking hash for the first time.",
         sleep_time);
  ntop_sleep(sleep_time);

  for(;shutdownInProgress == 0;) {
    walkHash(idx, 0);

    if(++numSlots >= hashSize) {
      /* End of scan */
      unsigned int activeBuckets = bucketsAllocated-(purgedBucketsLen+exportBucketsLen);
      unsigned int freeBucketsThreshold = (unsigned int)(activeBuckets*.1); /* 10% of activeBuckets */

      if(purgedBucketsLen > freeBucketsThreshold) {
        /* Too many buckets: let's free some of them */
        while((purgedBucketsLen > 0) && (freeBucketsThreshold > 0)) {
          HashBucket *bkt;

          /* Get the head */
          pthread_mutex_lock(&purgedBucketsMutex);
          bkt = getListHead(&purgedBuckets);
          purgedBucketsLen--, bucketsAllocated--;
          pthread_mutex_unlock(&purgedBucketsMutex);

          /* Free the head */
          free(bkt);
          freeBucketsThreshold--;
        }
      }

      printStats();
      numSlots = 0;

      if(rebuild_hash) {
        int i;

        traceEvent(TRACE_INFO, "Rebuilding hash...");
        if(thePrevHash[idx] != NULL) free(thePrevHash[idx]);
        /* stop all activities and create a new hash */
        for(i=0; i<MAX_HASH_MUTEXES; i++) pthread_spin_lock(&hashMutex[idx][i]);
        thePrevHash[idx] = theHash[idx];
        allocateHash(idx);
        for(i=0; i<MAX_HASH_MUTEXES; i++) pthread_spin_unlock(&hashMutex[idx][i]);
        traceEvent(TRACE_INFO, "The hash has been rebuilt.");
      }

      traceEvent(TRACE_INFO, "Sleeping %d sec before walking hash...", scanCycle);
      ntop_sleep(scanCycle);
    }
  } /* for */

  traceEvent(TRACE_INFO, "Hash walker thread terminated\n");
  return(NULL);
}
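
walkHash() and queueBucketToExport() are not shown here. Conceptually, each walkHash() call scans one (or a few) slots of the thread's hash table, detaches every bucket for which isFlowExpired() returns true, and hands it to the export queue, from which another thread emits the NetFlow records. A rough sketch of that idea, with hypothetical exportQueue / exportQueueLen / exportQueueMutex names and the per-slot hash locking omitted (not the actual nProbe implementation):

/* Sketch only: scan one slot and queue expired buckets for export. */
void walkHashSlot_sketch(int hash_idx, u_int slot) {
  time_t now = time(NULL);
  HashBucket *bkt = theHash[hash_idx][slot], *prev = NULL;

  while(bkt != NULL) {
    HashBucket *next = bkt->next;

    if(isFlowExpired(bkt, now)) {
      /* Unlink the expired bucket from the collision chain... */
      if(prev == NULL) theHash[hash_idx][slot] = next;
      else             prev->next = next;

      /* ...and move it to the export queue; the export thread will
         build the NetFlow record and later recycle the bucket. */
      pthread_mutex_lock(&exportQueueMutex);
      bkt->next = exportQueue, exportQueue = bkt, exportQueueLen++;
      pthread_mutex_unlock(&exportQueueMutex);
    } else
      prev = bkt;

    bkt = next;
  }
}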


int isFlowExpired(HashBucket *myBucket, time_t theTime) {
  if(myBucket->bucket_expired /* Forced expire */
     || ((theTime-myBucket->lastSeenSent.tv_sec)  >= idleTimeout)      /* flow expired: data not sent for a while */
     || ((theTime-myBucket->firstSeenSent.tv_sec) >= lifetimeTimeout)  /* flow expired: flow active but too old   */
     || ((myBucket->pktRcvd > 0)
         && (((theTime-myBucket->lastSeenRcvd.tv_sec)  >= idleTimeout)          /* flow expired: data not sent for a while */
             || ((theTime-myBucket->firstSeenRcvd.tv_sec) >= lifetimeTimeout))) /* flow expired: flow active but too old   */
     ) {
    return(1);
  } else {
    /* if(hashDebug) printBucket(myBucket); */
    return(0);
  }
}