Changeset 444

Show
Ignore:
Timestamp:
08/28/09 11:58:02 (15 years ago)
Author:
bhilburn
Message:

40% done reworking the SML source.

Files:
1 modified

Legend:

Unmodified
Added
Removed
  • vtcross/trunk/src/service_management_layer/ServiceManagementLayer.cpp

    r443 r444  
    115115    int32_t rc;         /* sqlite command return code */ 
    116116 
    117     _services_DB->command="drop table "; 
     117    _services_DB->command = "drop table "; 
    118118    _services_DB->command.append(_services_DB->tablename); 
    119119    rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg); 
     
    121121        WARNING("ServiceManagementLayer::Destructor services 'drop table' error: %s\n", errorMsg); 
    122122 
    123     _services_DB->command="vacuum"; 
     123    _services_DB->command = "vacuum"; 
    124124    rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg); 
    125125    if((rc != SQLITE_OK) && (rc != 101)) 
     
    128128    free(_services_DB); 
    129129 
    130     _data_DB->command="drop table "; 
     130    _data_DB->command = "drop table "; 
    131131    _data_DB->command.append(_data_DB->tablename); 
    132132    rc = sqlite3_exec(_data_DB->db, _data_DB->command.c_str(), callback, 0, &errorMsg); 
     
    134134        WARNING("ServiceManagementLayer::Destructor data 'drop table' error: %s\n", errorMsg); 
    135135 
    136     _data_DB->command="vacuum"; 
     136    _data_DB->command = "vacuum"; 
    137137    rc = sqlite3_exec(_data_DB->db, _data_DB->command.c_str(), callback, 0, &errorMsg); 
    138138    if((rc != SQLITE_OK) && (rc != 101)) 
     
    186186    sqlite3_open(_services_DB->filename.c_str(), &(_services_DB->db)); 
    187187 
    188     char* cols[] = {(char *)"ID_Num", (char *)"Service_Name"}; 
     188    char *cols[] = {(char *)"ID_Num", (char *)"Service_Name"}; 
    189189 
    190190    _services_DB->tablename="Services"; 
     
    242242    sqlite3_open(_data_DB->filename.c_str(), &(_data_DB->db)); 
    243243 
    244     char* cols[] = {(char *)"Tag", (char *)"Data"}; 
    245  
    246     _data_DB->tablename="Data"; 
    247     _data_DB->command="DROP TABLE IF EXISTS Data;";      
     244    char *cols[] = {(char *)"Tag", (char *)"Data"}; 
     245 
     246    _data_DB->tablename = "Data"; 
     247    _data_DB->command = "DROP TABLE IF EXISTS Data;";      
    248248 
    249249    rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), 128, &ppStmt, &pzTail); 
     
    258258 
    259259    /* Generate command */ 
    260     _data_DB->command="CREATE TABLE "; 
     260    _data_DB->command = "CREATE TABLE "; 
    261261    _data_DB->command.append(_data_DB->tablename); 
    262262    _data_DB->command.append("("); 
     
    293293/* CALLED BY: constructor 
    294294 * INPUTS: |serverName| the IPv4 name of the server (127.0.0.1 for localhost) 
    295  *       |serverPort| the port on the server to connect to 
     295 *         |serverPort| the port on the server to connect to 
    296296 * OUTPUTS: <none> 
    297297 * 
     
    323323     
    324324    if(ID != -1) 
    325     _FD = CE_List[ID].FD; 
     325        _FD = CE_List[ID].FD; 
    326326    else  
    327     _FD = shellSocketFD; 
     327        _FD = shellSocketFD; 
     328 
    328329    ReadMessage(_FD, buffer); 
    329     //printf("MH_buffer = %s\n", buffer); 
    330330     
    331331    //--------Policy Engine Stuff - no policy engine support in this version-------// 
     
    368368    } 
    369369    else if(strcmp(buffer, "register_engine_cognitive") == 0) { 
    370          RegisterCognitiveEngine(ID); 
     370        RegisterCognitiveEngine(ID); 
    371371    } 
    372372    else if(strcmp(buffer, "register_service") == 0) { 
    373      ReceiveServices(ID); 
     373        ReceiveServices(ID); 
    374374    } 
    375375    else if(strcmp(buffer, "send_component_type") == 0) { 
    376      SendComponentType(); 
     376        SendComponentType(); 
    377377    } 
    378378    else if(strcmp(buffer, "list_services") == 0) { 
    379      ListServices(); 
     379        ListServices(); 
    380380    } 
    381381    else if(strcmp(buffer, "set_active_mission") == 0) { 
    382     SetActiveMission(); 
     382        SetActiveMission(); 
    383383    } 
    384384    else if(strcmp(buffer, "request_optimization") == 0) { 
    385     PerformActiveMission(); 
     385        PerformActiveMission(); 
    386386    } 
    387387    else if(strcmp(buffer, "deregister_engine_cognitive") == 0) { 
    388     DeregisterCognitiveEngine(ID); 
     388        DeregisterCognitiveEngine(ID); 
    389389    } 
    390390    else if(strcmp(buffer, "deregister_service") == 0) { 
    391     DeregisterServices(ID); 
     391        DeregisterServices(ID); 
    392392    } 
    393393} 
     
    431431    SendMessage(shellSocketFD, "register_sml"); 
    432432    LOG("ServiceManagementLayer:: Registration message sent.\n"); 
    433     //printf("SSFD = %d\n", shellSocketFD); 
    434433} 
    435434 
     
    460459 * for an "ack" message from the CE after every sent message  
    461460 * to know when to stop communication. 
    462  */ 
    463  
    464 //Modified to check the incoming message buffer rather than the outgoing message buffer to avoid a portion of the delay 
    465 //May change this again to handle data more inteligently, taking advantage of it's properties. 
     461 *  
     462 * NOTE: Modified to check the incoming message buffer rather than the outgoing  
     463 * message buffer to avoid a portion of the delay. May change this again to handle  
     464 * data more inteligently, taking advantage of it's properties. 
     465 */ 
    466466void  
    467467ServiceManagementLayer::TransferRadioConfiguration(int32_t ID) 
    468468{ 
    469     //printf("transRadConfig\n"); 
    470469    struct timeval selTimeout; 
    471470    fd_set sockSet; 
    472471    int32_t rc = 1; 
    473472    char buffer[256]; 
    474     //Send data until the CE sends an ACK message back 
    475     while(rc!=0){ 
     473 
     474    /* Send data until the CE sends an ACK message back */ 
     475    while(rc != 0) { 
    476476        memset(buffer, 0, 256); 
    477     //Receive data from Shell 
    478     ReadMessage(shellSocketFD, buffer); 
    479     //printf("buffer = %s\n", buffer); 
    480     //Send data to CE 
    481     SendMessage(CE_List[ID].FD, buffer); 
     477 
     478        /* Receive data from Shell */ 
     479        ReadMessage(shellSocketFD, buffer); 
     480 
     481        /* Send data to CE */ 
     482        SendMessage(CE_List[ID].FD, buffer); 
    482483        FD_ZERO(&sockSet); 
    483484        FD_SET(shellSocketFD, &sockSet); 
    484485        selTimeout.tv_sec = 0; 
    485486        selTimeout.tv_usec = 5000; 
    486         //Check if there is a message on the shell ready to be processed 
    487         rc=select(shellSocketFD + 1, &sockSet, NULL, NULL, &selTimeout); 
    488     } 
     487 
     488        /* Check if there is a message on the shell ready to be processed */ 
     489        rc = select(shellSocketFD + 1, &sockSet, NULL, NULL, &selTimeout); 
     490    } 
     491 
    489492    memset(buffer, 0, 256); 
    490493    ReadMessage(CE_List[ID].FD, buffer); 
    491494    SendMessage(shellSocketFD, buffer); 
    492     //printf("transfer done!\n"); 
    493495} 
    494496 
     
    499501 * 
    500502 * DESCRIPTION: Simmilar to TransferRadioConfig, just with Experience data 
    501  */ 
    502  
    503 //Modified to check the incoming message buffer rather than the outgoing message buffer to avoid a portion of the delay 
    504 //May change this again to handle data more inteligently, taking advantage of it's properties. 
     503 *  
     504 * NOTE: Modified to check the incoming message buffer rather than the outgoing  
     505 * message buffer to avoid a portion of the delay. May change this again to handle  
     506 * data more inteligently, taking advantage of it's properties. 
     507 */ 
    505508void  
    506509ServiceManagementLayer::TransferExperience(int32_t ID) 
     
    510513    int32_t rc = 1; 
    511514    char buffer[256]; 
    512     //Send data until the CE sends an ACK message back 
    513     while(rc!=0){ 
    514     //printf("transfering...\n"); 
     515    /* Send data until the CE sends an ACK message back */ 
     516    while(rc != 0) { 
    515517        memset(buffer, 0, 256); 
    516     //Receive data from Shell 
    517     ReadMessage(shellSocketFD, buffer); 
    518     //printf("buffer = %s\n", buffer); 
    519     //Send data to CE 
    520     SendMessage(CE_List[ID].FD, buffer); 
     518 
     519        /* Receive data from Shell */ 
     520        ReadMessage(shellSocketFD, buffer); 
     521 
     522        /* Send data to CE */ 
     523        SendMessage(CE_List[ID].FD, buffer); 
    521524        FD_ZERO(&sockSet); 
    522525        FD_SET(shellSocketFD, &sockSet); 
    523526        selTimeout.tv_sec = 0; 
    524527        selTimeout.tv_usec = 5000; 
    525         //Check if there is a message on the shell ready to be processed 
    526         rc=select(shellSocketFD + 1, &sockSet, NULL, NULL, &selTimeout); 
    527     } 
     528 
     529        /* Check if there is a message on the shell ready to be processed */ 
     530        rc = select(shellSocketFD + 1, &sockSet, NULL, NULL, &selTimeout); 
     531    } 
     532 
    528533    memset(buffer, 0, 256); 
    529     //printf("done trans exp!\n"); 
    530534    ReadMessage(CE_List[ID].FD, buffer); 
    531535    SendMessage(shellSocketFD, buffer); 
     
    544548    memset(buffer, 0, 256); 
    545549    ReadMessage(CE_List[ID].FD, buffer); 
    546     char* cols[] = {(char *)"ID_Num", (char *)"Service_Name"}; 
    547     //printf("RS_buffer = %s\n", buffer); 
    548     // generate command 
    549     _services_DB->command="insert into "; 
     550 
     551    char *cols[] = {(char *) "ID_Num", (char *) "Service_Name"}; 
     552 
     553    /* Generate command */ 
     554    _services_DB->command = "insert into "; 
    550555    _services_DB->command.append(_services_DB->tablename);  
    551556    _services_DB->command.append(" ("); 
     
    555560    _services_DB->command.append(") "); 
    556561    _services_DB->command.append(" values("); 
     562 
    557563    char temp[3]; 
    558     memset(temp,0,3); 
     564    memset(temp, 0, 3); 
    559565    sprintf(temp, "%d", ID); 
     566 
    560567    _services_DB->command.append(temp); 
    561568    _services_DB->command.append(", '"); 
     
    563570    _services_DB->command.append("');"); 
    564571     
    565     //printf("search command: %s\n", _services_DB->command); 
    566     // execute add command 
     572    /* Execute add command */ 
    567573    char *errorMsg; 
    568574    int rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg); 
    569     if( rc!=SQLITE_OK && rc!=101 ) 
    570         fprintf(stderr, "ServiceManagementLayer::RecieveServices DB Error %s\n", errorMsg); 
    571     /*sprintf(outBuffer, "SML: Registering service '%s' from component number '%d'", buffer, ID); 
    572     LOG(outBuffer);*/ 
     575    if((rc != SQLITE_OK) && (rc != 101)) 
     576        WARNING("ServiceManagementLayer::RecieveServices DB Error %s\n", errorMsg); 
    573577} 
    574578 
     
    577581 * OUTPUTS: <none> 
    578582 * 
    579  * DESCRIPTION: This method associates the services that components provide with the services that are requested in the mission 
    580  * Each service in the mission is given the ID and FD of a component that has registered to provide that service  
    581  * Deregistration is okay until this method is called without a reload, but if deregistration occurs after this 
     583 * DESCRIPTION: This method associates the services that components provide with the  
     584 * services that are requested in the mission. Each service in the mission is given  
     585 * the ID and FD of a component that has registered to provide that service. Deregistration  
     586 * is okay until this method is called without a reload, but if deregistration occurs after this 
    582587 * method is called it needs to be called again even if other engines also provide the services 
    583588 */ 
     
    588593    memset(buffer, 0, 256); 
    589594    ReadMessage(shellSocketFD, buffer); 
     595 
    590596    uint32_t missID = atoi(buffer); 
    591     for(activeMission = 0; activeMission < 10; activeMission++) 
    592     { 
    593     //Find the active mission by comparing mission ID's 
    594     if(miss[activeMission].missionID == missID) 
    595         break; 
    596     } 
    597  
    598     LOG("ServiceManagementLayer:: Received Set Active Mission command: %i.\n",missID); 
    599     //For each service in the mission 
    600     for(uint16_t i = 0; i < miss[activeMission].numServices; i++) 
    601     {     
    602     //Check whether the current service is an actual service or a conditional 
    603     if(miss[activeMission].services[i].name.compare("if") && miss[activeMission].services[i].name.compare("dowhile") && \ 
    604        miss[activeMission].services[i].name.compare("shell")){ 
    605         //If it is a service, search the database of registered services to find the ID of the component that registered it 
    606             _services_DB->command="select "; 
    607            _services_DB->command.append(_services_DB->tablename); 
    608             _services_DB->command.append(".* from "); 
    609             _services_DB->command.append( _services_DB->tablename); 
    610             _services_DB->command.append(" where Service_Name=='"); 
    611         _services_DB->command.append(miss[activeMission].services[i].name); 
    612         _services_DB->command.append("';"); 
     597    for(activeMission = 0; activeMission < 10; activeMission++) { 
     598        /* Find the active mission by comparing mission ID's */ 
     599        if(miss[activeMission].missionID == missID) 
     600            break; 
     601    } 
     602 
     603    LOG("ServiceManagementLayer:: Received Set Active Mission command: %i.\n", missID); 
     604 
     605    /* For each service in the mission */ 
     606    for(size_t i = 0; i < miss[activeMission].numServices; i++) { 
     607        /* Check whether the current service is an actual service or a conditional */ 
     608        if(miss[activeMission].services[i].name.compare("if") && \ 
     609                miss[activeMission].services[i].name.compare("dowhile") && \ 
     610                miss[activeMission].services[i].name.compare("shell")) { 
     611                /* If it is a service, search the database of registered services to find  
     612                 * the ID of the component that registered it */ 
     613                _services_DB->command="select "; 
     614                _services_DB->command.append(_services_DB->tablename); 
     615                _services_DB->command.append(".* from "); 
     616                _services_DB->command.append( _services_DB->tablename); 
     617                _services_DB->command.append(" where Service_Name=='"); 
     618                _services_DB->command.append(miss[activeMission].services[i].name); 
     619                _services_DB->command.append("';"); 
    613620     
    614             sqlite3_stmt * pStatement; 
    615             int rc = sqlite3_prepare_v2(_services_DB->db, _services_DB->command.c_str(), -1, &pStatement, NULL); 
    616             if (rc == SQLITE_OK){  
    617                 if (sqlite3_step(pStatement) == SQLITE_ROW) 
    618                  miss[activeMission].services[i].componentID =  sqlite3_column_int(pStatement, 0);  
    619                 else { 
    620                 printf("services_DB:: Mission requires service %s not provided by any connected component.\n",miss[activeMission].services[i].name.c_str()); 
    621             rc=31337; 
    622             } 
    623              } else { 
    624             printf("services_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_services_DB->command.c_str()); 
    625             } 
    626         //printf("s_name=%s\n",miss[activeMission].services[i].name.c_str()); 
    627             sqlite3_finalize(pStatement); 
    628         miss[activeMission].services[i].socketFD = CE_List[miss[activeMission].services[i].componentID].FD; 
    629         //Set the FD and ID of the service to refer to the component where the service exists 
    630     } 
    631     //Nothing to be done for conditionals at this stage 
     621                sqlite3_stmt * pStatement; 
     622                int32_t rc = sqlite3_prepare_v2(_services_DB->db, _services_DB->command.c_str(), \ 
     623                        -1, &pStatement, NULL); 
     624                if(rc == SQLITE_OK) {  
     625                    if (sqlite3_step(pStatement) == SQLITE_ROW) 
     626                        miss[activeMission].services[i].componentID = sqlite3_column_int(pStatement, 0);  
     627                    else { 
     628                        WARNING("services_DB:: Mission requires service %s ", \ 
     629                                miss[activeMission].services[i].name.c_str()); 
     630                        WARNING("not provided by any connected component.\n") 
     631                        rc = 31337; 
     632                    } 
     633                } else { 
     634                    WARNING("services_DB:: Error executing SQL statement. rc = %i\n%s\n", \ 
     635                            rc, _services_DB->command.c_str()); 
     636                } 
     637 
     638                sqlite3_finalize(pStatement); 
     639                miss[activeMission].services[i].socketFD = \ 
     640                    CE_List[miss[activeMission].services[i].componentID].FD; 
     641        } 
     642        /* TODO Nothing to be done for conditionals at this stage */ 
    632643    } 
    633644  
    634645    SendMessage(shellSocketFD, "ack"); 
    635646    LOG("ServiceManagementLayer:: Done setting active mission.\n"); 
    636     //printf("\nhere ---%d, %d---\n", miss[activeMission].services[0].componentID, miss[activeMission].services[1].componentID); 
    637647} 
    638648 
     
    642652 * 
    643653 * DESCRIPTION: This is a helper method for the "PerformActiveMission" function 
    644  * NOTE: This function has changed durrastically from the previous implementation 
    645  * Takes an ID of a service 
    646  * For that service, finds inputs in DB and forwords those on to the engine after sending comm-starting messages 
    647  * Afterwords, listenes for the outputs so that it can store those in the database for future services or the overall output 
     654 * NOTE: This function has changed drastically from the previous implementation 
     655 *  
     656 * Takes an ID of a service. For that service, finds inputs in DB and forwords  
     657 * those on to the engine after sending comm-starting messages. Afterwords, listenes  
     658 * for the outputs so that it can store those in the database for future services or  
     659 * the overall output 
    648660 */ 
    649661void 
    650662ServiceManagementLayer::TransactData(int32_t sourceID) 
    651663{ 
    652    // LOG("ServiceManagementLayer:: Data transaction occuring.\n"); 
    653664    char buffer[256]; 
    654665    std::string data; 
    655     char* cols[] = {(char *)"Tag", (char *)"Data"}; 
    656     int i = 0; 
     666    char *cols[] = {(char *) "Tag", (char *) "Data"}; 
    657667    char *token; 
    658668 
    659    //Send a message directly to the shell 
    660    if(miss[activeMission].services[sourceID].name.find("shell")!=string::npos) 
    661    {  
    662     int k = 0; 
    663     shellFound=true; 
    664     while(k<10 && !miss[activeMission].input[k].empty()){ 
    665         k++; 
    666     } 
    667     sprintf(buffer, "%d", k); 
    668     SendMessage(shellSocketFD, buffer); 
    669     for(int t = 0; t < k; t++){ 
    670         memset(buffer, 0 , 256); 
    671             _data_DB->command="select "; 
     669   /* Send a message directly to the shell */ 
     670   if(miss[activeMission].services[sourceID].name.find("shell") != string::npos) { 
     671       shellFound=true; 
     672 
     673       int32_t k = 0; 
     674       while((k < 10) && (!miss[activeMission].input[k].empty())) { 
     675           k++; 
     676       } 
     677 
     678       sprintf(buffer, "%d", k); 
     679       SendMessage(shellSocketFD, buffer); 
     680       for(int32_t t = 0; t < k; t++) { 
     681           memset(buffer, 0 , 256); 
     682           _data_DB->command="select "; 
    672683           _data_DB->command.append(_data_DB->tablename); 
    673             _data_DB->command.append(".* from "); 
    674             _data_DB->command.append(_data_DB->tablename); 
    675             _data_DB->command.append(" where Tag=='"); 
    676         _data_DB->command.append(miss[activeMission].input[t]); 
    677         _data_DB->command.append("';"); 
    678             sqlite3_stmt * pStatement; 
    679             int rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL); 
    680             if (rc == SQLITE_OK){  
    681                 if (sqlite3_step(pStatement) == SQLITE_ROW) 
    682                 data=((const char*) sqlite3_column_text(pStatement, 1));  
    683                 else { 
    684                 printf("3data_DB:: Data not yet in DB., %s\n", _data_DB->command.c_str()); 
    685             rc=31337; 
    686             } 
    687             }  
    688         else { 
    689             printf("data_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_data_DB->command.c_str()); 
    690             } 
    691             sqlite3_finalize(pStatement); 
    692         token = strtok((char *)data.c_str(), "@"); 
    693         token = strtok(NULL, "@"); 
    694         SendMessage(shellSocketFD, token); 
    695         token = strtok(NULL, "@"); 
    696         SendMessage(shellSocketFD, token); 
    697     } 
    698     return; 
     684           _data_DB->command.append(".* from "); 
     685           _data_DB->command.append(_data_DB->tablename); 
     686           _data_DB->command.append(" where Tag=='"); 
     687           _data_DB->command.append(miss[activeMission].input[t]); 
     688           _data_DB->command.append("';"); 
     689           sqlite3_stmt * pStatement; 
     690 
     691           int32_t rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), \ 
     692                   -1, &pStatement, NULL); 
     693           if(rc == SQLITE_OK) {  
     694               if(sqlite3_step(pStatement) == SQLITE_ROW) 
     695                   data=((const char*) sqlite3_column_text(pStatement, 1));  
     696               else { 
     697                   LOG("3data_DB:: Data not yet in DB., %s\n", _data_DB->command.c_str()); 
     698                   rc = 31337; 
     699               } 
     700           }  
     701           else { 
     702               WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \ 
     703                       rc,_data_DB->command.c_str()); 
     704           } 
     705 
     706           sqlite3_finalize(pStatement); 
     707           token = strtok((char *) data.c_str(), "@"); 
     708           token = strtok(NULL, "@"); 
     709           SendMessage(shellSocketFD, token); 
     710           token = strtok(NULL, "@"); 
     711           SendMessage(shellSocketFD, token); 
     712       } 
     713 
     714       return; 
    699715   }  
    700716 
    701     //If this is a service command and not a shell command... 
    702     //Transmission starting messages 
     717    /* If this is a service command and not a shell command... */ 
     718    /* Transmission starting messages */ 
    703719    SendMessage(miss[activeMission].services[sourceID].socketFD, "request_optimization_service"); 
    704     SendMessage(miss[activeMission].services[sourceID].socketFD, miss[activeMission].services[sourceID].name.c_str()); 
    705  
    706    //If the service takes a parameter, feed that parameter in 
     720    SendMessage(miss[activeMission].services[sourceID].socketFD, \ 
     721            miss[activeMission].services[sourceID].name.c_str()); 
     722 
     723   /* If the service takes a parameter, feed that parameter in 
    707724   if(!miss[activeMission].services[sourceID].parameter.empty()){ 
    708725    //printf("sending parameter!\n"); 
     
    714731 
    715732    //Load and transmit the input data 
     733    int i = 0; 
    716734    while(i < 10 && !miss[activeMission].services[sourceID].input[i].empty()){ 
    717735        _data_DB->command="select ";