root/vtcross/trunk/src/service_management_layer/ServiceManagementLayer.cpp @ 443

Revision 443, 68.9 KB (checked in by bhilburn, 15 years ago)

Code cleanup in the sqlite DB creation functions.

Line 
1/*
2 Copyright 2009 Virginia Polytechnic Institute and State University 
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7 
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17/* Inter-component communication handled by sockets and FD's. 
18 * Server support has been completely implemented and tested.
19 *
20 * Services are stored in a SQLite DB by the ID of the CE that registered them.  Service
21 * support has been completely implemented and tested.
22 *
23 * Missions are loaded from an XML file, connected with services provided by components,
24 * and run.  See the documentation for the "PerformActiveMission" below for important
25 * info.
26 */
27
28
29#include <cmath>
30#include <cstdio>
31#include <cstdlib>
32#include <cstring>
33#include <stdint.h>
34
35#include <arpa/inet.h>
36#include <iostream>
37#include <netinet/in.h>
38#include <netdb.h>
39#include <fcntl.h>
40#include <sqlite3.h>
41#include <string>
42#include <sys/ioctl.h>
43#include <sys/mman.h>
44#include <sys/socket.h>
45#include <sys/types.h>
46#include <sys/wait.h>
47
48#include "tinyxml/tinyxml.h"
49#include "tinyxml/tinystr.h"
50
51#include "vtcross/debug.h"
52#include "vtcross/error.h"
53#include "vtcross/common.h"
54#include "vtcross/components.h"
55#include "vtcross/containers.h"
56#include "vtcross/socketcomm.h"
57
58
59typedef struct services_s *services_DB;
60typedef struct data_s *data_DB;
61
62using namespace std;
63
64struct services_s {
65    string filename;
66    string tablename;
67    string command;
68    sqlite3 *db;
69    unsigned int num_columns;
70};
71
72struct data_s {
73    string filename;
74    string tablename;
75    string command;
76    sqlite3 *db;
77    unsigned int num_columns;
78};
79
80services_DB _services_DB;
81data_DB _data_DB;
82string _SML_Config;
83bool shellFound;
84
/* Row callback handed to sqlite3_exec() by the SQL helpers in this file.
 *
 * INPUTS: |notUsed| opaque user pointer (ignored)
 *         |argc| number of columns in the current result row
 *         |argv| column values as text; an entry is NULL for an SQL NULL
 *         |azColName| column names
 * OUTPUTS: always 0
 *
 * DESCRIPTION: Prints every column of the row as "name = value" (the text
 * "NULL" stands in for a NULL value), followed by a blank line.
 */
int32_t
callback(void *notUsed, int32_t argc, char **argv, char **azColName)
{
    (void)notUsed;

    /* Signed index: comparing a size_t against |argc| would turn a negative
     * count into a huge unsigned bound. */
    for(int32_t col = 0; col < argc; col++) {
        const char *value = argv[col] ? argv[col] : "NULL";
        printf("%s = %s\n", azColName[col], value);
    }

    printf("\n");
    return 0;
}
96
97ServiceManagementLayer::ServiceManagementLayer()
98{
99    LOG("Creating Service Management Layer.\n");
100
101    shellSocketFD = -1;
102    numberOfCognitiveEngines = 0;
103    CE_Present = false;
104    cogEngSrv = 1;
105}
106
107/* Free and clear the DB's associated with this SML in the destructor.
108 *
109 * Note that exiting with an error condition will cause SML to not be destructed,
110 * resulting in the DB's staying in memory until the destructor is encountered in
111 * future executions. */
112ServiceManagementLayer::~ServiceManagementLayer()
113{
114    char *errorMsg;
115    int32_t rc;         /* sqlite command return code */
116
117    _services_DB->command="drop table ";
118    _services_DB->command.append(_services_DB->tablename);
119    rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg);
120    if((rc != SQLITE_OK) && (rc != 101))
121        WARNING("ServiceManagementLayer::Destructor services 'drop table' error: %s\n", errorMsg);
122
123    _services_DB->command="vacuum";
124    rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg);
125    if((rc != SQLITE_OK) && (rc != 101))
126        WARNING("ServiceManagementLayer::Destructor services 'vacuum' error: %s\n", errorMsg);
127
128    free(_services_DB);
129
130    _data_DB->command="drop table ";
131    _data_DB->command.append(_data_DB->tablename);
132    rc = sqlite3_exec(_data_DB->db, _data_DB->command.c_str(), callback, 0, &errorMsg);
133    if((rc != SQLITE_OK) && (rc != 101))
134        WARNING("ServiceManagementLayer::Destructor data 'drop table' error: %s\n", errorMsg);
135
136    _data_DB->command="vacuum";
137    rc = sqlite3_exec(_data_DB->db, _data_DB->command.c_str(), callback, 0, &errorMsg);
138    if((rc != SQLITE_OK) && (rc != 101))
139        WARNING("ServiceManagementLayer::Destructor data 'vacuum' error: %s\n", errorMsg);
140
141    free(_data_DB);
142}
143
144/* Note that sizes of CE_List, miss, and service are hardcoded for now.
145 * Also, their sizes are hardcoded into the code in various places; a fix
146 * for a future version. */
147ServiceManagementLayer::ServiceManagementLayer(const char* SML_Config, \
148    const char* serverName, const char* serverPort, int16_t clientPort)
149{
150    LOG("Creating Service Management Layer.\n");
151
152    _SML_Config = string(SML_Config);
153    SMLport = clientPort;
154
155    ConnectToShell(serverName, serverPort);
156    CE_List = new CE_Reg[10];
157
158    miss = new Mission[10];
159    for(size_t i = 0; i < 10; i++) {
160        miss[i].services = new Service[30];
161    }
162
163    Current_ID = 0;
164
165    LoadConfiguration(SML_Config, miss);
166
167    CreateServicesDB();
168    CreateDataDB();
169}
170
171/* CALLED BY: constructor
172 * INPUTS: <none>
173 * OUTPUTS: <none>
174 *
175 * DESCRIPTION: Create and initialize a DB to hold the services registered by components
176 */
177void
178ServiceManagementLayer::CreateServicesDB()
179{
180    sqlite3_stmt *ppStmt;   /* OUT: Statement handle */
181    const char *pzTail;     /* OUT: Pointer to unused portion of zSql */
182    int32_t rc;             /* sqlite command return code */
183
184    _services_DB = new services_s;
185    _services_DB->filename="Services_Table";
186    sqlite3_open(_services_DB->filename.c_str(), &(_services_DB->db));
187
188    char* cols[] = {(char *)"ID_Num", (char *)"Service_Name"};
189
190    _services_DB->tablename="Services";
191
192    /* If program execution ends in anything other than a ordered shutdown, DB's will still
193     * be there for next run. Need to get rid of it so that old data isn't inadvertantly
194     * used in the next execution cycle. */
195    _services_DB->command = "DROP TABLE IF EXISTS Services;";     
196
197    rc = sqlite3_prepare_v2(_services_DB->db, _services_DB->command.c_str(), 128, &ppStmt, &pzTail);
198    if((rc != SQLITE_OK) && (rc != 101))
199        WARNING("ServiceManagementLayer::CreateServicesDB 'prepare_stmt' error %d\n", rc);
200
201    rc = sqlite3_step(ppStmt);
202    if((rc != SQLITE_OK) && (rc != 101))
203        WARNING("ServiceManagementLayer::CreateServicesDB 'step' error\n");
204
205    _services_DB->num_columns = 2;
206
207    /* Generate command */
208    _services_DB->command="CREATE TABLE ";
209    _services_DB->command.append(_services_DB->tablename);
210    _services_DB->command.append("(");
211    _services_DB->command.append(cols[0]);
212    _services_DB->command.append(" INT, ");
213    _services_DB->command.append(cols[1]);
214    _services_DB->command.append(" TEXT);");
215
216    /* Execute create table command */
217    rc = sqlite3_prepare_v2(_services_DB->db, _services_DB->command.c_str(), 128, &ppStmt, &pzTail);
218    if((rc != SQLITE_OK) && (rc != 101))
219        WARNING("ServiceManagementLayer::CreateServicesDB 'prepare_stmt' error %d\n", rc);
220
221    rc = sqlite3_step(ppStmt);
222    if((rc != SQLITE_OK) && (rc != 101))
223        WARNING("ServiceManagementLayer::CreateServicesDB 'step' error\n");
224}
225
226/* CALLED BY: constructor
227 * INPUTS: <none>
228 * OUTPUTS: <none>
229 *
230 * DESCRIPTION: Create and initialize a DB to hold the data sent by components
231 */
232void
233ServiceManagementLayer::CreateDataDB()
234{
235    sqlite3_stmt *ppStmt;   /* OUT: Statement handle */
236    const char *pzTail;     /* OUT: Pointer to unused portion of zSql */
237    int32_t rc;             /* sqlite command return code */
238
239    _data_DB = new data_s;
240
241    _data_DB->filename="Data_Table";
242    sqlite3_open(_data_DB->filename.c_str(), &(_data_DB->db));
243
244    char* cols[] = {(char *)"Tag", (char *)"Data"};
245
246    _data_DB->tablename="Data";
247    _data_DB->command="DROP TABLE IF EXISTS Data;";     
248
249    rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), 128, &ppStmt, &pzTail);
250    if((rc != SQLITE_OK) && (rc != 101))
251        WARNING("ServiceManagementLayer::CreateDataDB 'prepare_stmt' error %d\n", rc);
252
253    rc = sqlite3_step(ppStmt);
254    if((rc != SQLITE_OK) && (rc != 101))
255        WARNING("ServiceManagementLayer::CreateDataDB 'step' error\n");
256
257    _data_DB->num_columns = 2;
258
259    /* Generate command */
260    _data_DB->command="CREATE TABLE ";
261    _data_DB->command.append(_data_DB->tablename);
262    _data_DB->command.append("(");
263    _data_DB->command.append(cols[0]);
264
265    /* First column is the name of the data (corresponding to the name of the output/input pair)
266     * It is the primary key so any subsequent data with the same name will replace the row. */
267    _data_DB->command.append(" TEXT PRIMARY KEY ON CONFLICT REPLACE, ");
268    _data_DB->command.append(cols[1]);
269    _data_DB->command.append(" TEXT);");
270
271    rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), 128, &ppStmt, &pzTail);
272    if((rc != SQLITE_OK) && (rc != 101))
273        WARNING("ServiceManagementLayer::CreateDataDB 'prepare_stmt' error %d\n", rc);
274
275    rc = sqlite3_step(ppStmt);
276    if((rc != SQLITE_OK) && (rc != 101))
277        WARNING("ServiceManagementLayer::CreateDataDB 'step' error\n");
278}
279
280/* CALLED BY: MessageHandler
281 * INPUTS: <none>
282 * OUTPUTS: <none>
283 *
284 * DESCRIPTION: Sends a message identifying this component as an SML to the Shell
285 */
286void
287ServiceManagementLayer::SendComponentType()
288{
289    SendMessage(shellSocketFD, "response_sml");
290    LOG("SML responded to GetRemoteComponentType query.\n");
291}
292
293/* CALLED BY: constructor
294 * INPUTS: |serverName| the IPv4 name of the server (127.0.0.1 for localhost)
295 *       |serverPort| the port on the server to connect to
296 * OUTPUTS: <none>
297 *
298 * DESCRIPTION: Connecting to the shell takes 2 steps
299 * 1) Establish a client socket for communication
300 * 2) Run the initial Registration/handshake routine
301 */
302void
303ServiceManagementLayer::ConnectToShell(const char* serverName, \
304        const char* serverPort)
305{
306    shellSocketFD = ClientSocket(serverName, serverPort);
307    RegisterComponent();
308}
309
310/* CALLED BY: StartSMLServer
311 * INPUTS: |ID| The ID number of the CE that has a message wating
312 * OUTPUTS: <none>
313 *
314 * DESCRIPTION: Called whenever a socket is identified as being ready for communication
315 *              This funciton reads the message and calls the appropriate helper
316 */
317void
318ServiceManagementLayer::MessageHandler(int32_t ID)
319{
320    char buffer[256];   
321    memset(buffer, 0, 256); 
322    int32_t _FD; 
323   
324    if(ID != -1)
325    _FD = CE_List[ID].FD;
326    else
327    _FD = shellSocketFD;
328    ReadMessage(_FD, buffer);
329    //printf("MH_buffer = %s\n", buffer);
330   
331    //--------Policy Engine Stuff - no policy engine support in this version-------//
332
333    //printf("********* %s **********\n", buffer);
334    // TODO
335    // If we send integer op codes rather than strings, this process will be
336    // MUCH faster since instead of donig string compares we can simply
337    // switch on the integer value...
338    /*if(strcmp(buffer, "register_service") == 0) {
339        if(strcmp(buffer, "policy_geo") == 0) {
340        }
341        else if(strcmp(buffer, "policy_time") == 0) {
342        }
343        else if(strcmp(buffer, "policy_spectrum") == 0) {
344        }
345        else if(strcmp(buffer, "policy_spacial") == 0) {
346        }
347    }
348    else if(strcmp(buffer, "deregister_service") == 0) {
349        if(strcmp(buffer, "policy_geo") == 0) {
350        }
351        else if(strcmp(buffer, "policy_time") == 0) {
352        }
353        else if(strcmp(buffer, "policy_spectrum") == 0) {
354        }
355        else if(strcmp(buffer, "policy_spacial") == 0) {
356        }
357    }*/
358
359    //Go down the list to call the appropriate function
360    if(strcmp(buffer, "query_component_type") == 0) {
361        SendComponentType();
362    }
363    else if(strcmp(buffer, "reset_sml") == 0) {
364        Reset();
365    }
366    else if(strcmp(buffer, "shutdown_sml") == 0) {
367        Shutdown();
368    }
369    else if(strcmp(buffer, "register_engine_cognitive") == 0) {
370         RegisterCognitiveEngine(ID);
371    }
372    else if(strcmp(buffer, "register_service") == 0) {
373     ReceiveServices(ID);
374    }
375    else if(strcmp(buffer, "send_component_type") == 0) {
376     SendComponentType();
377    }
378    else if(strcmp(buffer, "list_services") == 0) {
379     ListServices();
380    }
381    else if(strcmp(buffer, "set_active_mission") == 0) {
382    SetActiveMission();
383    }
384    else if(strcmp(buffer, "request_optimization") == 0) {
385    PerformActiveMission();
386    }
387    else if(strcmp(buffer, "deregister_engine_cognitive") == 0) {
388    DeregisterCognitiveEngine(ID);
389    }
390    else if(strcmp(buffer, "deregister_service") == 0) {
391    DeregisterServices(ID);
392    }
393}
394
395//TODO Finish
396/* CALLED BY: MessageHandler
397 * INPUTS: <none>
398 * OUTPUTS: <none>
399 *
400 * DESCRIPTION: Deregisters the component from the Shell.
401 */
402void
403ServiceManagementLayer::Shutdown()
404{
405    DeregisterComponent();
406}
407
408//TODO Finish
409/* CALLED BY: MessageHandler
410 * INPUTS: <none>
411 * OUTPUTS: <none>
412 *
413 * DESCRIPTION: Deregisters the component from the Shell
414 */
415void
416ServiceManagementLayer::Reset()
417{
418    DeregisterComponent();
419    ReloadConfiguration();
420}
421
422/* CALLED BY: ConnectToShell
423 * INPUTS: <none>
424 * OUTPUTS: <none>
425 *
426 * DESCRIPTION: Sends the registration message to the Shell
427 */
428void
429ServiceManagementLayer::RegisterComponent()
430{
431    SendMessage(shellSocketFD, "register_sml");
432    LOG("ServiceManagementLayer:: Registration message sent.\n");
433    //printf("SSFD = %d\n", shellSocketFD);
434}
435
436/* CALLED BY: Shutdown
437 * INPUTS: <none>
438 * OUTPUTS: <none>
439 *
440 * DESCRIPTION: Closes the client socket with the shell, sends a deregstration message
441 */
442void
443ServiceManagementLayer::DeregisterComponent()
444{
445    SendMessage(shellSocketFD, "deregister_sml");
446    LOG("ServiceManagementLayer:: Deregistration message sent.\n");
447
448    shutdown(shellSocketFD, 2);
449    close(shellSocketFD);
450    shellSocketFD = -1;
451    LOG("ServiceManagementLayer:: Shell socket closed.\n");
452}
453
454
455/* CALLED BY: RegisterCognitiveEngine
456 * INPUTS: |ID| The ID number of the component where the data is to be transfered to
457 * OUTPUTS: <none>
458 *
459 * DESCRIPTION: Streams config data directly from the shell to the CE, and checks
460 * for an "ack" message from the CE after every sent message
461 * to know when to stop communication.
462 */
463
464//Modified to check the incoming message buffer rather than the outgoing message buffer to avoid a portion of the delay
465//May change this again to handle data more inteligently, taking advantage of it's properties.
466void
467ServiceManagementLayer::TransferRadioConfiguration(int32_t ID)
468{
469    //printf("transRadConfig\n");
470    struct timeval selTimeout;
471    fd_set sockSet;
472    int32_t rc = 1;
473    char buffer[256];
474    //Send data until the CE sends an ACK message back
475    while(rc!=0){
476        memset(buffer, 0, 256);
477    //Receive data from Shell
478    ReadMessage(shellSocketFD, buffer);
479    //printf("buffer = %s\n", buffer);
480    //Send data to CE
481    SendMessage(CE_List[ID].FD, buffer);
482        FD_ZERO(&sockSet);
483        FD_SET(shellSocketFD, &sockSet);
484        selTimeout.tv_sec = 0;
485        selTimeout.tv_usec = 5000;
486        //Check if there is a message on the shell ready to be processed
487        rc=select(shellSocketFD + 1, &sockSet, NULL, NULL, &selTimeout);
488    }
489    memset(buffer, 0, 256);
490    ReadMessage(CE_List[ID].FD, buffer);
491    SendMessage(shellSocketFD, buffer);
492    //printf("transfer done!\n");
493}
494
495
496/* CALLED BY: RegisterCognitiveEngine
497 * INPUTS: |ID| The ID number of the component where the data is to be transfered to
498 * OUTPUTS: <none>
499 *
500 * DESCRIPTION: Simmilar to TransferRadioConfig, just with Experience data
501 */
502
503//Modified to check the incoming message buffer rather than the outgoing message buffer to avoid a portion of the delay
504//May change this again to handle data more inteligently, taking advantage of it's properties.
505void
506ServiceManagementLayer::TransferExperience(int32_t ID)
507{
508    struct timeval selTimeout;
509    fd_set sockSet;
510    int32_t rc = 1;
511    char buffer[256];
512    //Send data until the CE sends an ACK message back
513    while(rc!=0){
514    //printf("transfering...\n");
515        memset(buffer, 0, 256);
516    //Receive data from Shell
517    ReadMessage(shellSocketFD, buffer);
518    //printf("buffer = %s\n", buffer);
519    //Send data to CE
520    SendMessage(CE_List[ID].FD, buffer);
521        FD_ZERO(&sockSet);
522        FD_SET(shellSocketFD, &sockSet);
523        selTimeout.tv_sec = 0;
524        selTimeout.tv_usec = 5000;
525        //Check if there is a message on the shell ready to be processed
526        rc=select(shellSocketFD + 1, &sockSet, NULL, NULL, &selTimeout);
527    }
528    memset(buffer, 0, 256);
529    //printf("done trans exp!\n");
530    ReadMessage(CE_List[ID].FD, buffer);
531    SendMessage(shellSocketFD, buffer);
532}
533
534/* CALLED BY: MessageHandler
535 * INPUTS: |ID| The ID number of the component where service is located
536 * OUTPUTS: <none>
537 *
538 * DESCRIPTION: Inserts a service into the DB with the ID of the component where it exists
539 */
540void
541ServiceManagementLayer::ReceiveServices(int32_t ID)
542{
543    char buffer[256];
544    memset(buffer, 0, 256);
545    ReadMessage(CE_List[ID].FD, buffer);
546    char* cols[] = {(char *)"ID_Num", (char *)"Service_Name"};
547    //printf("RS_buffer = %s\n", buffer);
548    // generate command
549    _services_DB->command="insert into ";
550    _services_DB->command.append(_services_DB->tablename);
551    _services_DB->command.append(" (");
552    _services_DB->command.append(cols[0]);
553    _services_DB->command.append(", ");
554    _services_DB->command.append(cols[1]);
555    _services_DB->command.append(") ");
556    _services_DB->command.append(" values(");
557    char temp[3];
558    memset(temp,0,3);
559    sprintf(temp, "%d", ID);
560    _services_DB->command.append(temp);
561    _services_DB->command.append(", '");
562    _services_DB->command.append(buffer);
563    _services_DB->command.append("');");
564   
565    //printf("search command: %s\n", _services_DB->command);
566    // execute add command
567    char *errorMsg;
568    int rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg);
569    if( rc!=SQLITE_OK && rc!=101 )
570        fprintf(stderr, "ServiceManagementLayer::RecieveServices DB Error %s\n", errorMsg);
571    /*sprintf(outBuffer, "SML: Registering service '%s' from component number '%d'", buffer, ID);
572    LOG(outBuffer);*/
573}
574
575/* CALLED BY: MessageHandler
576 * INPUTS: <none>
577 * OUTPUTS: <none>
578 *
579 * DESCRIPTION: This method associates the services that components provide with the services that are requested in the mission
580 * Each service in the mission is given the ID and FD of a component that has registered to provide that service
581 * Deregistration is okay until this method is called without a reload, but if deregistration occurs after this
582 * method is called it needs to be called again even if other engines also provide the services
583 */
584void
585ServiceManagementLayer::SetActiveMission()
586{
587    char buffer[256];
588    memset(buffer, 0, 256);
589    ReadMessage(shellSocketFD, buffer);
590    uint32_t missID = atoi(buffer);
591    for(activeMission = 0; activeMission < 10; activeMission++)
592    {
593    //Find the active mission by comparing mission ID's
594    if(miss[activeMission].missionID == missID)
595        break;
596    }
597
598    LOG("ServiceManagementLayer:: Received Set Active Mission command: %i.\n",missID);
599    //For each service in the mission
600    for(uint16_t i = 0; i < miss[activeMission].numServices; i++)
601    {   
602    //Check whether the current service is an actual service or a conditional
603    if(miss[activeMission].services[i].name.compare("if") && miss[activeMission].services[i].name.compare("dowhile") && \
604       miss[activeMission].services[i].name.compare("shell")){
605        //If it is a service, search the database of registered services to find the ID of the component that registered it
606            _services_DB->command="select ";
607           _services_DB->command.append(_services_DB->tablename);
608            _services_DB->command.append(".* from ");
609            _services_DB->command.append( _services_DB->tablename);
610            _services_DB->command.append(" where Service_Name=='");
611        _services_DB->command.append(miss[activeMission].services[i].name);
612        _services_DB->command.append("';");
613   
614            sqlite3_stmt * pStatement;
615            int rc = sqlite3_prepare_v2(_services_DB->db, _services_DB->command.c_str(), -1, &pStatement, NULL);
616            if (rc == SQLITE_OK){
617                if (sqlite3_step(pStatement) == SQLITE_ROW)
618                 miss[activeMission].services[i].componentID =  sqlite3_column_int(pStatement, 0);
619                else {
620                printf("services_DB:: Mission requires service %s not provided by any connected component.\n",miss[activeMission].services[i].name.c_str());
621            rc=31337;
622            }
623             } else {
624            printf("services_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_services_DB->command.c_str());
625            }
626        //printf("s_name=%s\n",miss[activeMission].services[i].name.c_str());
627            sqlite3_finalize(pStatement);
628        miss[activeMission].services[i].socketFD = CE_List[miss[activeMission].services[i].componentID].FD;
629        //Set the FD and ID of the service to refer to the component where the service exists
630    }
631    //Nothing to be done for conditionals at this stage
632    }
633 
634    SendMessage(shellSocketFD, "ack");
635    LOG("ServiceManagementLayer:: Done setting active mission.\n");
636    //printf("\nhere ---%d, %d---\n", miss[activeMission].services[0].componentID, miss[activeMission].services[1].componentID);
637}
638
639/* CALLED BY: PerformActiveMission
640 * INPUTS: |sourceID| ID of the service that is being processed
641 * OUTPUTS: <none>
642 *
643 * DESCRIPTION: This is a helper method for the "PerformActiveMission" function
644 * NOTE: This function has changed durrastically from the previous implementation
645 * Takes an ID of a service
646 * For that service, finds inputs in DB and forwords those on to the engine after sending comm-starting messages
647 * Afterwords, listenes for the outputs so that it can store those in the database for future services or the overall output
648 */
649void
650ServiceManagementLayer::TransactData(int32_t sourceID)
651{
652   // LOG("ServiceManagementLayer:: Data transaction occuring.\n");
653    char buffer[256];
654    std::string data;
655    char* cols[] = {(char *)"Tag", (char *)"Data"};
656    int i = 0;
657    char *token;
658
659   //Send a message directly to the shell
660   if(miss[activeMission].services[sourceID].name.find("shell")!=string::npos)
661   {
662    int k = 0;
663    shellFound=true;
664    while(k<10 && !miss[activeMission].input[k].empty()){
665        k++;
666    }
667    sprintf(buffer, "%d", k);
668    SendMessage(shellSocketFD, buffer);
669    for(int t = 0; t < k; t++){
670        memset(buffer, 0 , 256);
671            _data_DB->command="select ";
672           _data_DB->command.append(_data_DB->tablename);
673            _data_DB->command.append(".* from ");
674            _data_DB->command.append(_data_DB->tablename);
675            _data_DB->command.append(" where Tag=='");
676        _data_DB->command.append(miss[activeMission].input[t]);
677        _data_DB->command.append("';");
678            sqlite3_stmt * pStatement;
679            int rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
680            if (rc == SQLITE_OK){
681                if (sqlite3_step(pStatement) == SQLITE_ROW)
682                data=((const char*) sqlite3_column_text(pStatement, 1));
683                else {
684                printf("3data_DB:: Data not yet in DB., %s\n", _data_DB->command.c_str());
685            rc=31337;
686            }
687            }
688        else {
689            printf("data_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_data_DB->command.c_str());
690            }
691            sqlite3_finalize(pStatement);
692        token = strtok((char *)data.c_str(), "@");
693        token = strtok(NULL, "@");
694        SendMessage(shellSocketFD, token);
695        token = strtok(NULL, "@");
696        SendMessage(shellSocketFD, token);
697    }
698    return;
699   }
700
701    //If this is a service command and not a shell command...
702    //Transmission starting messages
703    SendMessage(miss[activeMission].services[sourceID].socketFD, "request_optimization_service");
704    SendMessage(miss[activeMission].services[sourceID].socketFD, miss[activeMission].services[sourceID].name.c_str());
705
706   //If the service takes a parameter, feed that parameter in
707   if(!miss[activeMission].services[sourceID].parameter.empty()){
708    //printf("sending parameter!\n");
709    SendMessage(miss[activeMission].services[sourceID].socketFD, "1");
710    SendMessage(miss[activeMission].services[sourceID].socketFD, "parameter");
711    SendMessage(miss[activeMission].services[sourceID].socketFD, miss[activeMission].services[sourceID].parameter.c_str());
712    }
713
714
715    //Load and transmit the input data
716    while(i < 10 && !miss[activeMission].services[sourceID].input[i].empty()){
717        _data_DB->command="select ";
718       _data_DB->command.append(_data_DB->tablename);
719        _data_DB->command.append(".* from ");
720        _data_DB->command.append(_data_DB->tablename);
721        _data_DB->command.append(" where Tag=='");
722    _data_DB->command.append(miss[activeMission].services[sourceID].input[i]);
723        _data_DB->command.append("';");
724        sqlite3_stmt * pStatement;
725        int rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
726        if (rc == SQLITE_OK){
727            if (sqlite3_step(pStatement) == SQLITE_ROW)
728             data.append((const char*) sqlite3_column_text(pStatement, 1));
729            else {
730                printf("2data_DB:: Data not yet in DB.\n");
731            rc=31337;
732        }
733        }
734    else {
735        printf("data_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_data_DB->command.c_str());
736        }
737        sqlite3_finalize(pStatement);
738
739        token = strtok((char *)data.c_str(), "@");
740    while(token){
741        SendMessage(miss[activeMission].services[sourceID].socketFD, token);
742        token = strtok(NULL, "@");
743    }
744    i++;
745    data.clear();
746    }
747    int32_t j = 0;
748   
749    //Receive and store the output data
750    while(j < 10 && !miss[activeMission].services[sourceID].output[j].empty()){
751    int rc;
752    memset(buffer, 0, 256);
753    //Read the number of datapairs for this output
754    ReadMessage(miss[activeMission].services[sourceID].socketFD, buffer);
755    data.append(buffer);
756    data.append("@");
757    int t = atoi(buffer);
758    for(int k = 0; k < t; k++){
759        //Read the datapairs incrementally and deliminate it with the "@" symbol
760        memset(buffer, 0, 256);
761        ReadMessage(miss[activeMission].services[sourceID].socketFD, buffer);
762        data.append(buffer);
763        data.append("@");
764        memset(buffer, 0, 256);
765        ReadMessage(miss[activeMission].services[sourceID].socketFD, buffer);
766        data.append(buffer);
767        data.append("@");
768    }
769
770    _data_DB->command="insert or replace into ";
771        _data_DB->command.append(_data_DB->tablename);
772          _data_DB->command.append(" (");
773        _data_DB->command.append(cols[0]);
774        _data_DB->command.append(", ");
775    _data_DB->command.append(cols[1]);
776    _data_DB->command.append(") ");
777    _data_DB->command.append(" values('");   
778    _data_DB->command.append(miss[activeMission].services[sourceID].output[j]);
779    _data_DB->command.append("', '");
780    _data_DB->command.append(data);
781    _data_DB->command.append("');");
782        char *errorMsg;
783       rc = sqlite3_exec(_data_DB->db, _data_DB->command.c_str(), callback, 0, &errorMsg);
784       if( rc!=SQLITE_OK && rc!=101 )
785           fprintf(stderr, "SQL error: %s\n", errorMsg);
786        //printf("S: done putting ouptut data into DB for ID#='%d', data=%s\n", sourceID, data.c_str());
787    j++;
788        data.clear();
789    }
790    //printf("done transact data!\n");
791   // LOG("ServiceManagementLayer:: Finished with data transaction.\n");
792
793
794    /*printf("\n\n\n");
795    // generate commandi
796    strcpy(_data_DB->command, "select ");
797    strcat(_data_DB->command, _data_DB->tablename);
798    strcat(_data_DB->command, ".* from ");
799    strcat(_data_DB->command, _data_DB->tablename);
800    strcat(_data_DB->command, ";");
801
802    // execute print (select all)  command   
803    char *errorMsg;
804    int rc = sqlite3_exec(_data_DB->db, _data_DB->command, callback, 0, &errorMsg);
805    if( rc!=SQLITE_OK && rc!=101 )
806        fprintf(stderr, "SQL error: %s\n", errorMsg);
807    printf("database %s, table %s:\n", _data_DB->filename, _data_DB->tablename);
808    printf("\n\n\n");*/
809}
810
811
812
/* CALLED BY: MessageHandler
 * INPUTS: <none>
 * OUTPUTS: <none>
 *
 * DESCRIPTION: This function works by first sending the inputs from the shell to the appropriate components.
 * The first service should begin immediately, as should any others that have all of their input parameters.
 * When they complete, the output path is found and the data is transferred as it becomes available.
 * Presumably at this point the second function has all of its parameters, so it begins to compute, and the cycle repeats.
 *
 *
 * Rules for active missions (currently)
 * -Five inputs/outputs per service and per mission
 * -All ordering constraints have been relaxed in this version; all data is stored locally and only sent when requested
 * -If and while support fully implemented - up to three levels (ifs can be nested, but dowhiles cannot)
 * -For dowhiles, assumes loop condition determined on last line
 * -IMPORTANT: DB uses '@' to separate individual statements; using '@' in the data stream will result in incorrect behavior
 */
830
831//WHILE
832void
833ServiceManagementLayer::PerformActiveMission()
834{
835    //printf("start PAM\n");
836    uint16_t i = 0, t;
837    shellFound = false;
838    std::string data_param, data_obsv, data;
839    std::string input;
840    std::string check;
841    char buffer[256];
842    char buffer1[256];
843    std::string token, token2;
844    std::string data2;
845    int rc;
846    char *errorMsg;
847    char* cols[] = {(char *)"Tag", (char *)"Data"};
848
849    LOG("ServiceManagementLayer:: Received PerformActiveMission command.\n");
850
851    //Get the inputs
852    memset(buffer, 0, 256);
853    //Needed to read the zero passed in by GOP as the number of observables
854    ReadMessage(shellSocketFD, buffer);
855
856
857    /* Receive Set of Parameters */
858    memset(buffer, 0, 256);
859    ReadMessage(shellSocketFD, buffer);
860    t=atoi(buffer);
861    for(int m = 0; m < t; m++) {
862        memset(buffer1, 0, 256);
863        ReadMessage(shellSocketFD, buffer1);
864    _data_DB->command="insert into ";
865        _data_DB->command.append(_data_DB->tablename);
866          _data_DB->command.append(" (");
867        _data_DB->command.append(cols[0]);
868        _data_DB->command.append(", ");
869    _data_DB->command.append(cols[1]);
870    _data_DB->command.append(") ");
871        memset(buffer, 0, 256);
872        ReadMessage(shellSocketFD, buffer);
873    _data_DB->command.append(" values('");
874    _data_DB->command.append(buffer1);
875    _data_DB->command.append("', '1@");
876    _data_DB->command.append(buffer1);
877    _data_DB->command.append("@");
878    _data_DB->command.append(buffer);
879    _data_DB->command.append("');");
880       rc = sqlite3_exec(_data_DB->db, _data_DB->command.c_str(), callback, 0, &errorMsg);
881       if( rc!=SQLITE_OK && rc!=101 )
882           fprintf(stderr, "SQL error: %s\n", errorMsg);
883    }
884
885    //Useful for spotchecking what's in the database
886    /*printf("\n\n\n");
887    // generate commandi
888    strcpy(_data_DB->command, "select ");
889    strcat(_data_DB->command, _data_DB->tablename);
890    strcat(_data_DB->command, ".* from ");
891    strcat(_data_DB->command, _data_DB->tablename);
892    strcat(_data_DB->command, ";");
893
894    // execute print (select all)  command 
895    rc = sqlite3_exec(_data_DB->db, _data_DB->command, callback, 0, &errorMsg);
896    if( rc!=SQLITE_OK && rc!=101 )
897        fprintf(stderr, "SQL error: %s\n", errorMsg);
898    printf("database %s, table %s:\n", _data_DB->filename, _data_DB->tablename);
899    printf("\n\n\n");*/
900
901
902    i=0;
903    int32_t numstatements[3] = {0,0,0};
904    while(i < miss[activeMission].numServices)
905    {
906    if(miss[activeMission].services[i].name.compare("if")==0)
907    {
908       //printf("L0:if detected\n");
909        input.clear();
910        check.clear();
911        int t;
912        for(t = 0; t < 10; t++){
913        if(!miss[activeMission].services[i].output[t].empty()){
914            input=miss[activeMission].services[i-numstatements[0]-1].output[t];
915            _data_DB->command="SELECT ";
916            _data_DB->command.append(_data_DB->tablename);
917            _data_DB->command.append(".* from ");
918            _data_DB->command.append(_data_DB->tablename);
919            _data_DB->command.append(" where Tag=='");
920            _data_DB->command.append(input);
921            _data_DB->command.append("';");
922            sqlite3_stmt * pStatement;
923            rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
924            if (rc == SQLITE_OK){
925            if (sqlite3_step(pStatement) == SQLITE_ROW)
926                 data = (const char *) sqlite3_column_text(pStatement, 1);
927            else {
928                printf("1 data_DB:: Data not yet in DB.\n");
929                rc=31337;
930            }
931            } else {
932            printf("data_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_data_DB->command.c_str());
933            }
934                sqlite3_finalize(pStatement);
935            //printf("data=%s\n", data.c_str());
936            int pos = data.find_last_of("@", data.length()-2);
937            token = data.substr(pos+1);
938            token.erase(token.length()-1);
939            //printf("token=%s, %d\n", token.c_str(), pos);
940        data.clear();
941            break;
942        }
943        }
944        //printf("Level 0:--- %s  %s---\n", miss[activeMission].services[i].output[t].c_str(), token);
945        bool doit = false;
946        if(miss[activeMission].services[i].output[t].find(">") != string::npos){
947            std::string data2;
948            _data_DB->command="SELECT ";
949            _data_DB->command.append(_data_DB->tablename);
950            _data_DB->command.append(".* from ");
951            _data_DB->command.append(_data_DB->tablename);
952            _data_DB->command.append(" where Tag=='");
953            _data_DB->command.append(miss[activeMission].services[i].output[t].erase(0, 1));
954            _data_DB->command.append("';");
955            sqlite3_stmt * pStatement;
956            rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
957            if (rc == SQLITE_OK){
958            if (sqlite3_step(pStatement) == SQLITE_ROW)
959                 data2 = (const char *) sqlite3_column_text(pStatement, 1);
960            else {
961                printf("2 data_DB:: Data not yet in DB.\n");
962                rc=31337;
963            }
964            } else {
965            printf("data_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_data_DB->command.c_str());
966            }
967            sqlite3_finalize(pStatement);
968
969            int pos = data2.find_last_of("@", data2.length()-2);
970            token2 = data2.substr(pos+1);
971            token2.erase(token2.length()-1);
972            //printf("token2=%s, %d\n", token2.c_str(), pos);
973            if(atof(token.c_str()) > atof(token2.c_str()))
974            doit=true;
975            //printf("%s %s\n", buffer, token);
976        }
977        else if (miss[activeMission].services[i].output[t].find(token) != string::npos)
978                doit=true;
979        if(doit){
980        //printf("Level 0:if taken\n");
981        for(uint16_t k = i+1; k <= i+miss[activeMission].services[i].num_conds; k++){
982            if(miss[activeMission].services[k].name.compare("if")==0){
983            //printf("Level 1:if detected\n");
984                input.clear();
985                check.clear();
986                for(t = 0; t < 10; t++){
987                if(!miss[activeMission].services[k].output[t].empty()){
988                    input=miss[activeMission].services[k-numstatements[1]-1].output[t];
989                        _data_DB->command="SELECT ";
990                        _data_DB->command.append(_data_DB->tablename);
991                       _data_DB->command.append(".* from ");
992                       _data_DB->command.append(_data_DB->tablename);
993                      _data_DB->command.append(" where Tag=='");
994                     _data_DB->command.append(input);
995                    _data_DB->command.append("';");
996                    sqlite3_stmt * pStatement;
997                    rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
998                    if (rc == SQLITE_OK){
999                    if (sqlite3_step(pStatement) == SQLITE_ROW)
1000                         data = (const char *) sqlite3_column_text(pStatement, 1);
1001                    else {
1002                        printf("3 data_DB:: Data not yet in DB.\n");
1003                        rc=31337;
1004                    }
1005                    } else {
1006                    printf("data_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_data_DB->command.c_str());
1007                    }
1008                        sqlite3_finalize(pStatement);
1009                    //printf("Level 1:data=%s\n", data.c_str());
1010                    int pos = data.find_last_of("@", data.length()-2);
1011                    token = data.substr(pos+1);
1012                    token.erase(token.length()-1);
1013                    //printf("Level 1:token=%s\n", token);
1014                    break;
1015                }
1016                }
1017                bool doit = false;
1018                if(miss[activeMission].services[k].output[t].find(">") != string::npos){
1019                    std::string data2;
1020                        _data_DB->command="SELECT ";
1021                    _data_DB->command.append(_data_DB->tablename);
1022                    _data_DB->command.append(".* from ");
1023                    _data_DB->command.append(_data_DB->tablename);
1024                    _data_DB->command.append(" where Tag=='");
1025                    _data_DB->command.append(miss[activeMission].services[k].output[t].erase(0, 1));
1026                    _data_DB->command.append("';");
1027                    sqlite3_stmt * pStatement;
1028                    rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
1029                    if (rc == SQLITE_OK){
1030                    if (sqlite3_step(pStatement) == SQLITE_ROW)
1031                         data2 = (const char *) sqlite3_column_text(pStatement, 1);
1032                    else {
1033                        printf("4 data_DB:: Data not yet in DB.\n");
1034                        rc=31337;
1035                    }
1036                    } else {
1037                    printf("data_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_data_DB->command.c_str());
1038                    }
1039                        sqlite3_finalize(pStatement);
1040                    int pos = data2.find_last_of("@", data2.length()-2);
1041                    token2 = data2.substr(pos+1);
1042                    token2.erase(token2.length()-1);
1043                        //printf("token=%s token2=%s\n", token.c_str(), token2.c_str());
1044                        if(atof(token.c_str()) > atof(token2.c_str()))
1045                        doit=true;
1046                }
1047                else if (miss[activeMission].services[k].output[t].find(token) != string::npos)
1048                doit=true;
1049                if(doit){
1050                //printf("Level 1:if taken\n");
1051                for(uint16_t j = k+1; j <= k+miss[activeMission].services[k].num_conds; j++){
1052                    if(miss[activeMission].services[j].name.compare("if")==0){
1053                    //printf("Level 2:if detected\n");
1054                        input.clear();
1055                        check.clear();
1056                        for(t = 0; t < 10; t++){
1057                        if(!miss[activeMission].services[j].output[t].empty()){
1058                            input=miss[activeMission].services[j-numstatements[2]-1].output[t];
1059                                   _data_DB->command="SELECT ";
1060                                _data_DB->command.append(_data_DB->tablename);
1061                               _data_DB->command.append(".* from ");
1062                               _data_DB->command.append(_data_DB->tablename);
1063                              _data_DB->command.append(" where Tag=='");
1064                             _data_DB->command.append(input);
1065                            _data_DB->command.append("';");
1066                            sqlite3_stmt * pStatement;
1067                            rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
1068                            if (rc == SQLITE_OK){
1069                            if (sqlite3_step(pStatement) == SQLITE_ROW)
1070                                 data = (const char *) sqlite3_column_text(pStatement, 1);
1071                            else {
1072                                printf("5 data_DB:: Data not yet in DB.\n");
1073                                rc=31337;
1074                            }
1075                            } else {
1076                            printf("data_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_data_DB->command.c_str());
1077                            }
1078                                sqlite3_finalize(pStatement);
1079                            //printf("data=%s\n", data.c_str());
1080                            int pos = data.find_last_of("@", data.length()-2);
1081                            token = data.substr(pos+1);
1082                            token.erase(token.length()-1);
1083                            //printf("token=%s\n", token.c_str());
1084                            data.clear();
1085                            break;
1086                        }
1087                        }
1088                        bool doit = false;
1089                        if(miss[activeMission].services[j].output[t].find(">") != string::npos){
1090                            _data_DB->command="SELECT ";
1091                            _data_DB->command.append(_data_DB->tablename);
1092                            _data_DB->command.append(".* from ");
1093                           _data_DB->command.append(_data_DB->tablename);
1094                            _data_DB->command.append(" where Tag=='");
1095                            _data_DB->command.append(miss[activeMission].services[j].output[t].erase(0, 1));
1096                            _data_DB->command.append("';");
1097                            sqlite3_stmt * pStatement;
1098                            rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
1099                            if (rc == SQLITE_OK){
1100                            if (sqlite3_step(pStatement) == SQLITE_ROW)
1101                                 data2 = (const char *) sqlite3_column_text(pStatement, 1);
1102                            else {
1103                                printf("6 data_DB:: Data not yet in DB.\n");
1104                                rc=31337;
1105                            }
1106                            } else {
1107                            printf("data_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_data_DB->command.c_str());
1108                            }
1109                                sqlite3_finalize(pStatement);
1110
1111                        int pos = data2.find_last_of("@", data2.length()-2);
1112                        token2 = data2.substr(pos+1);
1113                        token2.erase(token2.length()-1);
1114                            if(atof(token.c_str()) > atof(token2.c_str()))
1115                            doit=true;
1116                        data.clear();
1117                        }
1118                        else if (miss[activeMission].services[j].output[t].find(token) != string::npos)
1119                        doit=true;
1120                        if(doit){
1121                        //printf("Level 1:if taken\n");
1122                        for(uint16_t l = j+1; l <= j+miss[activeMission].services[j].num_conds; l++){
1123                            TransactData(l);
1124                        }
1125                        }
1126                        else
1127                        //printf("Level 2:if not taken\n");
1128                        numstatements[2] +=miss[activeMission].services[j].num_conds+1;
1129                        j+=miss[activeMission].services[j].num_conds;
1130                        //printf("Level 2:doneif %d, %d, %d\n", numstatements, miss[activeMission].services[i].num_conds, i);
1131                    }else if(miss[activeMission].services[j].name.compare("dowhile")==0){
1132                        numstatements[0]=0;
1133                        //printf("Level 2:while detected\n");
1134                        while(true){
1135                            uint16_t l;
1136                            for(l = j+1; l <= j+miss[activeMission].services[j].num_conds; l++){
1137                            TransactData(l);
1138                            }
1139                            data.clear();
1140                            input.clear();
1141                            check.clear();
1142                            int t;
1143                            for(t = 0; t < 10; t++){
1144                            if(!miss[activeMission].services[j].output[t].empty()){
1145                                input=miss[activeMission].services[l-2].output[t];
1146                                //printf("%s\n",input.c_str());
1147                                _data_DB->command="SELECT ";
1148                                _data_DB->command.append(_data_DB->tablename);
1149                                _data_DB->command.append(".* from ");
1150                                _data_DB->command.append(_data_DB->tablename);
1151                                _data_DB->command.append(" where Tag=='");
1152                                _data_DB->command.append(input);
1153                                _data_DB->command.append("';");
1154                                sqlite3_stmt * pStatement;
1155
1156                                rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
1157                                if (rc == SQLITE_OK){
1158                                if (sqlite3_step(pStatement) == SQLITE_ROW)
1159                                     data = (const char *) sqlite3_column_text(pStatement, 1);
1160                                else {
1161                                    printf("7 data_DB:: Data not yet in DB.: %s\n", _data_DB->command.c_str());
1162                                    rc=31337;
1163                                }
1164                                } else {
1165                                  printf("data_DB:: SQL statement error. rc = %i\n%s\n",rc,_data_DB->command.c_str());
1166                                }
1167                                    sqlite3_finalize(pStatement);
1168                                //printf("Level 2:data=%s\n", data.c_str());
1169                                    int pos = data.find_last_of("@", data.length()-2);
1170                                    token = data.substr(pos+1);
1171                                    token.erase(token.length()-1);
1172                                //printf("Level 2:token=%s\n", token);
1173                                break;
1174                            }
1175                            }
1176                            //printf("Level 2:--- %s  %s---\n", miss[activeMission].services[j].output[t].c_str(), token);
1177                            if(miss[activeMission].services[j].output[t].find(token) != string::npos){
1178                            //printf("Level 2:while taken again!\n");
1179                            }
1180                            else {
1181                            //printf("Level 2:no more while\n");
1182                            break;}
1183                        }
1184                        j+=miss[activeMission].services[j].num_conds;
1185                    }
1186                    else{
1187                        //printf("Level 2: no conditional\n");
1188                        numstatements[2]=0;
1189                            TransactData(j);
1190                    }
1191                }
1192                }
1193                else{
1194                //printf("Level 1: if not taken\n");
1195                }
1196                    numstatements[1] +=miss[activeMission].services[k].num_conds+1;
1197                    k+=miss[activeMission].services[k].num_conds;
1198                //printf("doneif %d, %d, %d\n", numstatements, miss[activeMission].services[i].num_conds, i);
1199            }else if(miss[activeMission].services[k].name.compare("dowhile")==0){
1200                numstatements[0]=0;
1201                //printf("Level 1: while detected\n");
1202                while(true){
1203                uint16_t j;
1204                    for(j = k+1; j <= k+miss[activeMission].services[k].num_conds; j++){
1205                    TransactData(j);
1206                    }
1207                    data.clear();
1208                    input.clear();
1209                    check.clear();
1210                    int t;
1211                    for(t = 0; t < 10; t++){
1212                    if(!miss[activeMission].services[k].output[t].empty()){
1213                        input=miss[activeMission].services[j-1].output[t];
1214                        _data_DB->command="SELECT ";
1215                        _data_DB->command.append(_data_DB->tablename);
1216                        _data_DB->command.append(".* from ");
1217                        _data_DB->command.append(_data_DB->tablename);
1218                        _data_DB->command.append(" where Tag=='");
1219                        _data_DB->command.append(input);
1220                        _data_DB->command.append("';");
1221                        sqlite3_stmt * pStatement;
1222                        rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
1223                        if (rc == SQLITE_OK){
1224                        if (sqlite3_step(pStatement) == SQLITE_ROW)
1225                             data = (const char *) sqlite3_column_text(pStatement, 1);
1226                        else {
1227                            printf("8 data_DB:: Data not yet in DB.\n");
1228                            rc=31337;
1229                        }
1230                        } else {
1231                        printf("data_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_data_DB->command.c_str());
1232                        }
1233                            sqlite3_finalize(pStatement);
1234                        //printf("Level 1:data=%s\n", data.c_str());
1235                        int pos = data.find_last_of("@", data.length()-2);
1236                        token = data.substr(pos+1);
1237                        token.erase(token.length()-1);
1238                        //printf("Level 1:token=%s\n", token);
1239                        break;
1240                    }
1241                    }
1242                    //printf("Level 1:--- %s  %s---\n", miss[activeMission].services[k].output[t].c_str(), token);
1243                    if(miss[activeMission].services[k].output[t].find(token) != string::npos){
1244                    //printf("Level 1:while taken again!\n");
1245                    }
1246                    else {
1247                    //printf("Level 1:While finished\n");
1248                    break;}
1249                }
1250                k+=miss[activeMission].services[k].num_conds;
1251            }
1252            else{
1253                //printf("Level1:No conditional\n");
1254                numstatements[1]=0;
1255                    TransactData(k);
1256            }
1257        }
1258        }
1259            numstatements[0] +=miss[activeMission].services[i].num_conds+1;
1260        i+=miss[activeMission].services[i].num_conds;
1261    }
1262    else if(miss[activeMission].services[i].name.compare("dowhile")==0)
1263    {
1264        numstatements[0]=0;
1265        //printf("Level 0: while detected\n");
1266        while(true){
1267        uint16_t k;
1268            for(k = i+1; k <= i+miss[activeMission].services[i].num_conds; k++){
1269            TransactData(k);
1270            }
1271            data.clear();
1272            input.clear();
1273            check.clear();
1274            int t;
1275            for(t = 0; t < 10; t++){
1276            if(!miss[activeMission].services[i].output[t].empty()){
1277                input=miss[activeMission].services[k-1].output[t];
1278                _data_DB->command="SELECT ";
1279                _data_DB->command.append(_data_DB->tablename);
1280                _data_DB->command.append(".* from ");
1281                _data_DB->command.append(_data_DB->tablename);
1282                _data_DB->command.append(" where Tag=='");
1283                _data_DB->command.append(input);
1284                _data_DB->command.append("';");
1285                sqlite3_stmt * pStatement;
1286                rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
1287                if (rc == SQLITE_OK){
1288                if (sqlite3_step(pStatement) == SQLITE_ROW)
1289                     data = (const char *) sqlite3_column_text(pStatement, 1);
1290                else {
1291                    printf("10data_DB:: Data not yet in DB.\n");
1292                    rc=31337;
1293                }
1294                } else {
1295                printf("data_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_data_DB->command.c_str());
1296                }
1297                    sqlite3_finalize(pStatement);
1298                int pos = data.find_last_of("@", data.length()-2);
1299                token = data.substr(pos+1);
1300                token.erase(token.length()-1);
1301                break;
1302            }
1303            }
1304            //printf("Level 0:--- %s  %s---\n", miss[activeMission].services[i].output[t].c_str(), token);
1305            if(miss[activeMission].services[i].output[t].find(token) != string::npos){
1306            //printf("Level 0:while taken again!\n");
1307            }
1308            else {
1309            //printf("Level 0:no more while\n");
1310            break;}
1311        }
1312        i+=miss[activeMission].services[i].num_conds;
1313    }
1314    else{
1315        numstatements[0]=0;
1316        //printf("Level 0: No conditional\n");
1317        TransactData(i);}
1318    i++;
1319    }
1320    i=0;
1321    data.clear();
1322
1323    if(!shellFound)
1324    {
1325    int k = 0;
1326    while(k<10 && !miss[activeMission].input[k].empty()){
1327        k++;
1328    }
1329    sprintf(buffer, "%d", k);
1330    SendMessage(shellSocketFD, buffer);
1331    for(int t = 0; t < k; t++){
1332        SendMessage(shellSocketFD, miss[activeMission].input[t].c_str());
1333        SendMessage(shellSocketFD, "0");
1334    }
1335    }
1336
1337
1338    LOG("ServiceManagementLayer:: Done performing active mission.\n");
1339    /*
1340    strcpy(_data_DB->command, "select ");
1341    strcat(_data_DB->command, _data_DB->tablename);
1342    strcat(_data_DB->command, ".* from ");
1343    strcat(_data_DB->command, _data_DB->tablename);
1344    strcat(_data_DB->command, ";");
1345
1346    // execute print (select all)  command 
1347    rc = sqlite3_exec(_data_DB->db, _data_DB->command, callback, 0, &errorMsg);
1348    if( rc!=SQLITE_OK && rc!=101 )
1349        fprintf(stderr, "SQL error: %s\n", errorMsg);
1350    printf("database %s, table %s:\n", _data_DB->filename, _data_DB->tablename);
1351    printf("\n\n\n");*/
1352}
1353
1354
1355/* CALLED BY: MessageHandler
1356 * INPUTS: <none>
1357 * OUTPUTS: <none>
1358 *
1359 * DESCRIPTION: Print a list of the services currently registered and the ID's of the components that registered them
1360 */
1361void
1362ServiceManagementLayer::ListServices()
1363{
1364    _services_DB->command="select ";
1365    _services_DB->command.append(_services_DB->tablename);
1366    _services_DB->command.append(".* from ");
1367    _services_DB->command.append(_services_DB->tablename);
1368    _services_DB->command.append(";");
1369
1370    // execute print (select all)  command   
1371    char *errorMsg;
1372    int rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg);
1373    if( rc!=SQLITE_OK && rc!=101 )
1374        fprintf(stderr, "SQL error: %s\n", errorMsg);
1375    printf("database %s, table %s:\n", _services_DB->filename.c_str(), _services_DB->tablename.c_str());
1376}
1377
1378/* CALLED BY: Reset
1379 * INPUTS: <none>
1380 * OUTPUTS: <none>
1381 *
1382 * DESCRIPTION: Clear and reinitialize the mission array, then reload the configuration file
1383 */
1384void
1385ServiceManagementLayer::ReloadConfiguration()
1386{
1387    LOG("ServiceManagementLayer:: Reloading Configuration.\n");
1388    free(miss);
1389    miss = new Mission[10];
1390    for(int i = 0; i < 10; i++)
1391        miss[i].services = new Service[30];
1392    LoadConfiguration(_SML_Config.c_str(), miss);
1393}
1394
1395/* CALLED BY: constructor
1396 * INPUTS: |SML_Config| Address (either relative or full) of the XML file containing mission data
1397 *        |mList| Mission array to be modified
1398 * OUTPUTS: <none>
1399 *
1400 * DESCRIPTION: IMPORTANT - See formatting instructions for correct parsing of data
1401 * Can currently handle 10 inputs and 10 outputs per service, but easily expandable
1402 * Also, can handle two layer of nested conditional statements, but could
1403 * be expanded to meet additional needs.
1404 *
1405 * Components assigned to mission during "set active mission" stage so that
1406 * components can still continue to register after the configuration is loaded
1407 */
1408void
1409ServiceManagementLayer::LoadConfiguration(const char *SML_Config, Mission* &mList)
1410{
1411    TiXmlElement *pMission;
1412    TiXmlElement *pService;
1413    TiXmlElement *pChild0, *pChild1, *pChild2, *pChild3, *pChild4;
1414    TiXmlHandle hRoot(0);
1415    printf("ServiceManagementLayer:: Loading Configuration.\n");
1416    TiXmlDocument doc(".");
1417    doc.LoadFile(SML_Config);
1418    bool loadOkay = doc.LoadFile();
1419    if(!loadOkay)
1420        printf("Loading SML configuration failed: %s\n", SML_Config);
1421
1422    TiXmlHandle hDoc(&doc);
1423   
1424    pMission = hDoc.FirstChildElement().Element();
1425
1426    if(!pMission)
1427        printf("No valid root!");
1428
1429    hRoot = TiXmlHandle(pMission);
1430    pService = pMission->FirstChildElement();
1431    int32_t mission_num = 0;
1432    //Iterate through the missions
1433    for(pChild0 = pMission->FirstChildElement(); pChild0 ; \
1434        pChild0 = pChild0->NextSiblingElement())
1435    {
1436    int32_t service_num = 0;
1437    uint16_t cond_array[] = {0, 0, 0};
1438   
1439        for(pChild1  = (pChild0->FirstChildElement())->FirstChildElement(); pChild1; \
1440              pChild1  = pChild1->NextSiblingElement())
1441        {
1442        int32_t conditional_0 = service_num;
1443        for(pChild2 = pChild1->FirstChildElement(); \
1444        pChild2; pChild2 = pChild2->NextSiblingElement())
1445        {
1446        service_num++;
1447        int32_t conditional_1 = service_num;
1448        for(pChild3 = pChild2->FirstChildElement(); \
1449            pChild3; pChild3 = pChild3->NextSiblingElement())
1450        {
1451            service_num++;
1452            int32_t conditional_2 = service_num;
1453            for(pChild4 = pChild3->FirstChildElement(); \
1454                pChild4; pChild4 = pChild4->NextSiblingElement())
1455            {
1456                   service_num++;
1457                if(pChild4->Attribute("name"))
1458                    mList[mission_num].services[service_num].name = pChild4->Attribute("name");
1459                else
1460                    mList[mission_num].services[service_num].name = pChild4->Value();
1461                for(int i = 1; i <= 10; i++) {
1462                    char buffer[9]="input";
1463                    sprintf(buffer, "%s%d", buffer, i);
1464                    //printf("buffer=%s\n", buffer);
1465                    if(pChild4->Attribute(buffer))
1466                        mList[mission_num].services[service_num].input[i-1] = pChild4->Attribute(buffer);
1467                    char buffer2[9]="output";
1468                    sprintf(buffer2, "%s%d", buffer2, i);
1469                    if(pChild4->Attribute(buffer2))
1470                    mList[mission_num].services[service_num].output[i-1] = pChild4->Attribute(buffer2);
1471                }
1472                if(pChild4->Attribute("parameter"))
1473                mList[mission_num].services[service_num].parameter = pChild4->Attribute("parameter");
1474                cond_array[2]++;
1475            }
1476            if(!strcmp(pChild3->Value(), "shell") || conditional_2 != service_num) {
1477                mList[mission_num].services[conditional_2].name = pChild3->Value();
1478            }
1479            else{
1480                mList[mission_num].services[service_num].name = pChild3->Attribute("name");
1481            }
1482            for(int i = 1; i <= 10; i++) {
1483                char buffer[9]="input";
1484                sprintf(buffer, "%s%d", buffer, i);
1485                if(pChild3->Attribute(buffer))
1486                mList[mission_num].services[conditional_2].input[i-1] = pChild3->Attribute(buffer);
1487                char buffer2[9]="output";
1488                sprintf(buffer2, "%s%d", buffer2, i);
1489                if(pChild3->Attribute(buffer2))
1490                mList[mission_num].services[conditional_2].output[i-1] = pChild3->Attribute(buffer2);
1491            }
1492                if(pChild3->Attribute("parameter"))
1493                mList[mission_num].services[conditional_2].parameter = pChild3->Attribute("parameter");
1494            mList[mission_num].services[conditional_2].num_conds = cond_array[2];
1495            cond_array[1]+=cond_array[2]+1;
1496            cond_array[2] = 0;
1497
1498
1499        }
1500        if(!strcmp(pChild2->Value(), "shell") || conditional_1 != service_num) {
1501            mList[mission_num].services[conditional_1].name = pChild2->Value();
1502        }
1503        else{
1504            mList[mission_num].services[service_num].name = pChild2->Attribute("name");
1505        }
1506           for(int i = 1; i <= 10; i++) {
1507            char buffer[9]="input";
1508            sprintf(buffer, "%s%d", buffer, i);
1509            if(pChild2->Attribute(buffer))
1510                mList[mission_num].services[conditional_1].input[i-1] = pChild2->Attribute(buffer);
1511            char buffer2[9]="output";
1512            sprintf(buffer2, "%s%d", buffer2, i);
1513            if(pChild2->Attribute(buffer2))
1514            mList[mission_num].services[conditional_1].output[i-1] = pChild2->Attribute(buffer2);
1515        }
1516            if(pChild2->Attribute("parameter"))
1517            mList[mission_num].services[conditional_1].parameter = pChild2->Attribute("parameter");
1518
1519        mList[mission_num].services[conditional_1].num_conds = cond_array[1];
1520        cond_array[0]+=cond_array[1]+1;
1521        cond_array[1] = 0;
1522        }
1523       
1524        if(!strcmp(pChild1->Value(), "shell") || conditional_0 != service_num) {
1525            mList[mission_num].services[conditional_0].name = pChild1->Value();
1526        }
1527        else{
1528        mList[mission_num].services[conditional_0].name = pChild1->Attribute("name");
1529        }
1530        for(int i = 1; i <= 10; i++) {
1531            char buffer[9]="input";
1532            sprintf(buffer, "%s%d", buffer, i);
1533            if(pChild1->Attribute(buffer))
1534                mList[mission_num].services[conditional_0].input[i-1] = pChild1->Attribute(buffer);
1535            char buffer2[9]="output";
1536            sprintf(buffer2, "%s%d", buffer2, i);
1537            if(pChild1->Attribute(buffer2))
1538            mList[mission_num].services[conditional_0].output[i-1] = pChild1->Attribute(buffer2);
1539        }
1540        if(pChild1->Attribute("parameter"))
1541            mList[mission_num].services[conditional_0].parameter = pChild1->Attribute("parameter");
1542        mList[mission_num].services[conditional_0].num_conds = cond_array[0];
1543            cond_array[0] = 0;
1544        service_num++;
1545    }
1546   
1547    mList[mission_num].numServices = service_num;
1548    mList[mission_num].name = pChild0->Attribute("name");
1549        mList[mission_num].missionID = atoi(pChild0->Attribute("id"));
1550    for(int i = 1; i <= 10; i++) {
1551        char buffer[9]="param";
1552        sprintf(buffer, "%s%d", buffer, i);
1553        if(pChild0->Attribute(buffer)){
1554            mList[mission_num].input[i-1] = pChild0->Attribute(buffer);
1555        }
1556    }
1557    mission_num++;
1558    }
1559    LOG("ServiceManagementLayer:: Done Loading Configuration\n");
1560}
1561
1562/* CALLED BY: MessageHandler
1563 * INPUTS: |ID| The ID number of the engine to be registered
1564 * OUTPUTS: <none>
1565 *
1566 * DESCRIPTION: Sends a registration message onto the shell and sends the ACK back to the component
1567 */
1568void
1569ServiceManagementLayer::RegisterCognitiveEngine(int32_t ID)
1570{
1571    SendMessage(shellSocketFD, "register_engine_cognitive");
1572
1573    LOG("ServiceManagementLayer:: CE registration message forwarded to shell.\n");
1574    char buffer[256];
1575    memset(buffer, 0, 256);
1576    ReadMessage(shellSocketFD, buffer);
1577    SendMessage(CE_List[ID].FD, buffer);
1578
1579    TransferRadioConfiguration(ID);
1580    memset(buffer, 0, 256);
1581    TransferExperience(ID);
1582    memset(buffer, 0, 256);
1583    numberOfCognitiveEngines++;
1584    CE_Present = true;
1585}
1586
1587/* CALLED BY: MessageHandler
1588 * INPUTS: |ID| The ID number of the engine to have it's services deregistered
1589 * OUTPUTS: <none>
1590 *
1591 * DESCRIPTION: Deletes individual services from the DB
1592 * NOTE THAT this function only needs to be called if service deregistration is going
1593 * to be done at a different time than component deregistration; it is handled
1594 * more efficiently and directly during that deregistration process.
1595 */
1596void
1597ServiceManagementLayer::DeregisterServices(int32_t ID)
1598{
1599    char buffer[256];
1600    memset(buffer, 0, 256);
1601    ReadMessage(CE_List[ID].FD, buffer);
1602    _services_DB->command="DELETE FROM ";
1603    _services_DB->command.append(_services_DB->tablename);
1604    _services_DB->command.append(" WHERE ID_Num IN (SELECT ");
1605    char tmp[3];
1606    memset(tmp,0,3);
1607    sprintf(tmp, "%d", ID);
1608    _services_DB->command.append(tmp);
1609    _services_DB->command.append(" FROM ");
1610    _services_DB->command.append(_services_DB->tablename);
1611    _services_DB->command.append(" WHERE Service_Name");
1612    _services_DB->command.append("=='");
1613    _services_DB->command.append(buffer);
1614    _services_DB->command.append("');");
1615    char *errorMsg;
1616    int rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg);
1617    if( rc!=SQLITE_OK && rc!=101 )
1618        fprintf(stderr, "SQL error: %s\n", errorMsg);
1619}
1620
1621/* CALLED BY: MessageHandler
1622 * INPUTS: |ID| The ID number of the engine to have it's services deregistered
1623 * OUTPUTS: <none>
1624 *
1625 * DESCRIPTION: Deletes the contact info for the cognitive engine, forwards a deregistration message to the shell
1626 * Also, deletes the services from the DB
1627 */
1628void
1629ServiceManagementLayer::DeregisterCognitiveEngine(int32_t ID)
1630{
1631    LOG("ServiceManagementLayer:: CE deregistration message forwarded to shell.\n");
1632
1633    numberOfCognitiveEngines--;
1634    if(numberOfCognitiveEngines == 0)
1635        CE_Present = false;
1636
1637    SendMessage(shellSocketFD, "deregister_engine_cognitive");
1638    char buffer[256];
1639    memset(buffer, 0, 256);
1640    ReadMessage(shellSocketFD, buffer);
1641    SendMessage(CE_List[ID].FD, buffer);
1642    if(strcmp("deregister_ack", buffer) != 0) {
1643    ERROR(1, "SML:: Failed to close CE socket\n");
1644    }
1645
1646    //Deregister the services
1647    _services_DB->command="DELETE FROM ";
1648    _services_DB->command.append(_services_DB->tablename);
1649    _services_DB->command.append(" WHERE ");
1650    _services_DB->command.append("ID_Num");
1651    _services_DB->command.append("==");
1652    char tmp[3];
1653    memset(tmp,0,3);
1654    sprintf(tmp, "%d", ID);
1655    _services_DB->command.append(tmp);
1656    _services_DB->command.append(";");
1657    char *errorMsg;
1658    int rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg);
1659    if( rc!=SQLITE_OK && rc!=101 )
1660        fprintf(stderr, "SQL error: %s\n", errorMsg);
1661
1662
1663    CE_List[ID].FD = -1;
1664    CE_List[ID].ID_num = -1;
1665
1666    LOG("Cognitive Radio Shell:: CE Socket closed for engine #%d.\n", ID);
1667}
1668
1669
1670/* CALLED BY: test class
1671 * INPUTS: <none>
1672 * OUTPUTS: <none>
1673 *
1674 * DESCRIPTION: Sets up a server socket and listens for communication on either that or the shell socket
1675 */
1676void
1677ServiceManagementLayer::StartSMLServer()
1678{
1679    //printf("Ready for CE Signal! (registration done)\n");
1680    struct timeval selTimeout;
1681    int32_t running = 1;
1682    int32_t port, rc, new_sd = 1;
1683    int32_t desc_ready = 1;
1684        //If there is, call the MessageHandler with the Shell_Msg code of -1
1685    fd_set sockSet, shellSet;
1686
1687    cogEngSrv = CreateTCPServerSocket(SMLport);
1688    int32_t maxDescriptor = cogEngSrv;
1689
1690    if(InitializeTCPServerPort(cogEngSrv) == -1)
1691        ERROR(1,"Error initializing primary port\n");
1692
1693   
1694    while (running) {
1695        /* Zero socket descriptor vector and set for server sockets */
1696        /* This must be reset every time select() is called */
1697        FD_ZERO(&sockSet);
1698        FD_SET(cogEngSrv, &sockSet);
1699       
1700        for(uint16_t k = 0; k < Current_ID; k++){
1701        if(CE_List[k].ID_num != -1)
1702            FD_SET(CE_List[k].FD, &sockSet);
1703    }
1704        //printf("k=%d, CID=%d\n", k, CE_List[k].FD);
1705
1706        /* Timeout specification */
1707        /* This must be reset every time select() is called */
1708        selTimeout.tv_sec = 0;       /* timeout (secs.) */
1709        selTimeout.tv_usec = 0;            /* 0 microseconds */
1710    //Changed both to zero so that select will check messages from the shell instead of blocking
1711    //when there is no command from the CE's to be processed
1712
1713        //Check if there is a message on the socket waiting to be read
1714    rc = select(maxDescriptor + 1, &sockSet, NULL, NULL, &selTimeout);
1715        //printf("rc=%d\n", rc);
1716        if(rc == 0){
1717            //LOG("No echo requests for %i secs...Server still alive\n", 10);
1718   
1719            FD_ZERO(&shellSet);
1720            FD_SET(shellSocketFD, &shellSet);
1721            selTimeout.tv_sec = 0;
1722            selTimeout.tv_usec = 0;
1723            //Check if there is a message on the shell socket ready to be processed
1724            select(shellSocketFD + 1, &shellSet, NULL, NULL, &selTimeout);
1725            //printf("rc2=%d\n", rc2);
1726        //If there is, call the MessageHandler with the Shell_Msg code of -1
1727        if(FD_ISSET(shellSocketFD, &shellSet)){
1728        //printf("shell_msg, %d\n", rc2);
1729            MessageHandler(-1);}
1730    }
1731        else {
1732            desc_ready = rc;
1733            for(port = 0; port <= maxDescriptor && desc_ready > 0; port++) {
1734                if(FD_ISSET(port, &sockSet)) {
1735                    desc_ready -= 1;
1736
1737                    //Check if request is new or on an existing open descriptor
1738                    if(port == cogEngSrv) {
1739            //If new, assign it a descriptor and give it an ID
1740                        new_sd = AcceptTCPConnection(port);
1741             
1742                        if(new_sd < 0)
1743                            break;
1744
1745                        CE_List[Current_ID].FD = new_sd;
1746            CE_List[Current_ID].ID_num = Current_ID;
1747                        MessageHandler(Current_ID);
1748            Current_ID++;
1749   
1750            FD_SET(new_sd,&sockSet);
1751                        if(new_sd > maxDescriptor)
1752                           maxDescriptor = new_sd;
1753                    }
1754                    else {
1755            //If old, figure out which ID it coresponds to and handle it accordingly
1756            for(uint16_t z = 0; z < Current_ID; z++)
1757            {
1758                if(CE_List[z].FD == port){
1759                    MessageHandler(z);}
1760            }
1761                    }
1762                }
1763            }
1764    }
1765    }       
1766
1767    /* Close sockets */
1768    close(cogEngSrv);
1769
1770    //delete &cogEngSrv;
1771    return;
1772}
Note: See TracBrowser for help on using the browser.