Changeset 444
- Timestamp:
- 08/28/09 11:58:02 (15 years ago)
- Files:
-
- 1 modified
Legend:
- Unmodified
- Added
- Removed
-
vtcross/trunk/src/service_management_layer/ServiceManagementLayer.cpp
r443 r444 115 115 int32_t rc; /* sqlite command return code */ 116 116 117 _services_DB->command ="drop table ";117 _services_DB->command = "drop table "; 118 118 _services_DB->command.append(_services_DB->tablename); 119 119 rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg); … … 121 121 WARNING("ServiceManagementLayer::Destructor services 'drop table' error: %s\n", errorMsg); 122 122 123 _services_DB->command ="vacuum";123 _services_DB->command = "vacuum"; 124 124 rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg); 125 125 if((rc != SQLITE_OK) && (rc != 101)) … … 128 128 free(_services_DB); 129 129 130 _data_DB->command ="drop table ";130 _data_DB->command = "drop table "; 131 131 _data_DB->command.append(_data_DB->tablename); 132 132 rc = sqlite3_exec(_data_DB->db, _data_DB->command.c_str(), callback, 0, &errorMsg); … … 134 134 WARNING("ServiceManagementLayer::Destructor data 'drop table' error: %s\n", errorMsg); 135 135 136 _data_DB->command ="vacuum";136 _data_DB->command = "vacuum"; 137 137 rc = sqlite3_exec(_data_DB->db, _data_DB->command.c_str(), callback, 0, &errorMsg); 138 138 if((rc != SQLITE_OK) && (rc != 101)) … … 186 186 sqlite3_open(_services_DB->filename.c_str(), &(_services_DB->db)); 187 187 188 char *cols[] = {(char *)"ID_Num", (char *)"Service_Name"};188 char *cols[] = {(char *)"ID_Num", (char *)"Service_Name"}; 189 189 190 190 _services_DB->tablename="Services"; … … 242 242 sqlite3_open(_data_DB->filename.c_str(), &(_data_DB->db)); 243 243 244 char *cols[] = {(char *)"Tag", (char *)"Data"};245 246 _data_DB->tablename ="Data";247 _data_DB->command ="DROP TABLE IF EXISTS Data;";244 char *cols[] = {(char *)"Tag", (char *)"Data"}; 245 246 _data_DB->tablename = "Data"; 247 _data_DB->command = "DROP TABLE IF EXISTS Data;"; 248 248 249 249 rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), 128, &ppStmt, &pzTail); … … 258 258 259 259 /* Generate command */ 260 _data_DB->command ="CREATE TABLE ";260 _data_DB->command = "CREATE TABLE "; 261 261 _data_DB->command.append(_data_DB->tablename); 262 262 _data_DB->command.append("("); … … 293 293 /* CALLED BY: constructor 294 294 * INPUTS: |serverName| the IPv4 name of the server (127.0.0.1 for localhost) 295 * |serverPort| the port on the server to connect to295 * |serverPort| the port on the server to connect to 296 296 * OUTPUTS: <none> 297 297 * … … 323 323 324 324 if(ID != -1) 325 _FD = CE_List[ID].FD;325 _FD = CE_List[ID].FD; 326 326 else 327 _FD = shellSocketFD; 327 _FD = shellSocketFD; 328 328 329 ReadMessage(_FD, buffer); 329 //printf("MH_buffer = %s\n", buffer);330 330 331 331 //--------Policy Engine Stuff - no policy engine support in this version-------// … … 368 368 } 369 369 else if(strcmp(buffer, "register_engine_cognitive") == 0) { 370 370 RegisterCognitiveEngine(ID); 371 371 } 372 372 else if(strcmp(buffer, "register_service") == 0) { 373 ReceiveServices(ID);373 ReceiveServices(ID); 374 374 } 375 375 else if(strcmp(buffer, "send_component_type") == 0) { 376 SendComponentType();376 SendComponentType(); 377 377 } 378 378 else if(strcmp(buffer, "list_services") == 0) { 379 ListServices();379 ListServices(); 380 380 } 381 381 else if(strcmp(buffer, "set_active_mission") == 0) { 382 SetActiveMission();382 SetActiveMission(); 383 383 } 384 384 else if(strcmp(buffer, "request_optimization") == 0) { 385 PerformActiveMission();385 PerformActiveMission(); 386 386 } 387 387 else if(strcmp(buffer, "deregister_engine_cognitive") == 0) { 388 DeregisterCognitiveEngine(ID);388 DeregisterCognitiveEngine(ID); 389 389 } 390 390 else if(strcmp(buffer, "deregister_service") == 0) { 391 DeregisterServices(ID);391 DeregisterServices(ID); 392 392 } 393 393 } … … 431 431 SendMessage(shellSocketFD, "register_sml"); 432 432 LOG("ServiceManagementLayer:: Registration message sent.\n"); 433 //printf("SSFD = %d\n", shellSocketFD);434 433 } 435 434 … … 460 459 * for an "ack" message from the CE after every sent message 461 460 * to know when to stop communication. 462 */ 463 464 //Modified to check the incoming message buffer rather than the outgoing message buffer to avoid a portion of the delay 465 //May change this again to handle data more inteligently, taking advantage of it's properties. 461 * 462 * NOTE: Modified to check the incoming message buffer rather than the outgoing 463 * message buffer to avoid a portion of the delay. May change this again to handle 464 * data more inteligently, taking advantage of it's properties. 465 */ 466 466 void 467 467 ServiceManagementLayer::TransferRadioConfiguration(int32_t ID) 468 468 { 469 //printf("transRadConfig\n");470 469 struct timeval selTimeout; 471 470 fd_set sockSet; 472 471 int32_t rc = 1; 473 472 char buffer[256]; 474 //Send data until the CE sends an ACK message back 475 while(rc!=0){ 473 474 /* Send data until the CE sends an ACK message back */ 475 while(rc != 0) { 476 476 memset(buffer, 0, 256); 477 //Receive data from Shell 478 ReadMessage(shellSocketFD, buffer); 479 //printf("buffer = %s\n", buffer); 480 //Send data to CE 481 SendMessage(CE_List[ID].FD, buffer); 477 478 /* Receive data from Shell */ 479 ReadMessage(shellSocketFD, buffer); 480 481 /* Send data to CE */ 482 SendMessage(CE_List[ID].FD, buffer); 482 483 FD_ZERO(&sockSet); 483 484 FD_SET(shellSocketFD, &sockSet); 484 485 selTimeout.tv_sec = 0; 485 486 selTimeout.tv_usec = 5000; 486 //Check if there is a message on the shell ready to be processed 487 rc=select(shellSocketFD + 1, &sockSet, NULL, NULL, &selTimeout); 488 } 487 488 /* Check if there is a message on the shell ready to be processed */ 489 rc = select(shellSocketFD + 1, &sockSet, NULL, NULL, &selTimeout); 490 } 491 489 492 memset(buffer, 0, 256); 490 493 ReadMessage(CE_List[ID].FD, buffer); 491 494 SendMessage(shellSocketFD, buffer); 492 //printf("transfer done!\n");493 495 } 494 496 … … 499 501 * 500 502 * DESCRIPTION: Simmilar to TransferRadioConfig, just with Experience data 501 */ 502 503 //Modified to check the incoming message buffer rather than the outgoing message buffer to avoid a portion of the delay 504 //May change this again to handle data more inteligently, taking advantage of it's properties. 503 * 504 * NOTE: Modified to check the incoming message buffer rather than the outgoing 505 * message buffer to avoid a portion of the delay. May change this again to handle 506 * data more inteligently, taking advantage of it's properties. 507 */ 505 508 void 506 509 ServiceManagementLayer::TransferExperience(int32_t ID) … … 510 513 int32_t rc = 1; 511 514 char buffer[256]; 512 //Send data until the CE sends an ACK message back 513 while(rc!=0){ 514 //printf("transfering...\n"); 515 /* Send data until the CE sends an ACK message back */ 516 while(rc != 0) { 515 517 memset(buffer, 0, 256); 516 //Receive data from Shell 517 ReadMessage(shellSocketFD, buffer); 518 //printf("buffer = %s\n", buffer); 519 //Send data to CE 520 SendMessage(CE_List[ID].FD, buffer); 518 519 /* Receive data from Shell */ 520 ReadMessage(shellSocketFD, buffer); 521 522 /* Send data to CE */ 523 SendMessage(CE_List[ID].FD, buffer); 521 524 FD_ZERO(&sockSet); 522 525 FD_SET(shellSocketFD, &sockSet); 523 526 selTimeout.tv_sec = 0; 524 527 selTimeout.tv_usec = 5000; 525 //Check if there is a message on the shell ready to be processed 526 rc=select(shellSocketFD + 1, &sockSet, NULL, NULL, &selTimeout); 527 } 528 529 /* Check if there is a message on the shell ready to be processed */ 530 rc = select(shellSocketFD + 1, &sockSet, NULL, NULL, &selTimeout); 531 } 532 528 533 memset(buffer, 0, 256); 529 //printf("done trans exp!\n");530 534 ReadMessage(CE_List[ID].FD, buffer); 531 535 SendMessage(shellSocketFD, buffer); … … 544 548 memset(buffer, 0, 256); 545 549 ReadMessage(CE_List[ID].FD, buffer); 546 char* cols[] = {(char *)"ID_Num", (char *)"Service_Name"}; 547 //printf("RS_buffer = %s\n", buffer); 548 // generate command 549 _services_DB->command="insert into "; 550 551 char *cols[] = {(char *) "ID_Num", (char *) "Service_Name"}; 552 553 /* Generate command */ 554 _services_DB->command = "insert into "; 550 555 _services_DB->command.append(_services_DB->tablename); 551 556 _services_DB->command.append(" ("); … … 555 560 _services_DB->command.append(") "); 556 561 _services_DB->command.append(" values("); 562 557 563 char temp[3]; 558 memset(temp, 0,3);564 memset(temp, 0, 3); 559 565 sprintf(temp, "%d", ID); 566 560 567 _services_DB->command.append(temp); 561 568 _services_DB->command.append(", '"); … … 563 570 _services_DB->command.append("');"); 564 571 565 //printf("search command: %s\n", _services_DB->command); 566 // execute add command 572 /* Execute add command */ 567 573 char *errorMsg; 568 574 int rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg); 569 if( rc!=SQLITE_OK && rc!=101 ) 570 fprintf(stderr, "ServiceManagementLayer::RecieveServices DB Error %s\n", errorMsg); 571 /*sprintf(outBuffer, "SML: Registering service '%s' from component number '%d'", buffer, ID); 572 LOG(outBuffer);*/ 575 if((rc != SQLITE_OK) && (rc != 101)) 576 WARNING("ServiceManagementLayer::RecieveServices DB Error %s\n", errorMsg); 573 577 } 574 578 … … 577 581 * OUTPUTS: <none> 578 582 * 579 * DESCRIPTION: This method associates the services that components provide with the services that are requested in the mission 580 * Each service in the mission is given the ID and FD of a component that has registered to provide that service 581 * Deregistration is okay until this method is called without a reload, but if deregistration occurs after this 583 * DESCRIPTION: This method associates the services that components provide with the 584 * services that are requested in the mission. Each service in the mission is given 585 * the ID and FD of a component that has registered to provide that service. Deregistration 586 * is okay until this method is called without a reload, but if deregistration occurs after this 582 587 * method is called it needs to be called again even if other engines also provide the services 583 588 */ … … 588 593 memset(buffer, 0, 256); 589 594 ReadMessage(shellSocketFD, buffer); 595 590 596 uint32_t missID = atoi(buffer); 591 for(activeMission = 0; activeMission < 10; activeMission++) 592 { 593 //Find the active mission by comparing mission ID's 594 if(miss[activeMission].missionID == missID) 595 break; 596 } 597 598 LOG("ServiceManagementLayer:: Received Set Active Mission command: %i.\n",missID); 599 //For each service in the mission 600 for(uint16_t i = 0; i < miss[activeMission].numServices; i++) 601 { 602 //Check whether the current service is an actual service or a conditional 603 if(miss[activeMission].services[i].name.compare("if") && miss[activeMission].services[i].name.compare("dowhile") && \ 604 miss[activeMission].services[i].name.compare("shell")){ 605 //If it is a service, search the database of registered services to find the ID of the component that registered it 606 _services_DB->command="select "; 607 _services_DB->command.append(_services_DB->tablename); 608 _services_DB->command.append(".* from "); 609 _services_DB->command.append( _services_DB->tablename); 610 _services_DB->command.append(" where Service_Name=='"); 611 _services_DB->command.append(miss[activeMission].services[i].name); 612 _services_DB->command.append("';"); 597 for(activeMission = 0; activeMission < 10; activeMission++) { 598 /* Find the active mission by comparing mission ID's */ 599 if(miss[activeMission].missionID == missID) 600 break; 601 } 602 603 LOG("ServiceManagementLayer:: Received Set Active Mission command: %i.\n", missID); 604 605 /* For each service in the mission */ 606 for(size_t i = 0; i < miss[activeMission].numServices; i++) { 607 /* Check whether the current service is an actual service or a conditional */ 608 if(miss[activeMission].services[i].name.compare("if") && \ 609 miss[activeMission].services[i].name.compare("dowhile") && \ 610 miss[activeMission].services[i].name.compare("shell")) { 611 /* If it is a service, search the database of registered services to find 612 * the ID of the component that registered it */ 613 _services_DB->command="select "; 614 _services_DB->command.append(_services_DB->tablename); 615 _services_DB->command.append(".* from "); 616 _services_DB->command.append( _services_DB->tablename); 617 _services_DB->command.append(" where Service_Name=='"); 618 _services_DB->command.append(miss[activeMission].services[i].name); 619 _services_DB->command.append("';"); 613 620 614 sqlite3_stmt * pStatement; 615 int rc = sqlite3_prepare_v2(_services_DB->db, _services_DB->command.c_str(), -1, &pStatement, NULL); 616 if (rc == SQLITE_OK){ 617 if (sqlite3_step(pStatement) == SQLITE_ROW) 618 miss[activeMission].services[i].componentID = sqlite3_column_int(pStatement, 0); 619 else { 620 printf("services_DB:: Mission requires service %s not provided by any connected component.\n",miss[activeMission].services[i].name.c_str()); 621 rc=31337; 622 } 623 } else { 624 printf("services_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_services_DB->command.c_str()); 625 } 626 //printf("s_name=%s\n",miss[activeMission].services[i].name.c_str()); 627 sqlite3_finalize(pStatement); 628 miss[activeMission].services[i].socketFD = CE_List[miss[activeMission].services[i].componentID].FD; 629 //Set the FD and ID of the service to refer to the component where the service exists 630 } 631 //Nothing to be done for conditionals at this stage 621 sqlite3_stmt * pStatement; 622 int32_t rc = sqlite3_prepare_v2(_services_DB->db, _services_DB->command.c_str(), \ 623 -1, &pStatement, NULL); 624 if(rc == SQLITE_OK) { 625 if (sqlite3_step(pStatement) == SQLITE_ROW) 626 miss[activeMission].services[i].componentID = sqlite3_column_int(pStatement, 0); 627 else { 628 WARNING("services_DB:: Mission requires service %s ", \ 629 miss[activeMission].services[i].name.c_str()); 630 WARNING("not provided by any connected component.\n") 631 rc = 31337; 632 } 633 } else { 634 WARNING("services_DB:: Error executing SQL statement. rc = %i\n%s\n", \ 635 rc, _services_DB->command.c_str()); 636 } 637 638 sqlite3_finalize(pStatement); 639 miss[activeMission].services[i].socketFD = \ 640 CE_List[miss[activeMission].services[i].componentID].FD; 641 } 642 /* TODO Nothing to be done for conditionals at this stage */ 632 643 } 633 644 634 645 SendMessage(shellSocketFD, "ack"); 635 646 LOG("ServiceManagementLayer:: Done setting active mission.\n"); 636 //printf("\nhere ---%d, %d---\n", miss[activeMission].services[0].componentID, miss[activeMission].services[1].componentID);637 647 } 638 648 … … 642 652 * 643 653 * DESCRIPTION: This is a helper method for the "PerformActiveMission" function 644 * NOTE: This function has changed durrastically from the previous implementation 645 * Takes an ID of a service 646 * For that service, finds inputs in DB and forwords those on to the engine after sending comm-starting messages 647 * Afterwords, listenes for the outputs so that it can store those in the database for future services or the overall output 654 * NOTE: This function has changed drastically from the previous implementation 655 * 656 * Takes an ID of a service. For that service, finds inputs in DB and forwords 657 * those on to the engine after sending comm-starting messages. Afterwords, listenes 658 * for the outputs so that it can store those in the database for future services or 659 * the overall output 648 660 */ 649 661 void 650 662 ServiceManagementLayer::TransactData(int32_t sourceID) 651 663 { 652 // LOG("ServiceManagementLayer:: Data transaction occuring.\n");653 664 char buffer[256]; 654 665 std::string data; 655 char* cols[] = {(char *)"Tag", (char *)"Data"}; 656 int i = 0; 666 char *cols[] = {(char *) "Tag", (char *) "Data"}; 657 667 char *token; 658 668 659 //Send a message directly to the shell 660 if(miss[activeMission].services[sourceID].name.find("shell")!=string::npos) 661 { 662 int k = 0; 663 shellFound=true; 664 while(k<10 && !miss[activeMission].input[k].empty()){ 665 k++; 666 } 667 sprintf(buffer, "%d", k); 668 SendMessage(shellSocketFD, buffer); 669 for(int t = 0; t < k; t++){ 670 memset(buffer, 0 , 256); 671 _data_DB->command="select "; 669 /* Send a message directly to the shell */ 670 if(miss[activeMission].services[sourceID].name.find("shell") != string::npos) { 671 shellFound=true; 672 673 int32_t k = 0; 674 while((k < 10) && (!miss[activeMission].input[k].empty())) { 675 k++; 676 } 677 678 sprintf(buffer, "%d", k); 679 SendMessage(shellSocketFD, buffer); 680 for(int32_t t = 0; t < k; t++) { 681 memset(buffer, 0 , 256); 682 _data_DB->command="select "; 672 683 _data_DB->command.append(_data_DB->tablename); 673 _data_DB->command.append(".* from "); 674 _data_DB->command.append(_data_DB->tablename); 675 _data_DB->command.append(" where Tag=='"); 676 _data_DB->command.append(miss[activeMission].input[t]); 677 _data_DB->command.append("';"); 678 sqlite3_stmt * pStatement; 679 int rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL); 680 if (rc == SQLITE_OK){ 681 if (sqlite3_step(pStatement) == SQLITE_ROW) 682 data=((const char*) sqlite3_column_text(pStatement, 1)); 683 else { 684 printf("3data_DB:: Data not yet in DB., %s\n", _data_DB->command.c_str()); 685 rc=31337; 686 } 687 } 688 else { 689 printf("data_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_data_DB->command.c_str()); 690 } 691 sqlite3_finalize(pStatement); 692 token = strtok((char *)data.c_str(), "@"); 693 token = strtok(NULL, "@"); 694 SendMessage(shellSocketFD, token); 695 token = strtok(NULL, "@"); 696 SendMessage(shellSocketFD, token); 697 } 698 return; 684 _data_DB->command.append(".* from "); 685 _data_DB->command.append(_data_DB->tablename); 686 _data_DB->command.append(" where Tag=='"); 687 _data_DB->command.append(miss[activeMission].input[t]); 688 _data_DB->command.append("';"); 689 sqlite3_stmt * pStatement; 690 691 int32_t rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), \ 692 -1, &pStatement, NULL); 693 if(rc == SQLITE_OK) { 694 if(sqlite3_step(pStatement) == SQLITE_ROW) 695 data=((const char*) sqlite3_column_text(pStatement, 1)); 696 else { 697 LOG("3data_DB:: Data not yet in DB., %s\n", _data_DB->command.c_str()); 698 rc = 31337; 699 } 700 } 701 else { 702 WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \ 703 rc,_data_DB->command.c_str()); 704 } 705 706 sqlite3_finalize(pStatement); 707 token = strtok((char *) data.c_str(), "@"); 708 token = strtok(NULL, "@"); 709 SendMessage(shellSocketFD, token); 710 token = strtok(NULL, "@"); 711 SendMessage(shellSocketFD, token); 712 } 713 714 return; 699 715 } 700 716 701 / /If this is a service command and not a shell command...702 / /Transmission starting messages717 /* If this is a service command and not a shell command... */ 718 /* Transmission starting messages */ 703 719 SendMessage(miss[activeMission].services[sourceID].socketFD, "request_optimization_service"); 704 SendMessage(miss[activeMission].services[sourceID].socketFD, miss[activeMission].services[sourceID].name.c_str()); 705 706 //If the service takes a parameter, feed that parameter in 720 SendMessage(miss[activeMission].services[sourceID].socketFD, \ 721 miss[activeMission].services[sourceID].name.c_str()); 722 723 /* If the service takes a parameter, feed that parameter in 707 724 if(!miss[activeMission].services[sourceID].parameter.empty()){ 708 725 //printf("sending parameter!\n"); … … 714 731 715 732 //Load and transmit the input data 733 int i = 0; 716 734 while(i < 10 && !miss[activeMission].services[sourceID].input[i].empty()){ 717 735 _data_DB->command="select ";