Changeset 255 for vtcross/branches

Show
Ignore:
Timestamp:
05/15/09 11:47:36 (15 years ago)
Author:
wrodgers
Message:

Implemented service DB, transfer methods

Files:
1 modified

Legend:

Unmodified
Added
Removed
  • vtcross/branches/wrodgers/ServiceManagementLayer.cpp

    r250 r255  
    55 */ 
    66 
    7 /* DESCRIPTION OF FILE. 
     7/* When a CE connects, the address of it's local socket is stored and it is assigned an ID number. 
     8 * Until it sends a message ending communication, any communications from it are processed with 
     9 * that ID number.  The "sendto" command uses the address data to send messages directly back to 
     10 * it's local socket, allowing us to get away with a single-socket implementation on the SML-side. 
     11 * 
     12 * Services are stored in a SQLite DB by the ID of the CE that registered them. 
     13 * 
     14 * Mission support has not been implemented yet, but will be soon. 
     15 */ 
     16 
     17/* KNOWN ISSUE: Note that this implementation would be unstable for CE's on any sort of network 
     18 * since it relies on asyncronous communication occuring linearly and with one partner at a time. 
     19 * The breakdown occurs specifically when the CE has to send ack messages back to the SML; 
     20 * since there may be other messages queued on the socket since communication started, 
     21 * there is no guarentee that the message would be the first up and I have yet to discover a method 
     22 * for peaking more than 1 item into the message queue without starting to pop messages off first. 
     23 * Still a work in progress though... 
    824 */ 
    925 
     
    3955#include "../../trunk/src/include/tinyxml/tinyxml.h" 
    4056#include "../../trunk/src/include/tinyxml/tinystr.h" 
     57#include "sqlite3.h" 
     58 
     59typedef struct services_s * services_DB; 
     60 
     61struct services_s { 
     62    char filename[64]; 
     63    char tablename[64]; 
     64    char command[2048]; 
     65    sqlite3 *db;  
     66    unsigned int num_columns; 
     67}; 
     68 
     69services_DB _services_DB; 
    4170 
    4271 
     
    7099 
    71100    LoadConfiguration(); 
     101    CreateServicesDB(); 
     102} 
     103 
     104void 
     105CreateServicesDB() 
     106{ 
     107    _services_DB = (services_DB) malloc(sizeof(struct services_s)); 
     108 
     109    // create database 
     110 
     111    // copy filename 
     112    unsigned int i=0; 
     113    strcpy(_services_DB->filename, "Services_Table"); 
     114 
     115    // execute create database command 
     116    // database handle 
     117    //_services_DB->db = NULL; 
     118    sqlite3_open(_services_DB->filename, &(_services_DB->db)); 
     119    char* cols[] = {"ID_Num", "Service_Name"}; 
     120 
     121    // create table 
     122 
     123    // copy tablename 
     124    strcpy(_services_DB->tablename, "Services"); 
     125 
     126    // number of columns in the table 
     127    _services_DB->num_columns = 2; 
     128 
     129    // generate command 
     130    strcpy(_services_DB->command, "CREATE TABLE "); 
     131    strcat(_services_DB->command, _services_DB->tablename); 
     132    strcat(_services_DB->command, "("); 
     133    strcat(_services_DB->command, cols[0]); 
     134    strcat(_services_DB->command, " INT, "); 
     135    strcat(_services_DB->command, cols[1]); 
     136    strcat(_services_DB->command, " TEXT"); 
     137    strcat(_services_DB->command, ");"); 
     138 
     139    // execute create table command 
     140    sqlite3_exec(_services_DB->db, _services_DB->command, NULL, 0, NULL); 
    72141} 
    73142 
     
    137206ServiceManagementLayer::CESignalHandler(int32_t ID) 
    138207{ 
    139    char buffer[256];     
    140    memset(buffer, 0, 256);      
    141    // 
    142    ReadMessage(cogEngSrv, buffer); 
    143    if(strcmp(buffer, "register_engine_cognitive") == 0) { 
    144         RegisterCognitiveEngine(ID); 
    145    } 
     208    char buffer[256];    
     209    memset(buffer, 0, 256);      
     210    // 
     211    ReadMessage(cogEngSrv, buffer); 
     212    if(strcmp(buffer, "register_engine_cognitive") == 0) { 
     213         RegisterCognitiveEngine(ID); 
     214    } 
     215    else if(strcmp(buffer, "register_service") == 0) { 
     216         ReceiveServices(ID); 
     217    } 
    146218 
    147219 
     
    185257} 
    186258 
    187  
     259/* Streams config data directly from the shell to the CE, and checks 
     260 * for an "ack" message from the CE after every sent message  
     261 * to know when to stop communication. 
     262 */ 
    188263void  
    189264ServiceManagementLayer::TransferRadioConfiguration(int32_t ID) 
    190265{ 
    191 } 
    192  
    193  
     266    struct timeval selTimeout; 
     267    fd_set sockSet; 
     268    int32_t rc = 0; 
     269    char buffer[256]; 
     270    //Send data until the CE sends an ACK message back 
     271    while(rc == 0){ 
     272        memset(buffer, 0, 256); 
     273        //Receive data from Shell 
     274        ReadMessage(shellSocketFD, buffer); 
     275        //Send data to CE 
     276        sendto(cogEngSrv, buffer, 256, NULL, (struct sockaddr*) CE_List[ID].sock_ptr, CE_List[ID].sock_len);  
     277        FD_ZERO(&sockSet); 
     278        FD_SET(cogEngSrv, &sockSet); 
     279        selTimeout.tv_sec = 0; 
     280        selTimeout.tv_usec = 0; 
     281        //Check if there is a message on the CE socket ready to be processed 
     282        rc = select(cogEngSrv + 1, &sockSet, NULL, NULL, &selTimeout); 
     283    } 
     284    memset(buffer, 0, 256); 
     285    //Once there is a message on the CE socket, read it and forward it on to the shell 
     286    ReadMessage(cogEngSrv, buffer); 
     287    SendMessage(shellSocketFD, buffer); 
     288} 
     289 
     290//Simmilar to TransferRadioConfig, just with Experience data 
    194291void  
    195292ServiceManagementLayer::TransferExperience(int32_t ID) 
    196293{ 
    197 } 
    198  
    199  
    200 void  
    201 ServiceManagementLayer::ReceiveServices() 
    202 { 
     294    struct timeval selTimeout; 
     295    fd_set sockSet; 
     296    int32_t rc = 0; 
     297    char buffer[256]; 
     298    //Send data until the CE sends an ACK message back 
     299    while(rc == 0){ 
     300        memset(buffer, 0, 256); 
     301        //Receive data from Shell 
     302        ReadMessage(shellSocketFD, buffer); 
     303        //Send data to CE 
     304        sendto(cogEngSrv, buffer, 256, NULL, (struct sockaddr*) CE_List[ID].sock_ptr, CE_List[ID].sock_len);  
     305        FD_ZERO(&sockSet); 
     306        FD_SET(cogEngSrv, &sockSet); 
     307        selTimeout.tv_sec = 0; 
     308        selTimeout.tv_usec = 0; 
     309        //Check if there is a message on the CE socket ready to be processed 
     310        rc = select(cogEngSrv + 1, &sockSet, NULL, NULL, &selTimeout); 
     311    } 
     312    memset(buffer, 0, 256); 
     313    //Once there is a message on the CE socket, read it and forward it on to the shell 
     314    ReadMessage(cogEngSrv, buffer); 
     315    SendMessage(shellSocketFD, buffer); 
     316} 
     317 
     318 
     319void  
     320ServiceManagementLayer::ReceiveServices(int32_t ID) 
     321{ 
     322    char buffer[256]; 
     323    memset(buffer, 0, 256); 
     324    ReadMessage(cogEngSrv, buffer); 
     325    char* cols[] = {"ID_Num", "Service_Name"}; 
     326     
     327    // generate command 
     328    //printf("%s\n", _services_DB->command); 
     329    strcpy(_services_DB->command, "insert into "); 
     330    strcat(_services_DB->command, _services_DB->tablename);  
     331    strcat(_services_DB->command, " ("); 
     332    strcat(_services_DB->command, cols[0]); 
     333    strcat(_services_DB->command, ", "); 
     334    strcat(_services_DB->command, cols[1]); 
     335    strcat(_services_DB->command, ") "); 
     336    strcat(_services_DB->command, " values("); 
     337    sprintf(_services_DB->command, "%s%d", _services_DB->command, ID); 
     338    strcat(_services_DB->command, ", "); 
     339    strcat(_services_DB->command, buffer); 
     340    strcat(_services_DB->command, ");"); 
     341     
     342    //printf("search command: %s\n", _services_DB->command); 
     343    // execute add command 
     344    sqlite3_exec(_services_DB->db, _services_DB->command, NULL, 0, NULL); 
     345 
    203346} 
    204347 
     
    213356ServiceManagementLayer::ListServices() 
    214357{ 
     358    // generate commandi 
     359    strcpy(_services_DB->command, "select "); 
     360    strcat(_services_DB->command, _services_DB->tablename); 
     361    strcat(_services_DB->command, ".* from "); 
     362    strcat(_services_DB->command, _services_DB->tablename); 
     363    strcat(_services_DB->command, ";"); 
     364 
     365    // execute print (select all)  command 
     366    sqlite3_exec(_services_DB->db, _services_DB->command, NULL, 0, NULL); 
     367    printf("database %s, table %s:\n", _services_DB->filename, _services_DB->tablename); 
    215368} 
    216369 
     
    230383 
    231384void 
    232 ServiceManagementLayer::RegisterCognitiveEngine(int32_t socketFD) 
     385ServiceManagementLayer::RegisterCognitiveEngine(int32_t ID) 
    233386{ 
    234387    SendMessage(shellSocketFD, "register_engine_cognitive"); 
    235     LOG("Cognitive Engine:: Registration message sent to shell.\n"); 
    236  
    237     TransferRadioConfiguration(socketFD); 
    238     TransferExperience(socketFD); 
     388    LOG("ServiceManagementLayer:: CE registration message forwarded to shell.\n"); 
     389 
     390    TransferRadioConfiguration(ID); 
     391    TransferExperience(ID); 
    239392 
    240393    numberOfCognitiveEngines++; 
     
    242395} 
    243396 
    244  
    245 void  
    246 ServiceManagementLayer::DeregisterCognitiveEngine(int32_t socketFD) 
    247 { 
    248     LOG("Cognitive Radio Shell:: Received deregistration message from Cognitive Engine.\n"); 
     397void  
     398ServiceManagementLayer::DeregisterServices(int32_t ID) 
     399{ 
     400    char str_buffer[64]; 
     401    strcpy(_services_DB->command, "delete "); 
     402    strcat(_services_DB->command, _services_DB->tablename); 
     403    strcat(_services_DB->command, ".* from "); 
     404    strcat(_services_DB->command, _services_DB->tablename); 
     405    strcat(_services_DB->command, " where "); 
     406    strcat(_services_DB->command, "ID_Num"); 
     407    strcat(_services_DB->command, "=="); 
     408    sprintf(str_buffer, "%d;", ID); 
     409    strcat(_services_DB->command, str_buffer); 
     410    sqlite3_exec(_services_DB->db, _services_DB->command, NULL, 0, NULL); 
     411} 
     412 
     413 
     414void  
     415ServiceManagementLayer::DeregisterCognitiveEngine(int32_t ID) 
     416{ 
     417    LOG("ServiceManagementLayer:: CE deregistration message forwarded to shell.\n"); 
    249418 
    250419    numberOfCognitiveEngines--; 
     
    252421        CE_Present = false; 
    253422 
    254     SendMessage(socketFD, "deregister_ack"); 
    255     shutdown(socketFD, 2); 
    256     close(socketFD); 
     423    SendMessage(shellSocketFD, "deregister_engine_cognitive"); 
     424    char buffer[256]; 
     425    memset(buffer, 0, 256); 
     426    ReadMessage(shellSocketFD, buffer); 
     427    sendto(cogEngSrv, buffer, 256, NULL, (struct sockaddr*) CE_List[ID].sock_ptr, CE_List[ID].sock_len); 
     428    if(strcmp("deregister_ack", buffer) != 0) { 
     429        ERROR(1, "SML:: Failed to close CE socket\n"); 
     430    } 
     431    CE_List[ID].sock_ptr = NULL; 
     432    CE_List[ID].ID_num = -1; 
     433 
    257434    LOG("Cognitive Radio Shell:: Socket closed.\n"); 
    258435} 
    259436 
     437/* The basic idea here is that when any time a message is sent over the socket, the sender's address is sent too. 
     438 * We take advantage of this in order to uniquely identify the component for future communications since 
     439 * there is only one socket on the SML-side. 
     440 */ 
    260441void 
    261442ServiceManagementLayer::StartSMLServer() 
     
    282463        /* Timeout specification */ 
    283464        /* This must be reset every time select() is called */ 
    284         selTimeout.tv_sec = timeout;       /* timeout (secs.) */ 
     465        selTimeout.tv_sec = 0;       /* timeout (secs.) */ 
    285466        selTimeout.tv_usec = 0;            /* 0 microseconds */ 
     467        //Changed both to zero so that select will check messages from the shell instead of blocking 
     468        //when there is no command from the CE's to be processed 
    286469 
    287470        //Check if there is a message on the socket waiting to be read 
    288471        rc = select(cogEngSrv + 1, &sockSet, NULL, NULL, &selTimeout); 
    289472        if(rc == 0){ 
    290             //If not, log that fact and check instead for messages from the shell 
    291             LOG("No echo requests for %i secs...Server still alive\n", timeout); 
     473            //If not, check instead for messages from the shell 
    292474            ShellSignalHandler();        
    293475        } 
     
    298480            struct sockaddr_in sock_addr; 
    299481            memset((void *) &sock_addr, 0, sizeof(sock_addr)); 
     482            socklen_t sock_len; 
    300483            //Peak at the next message on the socket to determine its address of orgin 
    301             recvfrom(cogEngSrv, buffer, 256, MSG_PEEK, (struct sockaddr*) &sock_addr, NULL); 
     484            recvfrom(cogEngSrv, buffer, 256, MSG_PEEK, (struct sockaddr*) &sock_addr, &sock_len); 
    302485            bool found = false; 
    303486            //Is it from a previously logged address? 
     
    313496                CE_List[Current_ID].sock_ptr = &sock_addr; 
    314497                CE_List[Current_ID].ID_num = Current_ID; 
     498                CE_List[Current_ID].sock_len = sock_len; 
    315499                CESignalHandler(Current_ID); 
    316500                Current_ID++;