nProbe가 동작하는 기본적인 skeleton 구조
Hash Table
definition:
/* Per-pcap-thread hash tables. theHash[t] is an array of bucket-list heads
   (allocated by allocateHash); thePrevHash[t] receives the old table when
   the walker thread rebuilds the hash. */
HashBucket **theHash[MAX_NUM_PCAP_THREADS], **thePrevHash[MAX_NUM_PCAP_THREADS];
/* NOTE(review): the excerpts in this file size and index the table with the
   variable hashSize; how HASH_SIZE relates to it is not shown here. */
#define HASH_SIZE 4096 /* buckets */
/*
 * Allocate a zero-filled table of hashSize bucket-list heads and store it
 * in theHash[idx].
 *
 * idx - index of the pcap thread whose table is being allocated.
 *
 * Does not return on allocation failure: the error is logged and the
 * process exits. Any table previously stored in theHash[idx] is simply
 * overwritten, so the caller must save or free it first.
 */
void allocateHash(int idx) {
  /* Pass the element count and element size separately so the allocator
     can detect an overflowing product instead of the multiplication being
     silently truncated into a u_int. */
  theHash[idx] = (HashBucket**)calloc(hashSize, sizeof(HashBucket*));

  if(theHash[idx] == NULL) {
    traceEvent(TRACE_ERROR, "Not enough memory");
    exit(-1);
  }
}
...
/* pcap-based sniffing: allocate one hash table per pcap thread */
for(idx=0; idx<numPcapThreads; idx++)
allocateHash(idx);
addPktToHash 에서 hash key 를 만드는 코드
/* The key is the plain sum of the flow fields, so swapping source and
   destination (host and port) yields the same idx. */
idx = vlanId+proto+srcHost+dstHost+sport+dport;
/* The same key, reduced modulo the number of pcap threads, gives the queue index. */
queue_idx = idx % numPcapThreads;
HashTable 관련하여
/*
 * One flow entry of the hash table. Buckets that land in the same table
 * slot are chained through the next pointer. Counters and timestamps are
 * kept per direction: "Sent"/src2dst for packets travelling from src to
 * dst, "Rcvd"/dst2src for the reverse direction.
 */
typedef struct hashBucket {
#ifdef ENABLE_MAGIC
/* Present only when ENABLE_MAGIC is defined */
u_char magic;
#endif
u_char bucket_expired; /* Force bucket to expire */
/* Flow key: proto, vlanId, src/dst address and sport/dport are the fields
   compared when a packet is matched against a bucket. */
u_short proto; /* protocol (e.g. UDP/TCP..) */
u_char srcMacAddress[6];
IpAddress src;
u_short sport;
IpAddress dst;
u_char dstMacAddress[6];
u_short dport;
u_char src2dstTos, dst2srcTos;
u_short vlanId;
/* Packet flag values OR-ed together, one accumulator per direction */
unsigned short src2dstTcpFlags, dst2srcTcpFlags;
u_char src2dstFingerprint[FINGERPRINT_LEN], dst2srcFingerprint[FINGERPRINT_LEN];
struct mpls_labels *mplsInfo;
/* **************** */
/* src -> dst direction: byte/packet counters and first/last packet times */
u_long bytesSent, pktSent;
struct timeval firstSeenSent, lastSeenSent;
/* dst -> src direction; firstSeenRcvd stays zero until the first reverse packet */
u_long bytesRcvd, pktRcvd;
struct timeval firstSeenRcvd, lastSeenRcvd;
/* Next bucket in the same hash slot (singly linked list) */
struct hashBucket *next;
u_char src2dstPayloadLen; /* # of bytes stored on the payload */
unsigned char *src2dstPayload;
u_char dst2srcPayloadLen; /* # of bytes stored on the payload */
unsigned char *dst2srcPayload;
u_int32_t flags; /* bitmask (internal) */
struct timeval nwLatency; /* network Latency (3-way handshake) */
struct timeval src2dstApplLatency, dst2srcApplLatency; /* Application Latency */
u_int32_t src2dstIcmpFlags, dst2srcIcmpFlags; /* ICMP bitmask */
u_int16_t src2dstIcmpType, dst2srcIcmpType; /* ICMP type */
PluginInformation *plugin;
} HashBucket;
processQueuedPacket
- idx를 가지고 bucket 의 위치를 찾는다
/* Reduce the flow key to a slot index inside the table */
idx %= hashSize;
/* Head of the bucket list stored in that slot (NULL when the slot is empty) */
bkt = theHash[hash_idx][idx];
- bkt 이 존재하며, 현재 bkt 의 proto, vlanId 및 sip, dip, sport, dport 가 pkt 의 값과 같으면 (src/dst 가 뒤바뀐 역방향 포함), 통계값을 업데이트한다.
/* The bucket matches when protocol and VLAN are equal and the address/port
   tuple is equal either as-is or with source and destination swapped. */
if((bkt->proto == proto)
&& (bkt->vlanId == vlanId)
&& ((cmpIpAddress(bkt->src, src)
&& cmpIpAddress(bkt->dst, dst)
&& (bkt->sport == sport)
&& (bkt->dport == dport))
|| (cmpIpAddress(bkt->src, dst)
&& cmpIpAddress(bkt->dst, src)
&& (bkt->sport == dport)
&& (bkt->dport == sport)))) {
/* Direction test: compares only the ipType.ipv4 member of the two source
   addresses.
   NOTE(review): for non-IPv4 flows, or flows whose source and destination
   addresses are equal, this may not identify the direction - confirm
   against the IpAddress definition. */
if(bkt->src.ipType.ipv4 == src.ipType.ipv4) {
/* src -> dst: update the "sent" counters and last-seen time */
bkt->bytesSent += len, bkt->pktSent += numPkts;
bkt->lastSeenSent.tv_sec = h->ts.tv_sec, bkt->lastSeenSent.tv_usec = h->ts.tv_usec;
if(isFragment) NPROBE_FD_SET(FLAG_FRAGMENTED_PACKET_SRC2DST, &(bkt->flags));
if(proto == IPPROTO_TCP)
updateTcpFlags(bkt, 0, &h->ts, flags, fingerprint, tos);
else if((proto == IPPROTO_UDP) || (proto == IPPROTO_ICMP))
updateApplLatency(proto, bkt, 0, &h->ts, icmpType, icmpCode);
setPayload(bkt, h, payload, payloadLen, 0);
bkt->src2dstTcpFlags |= flags; /* Do not move this line before updateTcpFlags(...) */
} else {
/* dst -> src: update the "received" counters; the first reverse packet
   also sets firstSeenRcvd (recognised by it still being zero) */
bkt->bytesRcvd += len, bkt->pktRcvd += numPkts;
if((bkt->firstSeenRcvd.tv_sec == 0) && (bkt->firstSeenRcvd.tv_usec == 0))
bkt->firstSeenRcvd.tv_sec = h->ts.tv_sec, bkt->firstSeenRcvd.tv_usec = h->ts.tv_usec;
bkt->lastSeenRcvd.tv_sec = h->ts.tv_sec, bkt->lastSeenRcvd.tv_usec = h->ts.tv_usec;
if(isFragment) NPROBE_FD_SET(FLAG_FRAGMENTED_PACKET_DST2SRC, &(bkt->flags));
if(proto == IPPROTO_TCP)
updateTcpFlags(bkt, 1, &h->ts, flags, fingerprint, tos);
else if((proto == IPPROTO_UDP) || (proto == IPPROTO_ICMP))
updateApplLatency(proto, bkt, 1, &h->ts, icmpType, icmpCode);
setPayload(bkt, h, payload, payloadLen, 1);
bkt->dst2srcTcpFlags |= flags; /* Do not move this line before updateTcpFlags(...) */
}
/* Hand the packet and its bucket to the plugin callback */
pluginCallback(PACKET_CALLBACK, bkt,
proto, isFragment, numPkts, tos,
vlanId, ehdr, &src, sport,
&dst, dport, len,
flags, icmpType, numMplsLabels,
mplsLabels, fingerprint,
h, p, payload, payloadLen);
/* Release the slot lock (presumably taken earlier, outside this excerpt)
   and stop: the packet has been accounted for */
pthread_spin_unlock(&hashMutex[hash_idx][mutex_idx]);
return;
- 만약 bkt 의 값과 pkt 의 값이 일치하지 않으면 (collision or NULL bucket), 새 bucket 을 할당하여 리스트의 맨 앞에 추가한다
. 즉 bucket 의 entry 는 linked list 로 관리되고 있음
/* NOTE(review): this excerpt does not show a NULL check on the malloc
   result or the initialisation of the new bucket - confirm in the full source. */
bkt = (HashBucket*)malloc(sizeof(HashBucket));
/* Put the bucket on top of the list */
addToList(bkt, &theHash[hash_idx][idx]);
/*
 * Push a bucket onto the front of a singly linked bucket list.
 *
 * bkt  - bucket to insert; its next pointer is overwritten.
 * list - address of the list head pointer; on return it points at bkt.
 */
void addToList(HashBucket *bkt, HashBucket **list) {
  HashBucket *previousHead = *list;

  bkt->next = previousHead;
  *list = bkt;
}
Expire
- When is NetFlow information exported from the hash table?
- The hash table walker runs in separate threads (one per pcap thread)
/* Start one hashWalker thread per pcap thread. The loop index is passed by
   value inside the void* argument; hashWalker casts it back to an int. */
for(i=0; i<numPcapThreads; i++)
pthread_create(&walkHashThread[i], NULL, hashWalker, (void*)i);
hashWalker( ) -> walkHash( ) -> queueBucketToExport( )
- Walk the hash table periodically; if a bucket has expired, move it to the exportQueue
/*
  From the tests carried on, the very best approach
  is to have a periodic thread that scans for expired
  flows.
*/

/*
 * Thread body of the per-table hash walker.
 *
 * _idx - index of the hash table to walk, passed by value inside the
 *        pointer argument (see the pthread_create call site).
 *
 * Waits until theHash[idx] has been allocated, sleeps until the next
 * minute boundary, then repeatedly calls walkHash(idx, 0) until
 * shutdownInProgress becomes non-zero. After every hashSize calls (one
 * full scan) it:
 *   - frees part of the purged bucket list when that list is longer than
 *     10% of the active buckets,
 *   - prints statistics,
 *   - rebuilds the table when rebuild_hash is set,
 *   - sleeps scanCycle seconds.
 *
 * Always returns NULL.
 */
void* hashWalker(void* _idx) {
  u_int numSlots = 0;
  u_short sleep_time;
  /* Go through an integer type before narrowing to int so the pointer is
     not cast directly to a narrower type. */
  int idx = (int)(long)_idx;

  /* Wait until all the data structures have been allocated */
  while(theHash[idx] == NULL) ntop_sleep(1);

  sleep_time = 60 - (time(NULL) % 60); /* Align to the minute */
  traceEvent(TRACE_INFO, "Sleeping %d sec before walking hash for the first time.",
             sleep_time);
  ntop_sleep(sleep_time);

  for(;shutdownInProgress == 0;) {
    walkHash(idx, 0);

    if(++numSlots >= hashSize) {
      /* End of scan */
      unsigned int activeBuckets = bucketsAllocated-(purgedBucketsLen+exportBucketsLen);
      unsigned int freeBucketsThreshold = (unsigned int)(activeBuckets*.1); /* 10% of activeBuckets */

      if(purgedBucketsLen > freeBucketsThreshold) {
        /* Too many buckets: let's free some of them */
        while((purgedBucketsLen > 0) && (freeBucketsThreshold > 0)) {
          HashBucket *bkt;

          /* Get the head */
          pthread_mutex_lock(&purgedBucketsMutex);
          bkt = getListHead(&purgedBuckets);
          purgedBucketsLen--, bucketsAllocated--;
          pthread_mutex_unlock(&purgedBucketsMutex);

          /* Free the head */
          free(bkt);
          freeBucketsThreshold--;
        }
      }

      printStats();
      numSlots = 0;

      if(rebuild_hash) {
        int i;

        traceEvent(TRACE_INFO, "Rebuilding hash...");

        /* thePrevHash is an array of per-thread table pointers: release
           only this thread's previous table (free(NULL) is a no-op). */
        free(thePrevHash[idx]);

        /* stop all activities and create a new hash */
        for(i=0; i<MAX_HASH_MUTEXES; i++) pthread_spin_lock(&hashMutex[idx][i]);
        thePrevHash[idx] = theHash[idx];
        allocateHash(idx);
        for(i=0; i<MAX_HASH_MUTEXES; i++) pthread_spin_unlock(&hashMutex[idx][i]);

        /* NOTE(review): rebuild_hash is not reset in this function; confirm
           it is cleared elsewhere, otherwise the table is rebuilt at the
           end of every scan. */
        traceEvent(TRACE_INFO, "The hash has been rebuilt.");
      }

      /* Report the interval that is actually slept (scanCycle) */
      traceEvent(TRACE_INFO, "Sleeping %d sec before walking hash...", (int)scanCycle);
      ntop_sleep(scanCycle);
    }
  } /* for */

  traceEvent(TRACE_INFO, "Hash walker thread terminated\n");
  return(NULL);
}
/*
 * Tell whether a flow bucket must be expired at time theTime.
 *
 * myBucket - bucket to examine.
 * theTime  - reference time, compared with the bucket's tv_sec timestamps.
 *
 * Returns 1 when the bucket is flagged as expired, or when either
 * direction has been idle for at least idleTimeout or has been alive for
 * at least lifetimeTimeout; the reverse direction is considered only once
 * it has carried packets. Returns 0 otherwise.
 */
int isFlowExpired(HashBucket *myBucket, time_t theTime) {
  /* Forced expire */
  if(myBucket->bucket_expired)
    return(1);

  /* src -> dst: idle for too long */
  if((theTime-myBucket->lastSeenSent.tv_sec) >= idleTimeout)
    return(1);

  /* src -> dst: still active but too old */
  if((theTime-myBucket->firstSeenSent.tv_sec) >= lifetimeTimeout)
    return(1);

  /* dst -> src: same two checks, only if that direction carried packets */
  if(myBucket->pktRcvd > 0) {
    if((theTime-myBucket->lastSeenRcvd.tv_sec) >= idleTimeout)
      return(1);

    if((theTime-myBucket->firstSeenRcvd.tv_sec) >= lifetimeTimeout)
      return(1);
  }

  /* if(hashDebug) printBucket(myBucket); */
  return(0);
}