root/vtcross/trunk/src/service_management_layer/ServiceManagementLayer.cpp @ 444

Revision 444, 68.7 KB (checked in by bhilburn, 15 years ago)

40% done reworking the SML source.

Line 
1/*
2 Copyright 2009 Virginia Polytechnic Institute and State University 
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7 
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17/* Inter-component communication handled by sockets and FD's. 
18 * Server support has been completely implemented and tested.
19 *
20 * Services are stored in a SQLite DB by the ID of the CE that registered them.  Service
21 * support has been completely implemented and tested.
22 *
23 * Missions are loaded from an XML file, connected with services provided by components,
24 * and run.  See the documentation for the "PerformActiveMission" below for important
25 * info.
26 */
27
28
29#include <cmath>
30#include <cstdio>
31#include <cstdlib>
32#include <cstring>
33#include <stdint.h>
34
35#include <arpa/inet.h>
36#include <iostream>
37#include <netinet/in.h>
38#include <netdb.h>
39#include <fcntl.h>
40#include <sqlite3.h>
41#include <string>
42#include <sys/ioctl.h>
43#include <sys/mman.h>
44#include <sys/socket.h>
45#include <sys/types.h>
46#include <sys/wait.h>
47
48#include "tinyxml/tinyxml.h"
49#include "tinyxml/tinystr.h"
50
51#include "vtcross/debug.h"
52#include "vtcross/error.h"
53#include "vtcross/common.h"
54#include "vtcross/components.h"
55#include "vtcross/containers.h"
56#include "vtcross/socketcomm.h"
57
58
59typedef struct services_s *services_DB;
60typedef struct data_s *data_DB;
61
62using namespace std;
63
64struct services_s {
65    string filename;
66    string tablename;
67    string command;
68    sqlite3 *db;
69    unsigned int num_columns;
70};
71
72struct data_s {
73    string filename;
74    string tablename;
75    string command;
76    sqlite3 *db;
77    unsigned int num_columns;
78};
79
80services_DB _services_DB;
81data_DB _data_DB;
82string _SML_Config;
83bool shellFound;
84
85/* Callback function used internally by some of the SQLite3 commands */
86int32_t
87callback(void *notUsed, int32_t argc, char **argv, char **azColName)
88{
89    for(size_t i = 0; i < argc; i++) {
90        printf("%s = %s\n", azColName[i], argv[i] ? argv[i] : "NULL");
91    }
92
93    printf("\n");
94    return 0;
95}
96
97ServiceManagementLayer::ServiceManagementLayer()
98{
99    LOG("Creating Service Management Layer.\n");
100
101    shellSocketFD = -1;
102    numberOfCognitiveEngines = 0;
103    CE_Present = false;
104    cogEngSrv = 1;
105}
106
107/* Free and clear the DB's associated with this SML in the destructor.
108 *
109 * Note that exiting with an error condition will cause SML to not be destructed,
110 * resulting in the DB's staying in memory until the destructor is encountered in
111 * future executions. */
112ServiceManagementLayer::~ServiceManagementLayer()
113{
114    char *errorMsg;
115    int32_t rc;         /* sqlite command return code */
116
117    _services_DB->command = "drop table ";
118    _services_DB->command.append(_services_DB->tablename);
119    rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg);
120    if((rc != SQLITE_OK) && (rc != 101))
121        WARNING("ServiceManagementLayer::Destructor services 'drop table' error: %s\n", errorMsg);
122
123    _services_DB->command = "vacuum";
124    rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg);
125    if((rc != SQLITE_OK) && (rc != 101))
126        WARNING("ServiceManagementLayer::Destructor services 'vacuum' error: %s\n", errorMsg);
127
128    free(_services_DB);
129
130    _data_DB->command = "drop table ";
131    _data_DB->command.append(_data_DB->tablename);
132    rc = sqlite3_exec(_data_DB->db, _data_DB->command.c_str(), callback, 0, &errorMsg);
133    if((rc != SQLITE_OK) && (rc != 101))
134        WARNING("ServiceManagementLayer::Destructor data 'drop table' error: %s\n", errorMsg);
135
136    _data_DB->command = "vacuum";
137    rc = sqlite3_exec(_data_DB->db, _data_DB->command.c_str(), callback, 0, &errorMsg);
138    if((rc != SQLITE_OK) && (rc != 101))
139        WARNING("ServiceManagementLayer::Destructor data 'vacuum' error: %s\n", errorMsg);
140
141    free(_data_DB);
142}
143
144/* Note that sizes of CE_List, miss, and service are hardcoded for now.
145 * Also, their sizes are hardcoded into the code in various places; a fix
146 * for a future version. */
147ServiceManagementLayer::ServiceManagementLayer(const char* SML_Config, \
148    const char* serverName, const char* serverPort, int16_t clientPort)
149{
150    LOG("Creating Service Management Layer.\n");
151
152    _SML_Config = string(SML_Config);
153    SMLport = clientPort;
154
155    ConnectToShell(serverName, serverPort);
156    CE_List = new CE_Reg[10];
157
158    miss = new Mission[10];
159    for(size_t i = 0; i < 10; i++) {
160        miss[i].services = new Service[30];
161    }
162
163    Current_ID = 0;
164
165    LoadConfiguration(SML_Config, miss);
166
167    CreateServicesDB();
168    CreateDataDB();
169}
170
171/* CALLED BY: constructor
172 * INPUTS: <none>
173 * OUTPUTS: <none>
174 *
175 * DESCRIPTION: Create and initialize a DB to hold the services registered by components
176 */
177void
178ServiceManagementLayer::CreateServicesDB()
179{
180    sqlite3_stmt *ppStmt;   /* OUT: Statement handle */
181    const char *pzTail;     /* OUT: Pointer to unused portion of zSql */
182    int32_t rc;             /* sqlite command return code */
183
184    _services_DB = new services_s;
185    _services_DB->filename="Services_Table";
186    sqlite3_open(_services_DB->filename.c_str(), &(_services_DB->db));
187
188    char *cols[] = {(char *)"ID_Num", (char *)"Service_Name"};
189
190    _services_DB->tablename="Services";
191
192    /* If program execution ends in anything other than a ordered shutdown, DB's will still
193     * be there for next run. Need to get rid of it so that old data isn't inadvertantly
194     * used in the next execution cycle. */
195    _services_DB->command = "DROP TABLE IF EXISTS Services;";     
196
197    rc = sqlite3_prepare_v2(_services_DB->db, _services_DB->command.c_str(), 128, &ppStmt, &pzTail);
198    if((rc != SQLITE_OK) && (rc != 101))
199        WARNING("ServiceManagementLayer::CreateServicesDB 'prepare_stmt' error %d\n", rc);
200
201    rc = sqlite3_step(ppStmt);
202    if((rc != SQLITE_OK) && (rc != 101))
203        WARNING("ServiceManagementLayer::CreateServicesDB 'step' error\n");
204
205    _services_DB->num_columns = 2;
206
207    /* Generate command */
208    _services_DB->command="CREATE TABLE ";
209    _services_DB->command.append(_services_DB->tablename);
210    _services_DB->command.append("(");
211    _services_DB->command.append(cols[0]);
212    _services_DB->command.append(" INT, ");
213    _services_DB->command.append(cols[1]);
214    _services_DB->command.append(" TEXT);");
215
216    /* Execute create table command */
217    rc = sqlite3_prepare_v2(_services_DB->db, _services_DB->command.c_str(), 128, &ppStmt, &pzTail);
218    if((rc != SQLITE_OK) && (rc != 101))
219        WARNING("ServiceManagementLayer::CreateServicesDB 'prepare_stmt' error %d\n", rc);
220
221    rc = sqlite3_step(ppStmt);
222    if((rc != SQLITE_OK) && (rc != 101))
223        WARNING("ServiceManagementLayer::CreateServicesDB 'step' error\n");
224}
225
226/* CALLED BY: constructor
227 * INPUTS: <none>
228 * OUTPUTS: <none>
229 *
230 * DESCRIPTION: Create and initialize a DB to hold the data sent by components
231 */
232void
233ServiceManagementLayer::CreateDataDB()
234{
235    sqlite3_stmt *ppStmt;   /* OUT: Statement handle */
236    const char *pzTail;     /* OUT: Pointer to unused portion of zSql */
237    int32_t rc;             /* sqlite command return code */
238
239    _data_DB = new data_s;
240
241    _data_DB->filename="Data_Table";
242    sqlite3_open(_data_DB->filename.c_str(), &(_data_DB->db));
243
244    char *cols[] = {(char *)"Tag", (char *)"Data"};
245
246    _data_DB->tablename = "Data";
247    _data_DB->command = "DROP TABLE IF EXISTS Data;";     
248
249    rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), 128, &ppStmt, &pzTail);
250    if((rc != SQLITE_OK) && (rc != 101))
251        WARNING("ServiceManagementLayer::CreateDataDB 'prepare_stmt' error %d\n", rc);
252
253    rc = sqlite3_step(ppStmt);
254    if((rc != SQLITE_OK) && (rc != 101))
255        WARNING("ServiceManagementLayer::CreateDataDB 'step' error\n");
256
257    _data_DB->num_columns = 2;
258
259    /* Generate command */
260    _data_DB->command = "CREATE TABLE ";
261    _data_DB->command.append(_data_DB->tablename);
262    _data_DB->command.append("(");
263    _data_DB->command.append(cols[0]);
264
265    /* First column is the name of the data (corresponding to the name of the output/input pair)
266     * It is the primary key so any subsequent data with the same name will replace the row. */
267    _data_DB->command.append(" TEXT PRIMARY KEY ON CONFLICT REPLACE, ");
268    _data_DB->command.append(cols[1]);
269    _data_DB->command.append(" TEXT);");
270
271    rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), 128, &ppStmt, &pzTail);
272    if((rc != SQLITE_OK) && (rc != 101))
273        WARNING("ServiceManagementLayer::CreateDataDB 'prepare_stmt' error %d\n", rc);
274
275    rc = sqlite3_step(ppStmt);
276    if((rc != SQLITE_OK) && (rc != 101))
277        WARNING("ServiceManagementLayer::CreateDataDB 'step' error\n");
278}
279
280/* CALLED BY: MessageHandler
281 * INPUTS: <none>
282 * OUTPUTS: <none>
283 *
284 * DESCRIPTION: Sends a message identifying this component as an SML to the Shell
285 */
286void
287ServiceManagementLayer::SendComponentType()
288{
289    SendMessage(shellSocketFD, "response_sml");
290    LOG("SML responded to GetRemoteComponentType query.\n");
291}
292
293/* CALLED BY: constructor
294 * INPUTS: |serverName| the IPv4 name of the server (127.0.0.1 for localhost)
295 *         |serverPort| the port on the server to connect to
296 * OUTPUTS: <none>
297 *
298 * DESCRIPTION: Connecting to the shell takes 2 steps
299 * 1) Establish a client socket for communication
300 * 2) Run the initial Registration/handshake routine
301 */
302void
303ServiceManagementLayer::ConnectToShell(const char* serverName, \
304        const char* serverPort)
305{
306    shellSocketFD = ClientSocket(serverName, serverPort);
307    RegisterComponent();
308}
309
310/* CALLED BY: StartSMLServer
311 * INPUTS: |ID| The ID number of the CE that has a message wating
312 * OUTPUTS: <none>
313 *
314 * DESCRIPTION: Called whenever a socket is identified as being ready for communication
315 *              This funciton reads the message and calls the appropriate helper
316 */
317void
318ServiceManagementLayer::MessageHandler(int32_t ID)
319{
320    char buffer[256];   
321    memset(buffer, 0, 256); 
322    int32_t _FD; 
323   
324    if(ID != -1)
325        _FD = CE_List[ID].FD;
326    else
327        _FD = shellSocketFD;
328
329    ReadMessage(_FD, buffer);
330   
331    //--------Policy Engine Stuff - no policy engine support in this version-------//
332
333    //printf("********* %s **********\n", buffer);
334    // TODO
335    // If we send integer op codes rather than strings, this process will be
336    // MUCH faster since instead of donig string compares we can simply
337    // switch on the integer value...
338    /*if(strcmp(buffer, "register_service") == 0) {
339        if(strcmp(buffer, "policy_geo") == 0) {
340        }
341        else if(strcmp(buffer, "policy_time") == 0) {
342        }
343        else if(strcmp(buffer, "policy_spectrum") == 0) {
344        }
345        else if(strcmp(buffer, "policy_spacial") == 0) {
346        }
347    }
348    else if(strcmp(buffer, "deregister_service") == 0) {
349        if(strcmp(buffer, "policy_geo") == 0) {
350        }
351        else if(strcmp(buffer, "policy_time") == 0) {
352        }
353        else if(strcmp(buffer, "policy_spectrum") == 0) {
354        }
355        else if(strcmp(buffer, "policy_spacial") == 0) {
356        }
357    }*/
358
359    //Go down the list to call the appropriate function
360    if(strcmp(buffer, "query_component_type") == 0) {
361        SendComponentType();
362    }
363    else if(strcmp(buffer, "reset_sml") == 0) {
364        Reset();
365    }
366    else if(strcmp(buffer, "shutdown_sml") == 0) {
367        Shutdown();
368    }
369    else if(strcmp(buffer, "register_engine_cognitive") == 0) {
370        RegisterCognitiveEngine(ID);
371    }
372    else if(strcmp(buffer, "register_service") == 0) {
373        ReceiveServices(ID);
374    }
375    else if(strcmp(buffer, "send_component_type") == 0) {
376        SendComponentType();
377    }
378    else if(strcmp(buffer, "list_services") == 0) {
379        ListServices();
380    }
381    else if(strcmp(buffer, "set_active_mission") == 0) {
382        SetActiveMission();
383    }
384    else if(strcmp(buffer, "request_optimization") == 0) {
385        PerformActiveMission();
386    }
387    else if(strcmp(buffer, "deregister_engine_cognitive") == 0) {
388        DeregisterCognitiveEngine(ID);
389    }
390    else if(strcmp(buffer, "deregister_service") == 0) {
391        DeregisterServices(ID);
392    }
393}
394
395//TODO Finish
396/* CALLED BY: MessageHandler
397 * INPUTS: <none>
398 * OUTPUTS: <none>
399 *
400 * DESCRIPTION: Deregisters the component from the Shell.
401 */
402void
403ServiceManagementLayer::Shutdown()
404{
405    DeregisterComponent();
406}
407
408//TODO Finish
409/* CALLED BY: MessageHandler
410 * INPUTS: <none>
411 * OUTPUTS: <none>
412 *
413 * DESCRIPTION: Deregisters the component from the Shell
414 */
415void
416ServiceManagementLayer::Reset()
417{
418    DeregisterComponent();
419    ReloadConfiguration();
420}
421
422/* CALLED BY: ConnectToShell
423 * INPUTS: <none>
424 * OUTPUTS: <none>
425 *
426 * DESCRIPTION: Sends the registration message to the Shell
427 */
428void
429ServiceManagementLayer::RegisterComponent()
430{
431    SendMessage(shellSocketFD, "register_sml");
432    LOG("ServiceManagementLayer:: Registration message sent.\n");
433}
434
435/* CALLED BY: Shutdown
436 * INPUTS: <none>
437 * OUTPUTS: <none>
438 *
439 * DESCRIPTION: Closes the client socket with the shell, sends a deregstration message
440 */
441void
442ServiceManagementLayer::DeregisterComponent()
443{
444    SendMessage(shellSocketFD, "deregister_sml");
445    LOG("ServiceManagementLayer:: Deregistration message sent.\n");
446
447    shutdown(shellSocketFD, 2);
448    close(shellSocketFD);
449    shellSocketFD = -1;
450    LOG("ServiceManagementLayer:: Shell socket closed.\n");
451}
452
453
454/* CALLED BY: RegisterCognitiveEngine
455 * INPUTS: |ID| The ID number of the component where the data is to be transfered to
456 * OUTPUTS: <none>
457 *
458 * DESCRIPTION: Streams config data directly from the shell to the CE, and checks
459 * for an "ack" message from the CE after every sent message
460 * to know when to stop communication.
461 *
462 * NOTE: Modified to check the incoming message buffer rather than the outgoing
463 * message buffer to avoid a portion of the delay. May change this again to handle
464 * data more inteligently, taking advantage of it's properties.
465 */
466void
467ServiceManagementLayer::TransferRadioConfiguration(int32_t ID)
468{
469    struct timeval selTimeout;
470    fd_set sockSet;
471    int32_t rc = 1;
472    char buffer[256];
473
474    /* Send data until the CE sends an ACK message back */
475    while(rc != 0) {
476        memset(buffer, 0, 256);
477
478        /* Receive data from Shell */
479        ReadMessage(shellSocketFD, buffer);
480
481        /* Send data to CE */
482        SendMessage(CE_List[ID].FD, buffer);
483        FD_ZERO(&sockSet);
484        FD_SET(shellSocketFD, &sockSet);
485        selTimeout.tv_sec = 0;
486        selTimeout.tv_usec = 5000;
487
488        /* Check if there is a message on the shell ready to be processed */
489        rc = select(shellSocketFD + 1, &sockSet, NULL, NULL, &selTimeout);
490    }
491
492    memset(buffer, 0, 256);
493    ReadMessage(CE_List[ID].FD, buffer);
494    SendMessage(shellSocketFD, buffer);
495}
496
497
498/* CALLED BY: RegisterCognitiveEngine
499 * INPUTS: |ID| The ID number of the component where the data is to be transfered to
500 * OUTPUTS: <none>
501 *
502 * DESCRIPTION: Simmilar to TransferRadioConfig, just with Experience data
503 *
504 * NOTE: Modified to check the incoming message buffer rather than the outgoing
505 * message buffer to avoid a portion of the delay. May change this again to handle
506 * data more inteligently, taking advantage of it's properties.
507 */
508void
509ServiceManagementLayer::TransferExperience(int32_t ID)
510{
511    struct timeval selTimeout;
512    fd_set sockSet;
513    int32_t rc = 1;
514    char buffer[256];
515    /* Send data until the CE sends an ACK message back */
516    while(rc != 0) {
517        memset(buffer, 0, 256);
518
519        /* Receive data from Shell */
520        ReadMessage(shellSocketFD, buffer);
521
522        /* Send data to CE */
523        SendMessage(CE_List[ID].FD, buffer);
524        FD_ZERO(&sockSet);
525        FD_SET(shellSocketFD, &sockSet);
526        selTimeout.tv_sec = 0;
527        selTimeout.tv_usec = 5000;
528
529        /* Check if there is a message on the shell ready to be processed */
530        rc = select(shellSocketFD + 1, &sockSet, NULL, NULL, &selTimeout);
531    }
532
533    memset(buffer, 0, 256);
534    ReadMessage(CE_List[ID].FD, buffer);
535    SendMessage(shellSocketFD, buffer);
536}
537
538/* CALLED BY: MessageHandler
539 * INPUTS: |ID| The ID number of the component where service is located
540 * OUTPUTS: <none>
541 *
542 * DESCRIPTION: Inserts a service into the DB with the ID of the component where it exists
543 */
544void
545ServiceManagementLayer::ReceiveServices(int32_t ID)
546{
547    char buffer[256];
548    memset(buffer, 0, 256);
549    ReadMessage(CE_List[ID].FD, buffer);
550
551    char *cols[] = {(char *) "ID_Num", (char *) "Service_Name"};
552
553    /* Generate command */
554    _services_DB->command = "insert into ";
555    _services_DB->command.append(_services_DB->tablename);
556    _services_DB->command.append(" (");
557    _services_DB->command.append(cols[0]);
558    _services_DB->command.append(", ");
559    _services_DB->command.append(cols[1]);
560    _services_DB->command.append(") ");
561    _services_DB->command.append(" values(");
562
563    char temp[3];
564    memset(temp, 0, 3);
565    sprintf(temp, "%d", ID);
566
567    _services_DB->command.append(temp);
568    _services_DB->command.append(", '");
569    _services_DB->command.append(buffer);
570    _services_DB->command.append("');");
571   
572    /* Execute add command */
573    char *errorMsg;
574    int rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg);
575    if((rc != SQLITE_OK) && (rc != 101))
576        WARNING("ServiceManagementLayer::RecieveServices DB Error %s\n", errorMsg);
577}
578
579/* CALLED BY: MessageHandler
580 * INPUTS: <none>
581 * OUTPUTS: <none>
582 *
583 * DESCRIPTION: This method associates the services that components provide with the
584 * services that are requested in the mission. Each service in the mission is given
585 * the ID and FD of a component that has registered to provide that service. Deregistration
586 * is okay until this method is called without a reload, but if deregistration occurs after this
587 * method is called it needs to be called again even if other engines also provide the services
588 */
589void
590ServiceManagementLayer::SetActiveMission()
591{
592    char buffer[256];
593    memset(buffer, 0, 256);
594    ReadMessage(shellSocketFD, buffer);
595
596    uint32_t missID = atoi(buffer);
597    for(activeMission = 0; activeMission < 10; activeMission++) {
598        /* Find the active mission by comparing mission ID's */
599        if(miss[activeMission].missionID == missID)
600            break;
601    }
602
603    LOG("ServiceManagementLayer:: Received Set Active Mission command: %i.\n", missID);
604
605    /* For each service in the mission */
606    for(size_t i = 0; i < miss[activeMission].numServices; i++) {
607        /* Check whether the current service is an actual service or a conditional */
608        if(miss[activeMission].services[i].name.compare("if") && \
609                miss[activeMission].services[i].name.compare("dowhile") && \
610                miss[activeMission].services[i].name.compare("shell")) {
611                /* If it is a service, search the database of registered services to find
612                 * the ID of the component that registered it */
613                _services_DB->command="select ";
614                _services_DB->command.append(_services_DB->tablename);
615                _services_DB->command.append(".* from ");
616                _services_DB->command.append( _services_DB->tablename);
617                _services_DB->command.append(" where Service_Name=='");
618                _services_DB->command.append(miss[activeMission].services[i].name);
619                _services_DB->command.append("';");
620   
621                sqlite3_stmt * pStatement;
622                int32_t rc = sqlite3_prepare_v2(_services_DB->db, _services_DB->command.c_str(), \
623                        -1, &pStatement, NULL);
624                if(rc == SQLITE_OK) {
625                    if (sqlite3_step(pStatement) == SQLITE_ROW)
626                        miss[activeMission].services[i].componentID = sqlite3_column_int(pStatement, 0);
627                    else {
628                        WARNING("services_DB:: Mission requires service %s ", \
629                                miss[activeMission].services[i].name.c_str());
630                        WARNING("not provided by any connected component.\n")
631                        rc = 31337;
632                    }
633                } else {
634                    WARNING("services_DB:: Error executing SQL statement. rc = %i\n%s\n", \
635                            rc, _services_DB->command.c_str());
636                }
637
638                sqlite3_finalize(pStatement);
639                miss[activeMission].services[i].socketFD = \
640                    CE_List[miss[activeMission].services[i].componentID].FD;
641        }
642        /* TODO Nothing to be done for conditionals at this stage */
643    }
644 
645    SendMessage(shellSocketFD, "ack");
646    LOG("ServiceManagementLayer:: Done setting active mission.\n");
647}
648
649/* CALLED BY: PerformActiveMission
650 * INPUTS: |sourceID| ID of the service that is being processed
651 * OUTPUTS: <none>
652 *
653 * DESCRIPTION: This is a helper method for the "PerformActiveMission" function
654 * NOTE: This function has changed drastically from the previous implementation
655 *
656 * Takes an ID of a service. For that service, finds inputs in DB and forwords
657 * those on to the engine after sending comm-starting messages. Afterwords, listenes
658 * for the outputs so that it can store those in the database for future services or
659 * the overall output
660 */
661void
662ServiceManagementLayer::TransactData(int32_t sourceID)
663{
664    char buffer[256];
665    std::string data;
666    char *cols[] = {(char *) "Tag", (char *) "Data"};
667    char *token;
668
669   /* Send a message directly to the shell */
670   if(miss[activeMission].services[sourceID].name.find("shell") != string::npos) {
671       shellFound=true;
672
673       int32_t k = 0;
674       while((k < 10) && (!miss[activeMission].input[k].empty())) {
675           k++;
676       }
677
678       sprintf(buffer, "%d", k);
679       SendMessage(shellSocketFD, buffer);
680       for(int32_t t = 0; t < k; t++) {
681           memset(buffer, 0 , 256);
682           _data_DB->command="select ";
683           _data_DB->command.append(_data_DB->tablename);
684           _data_DB->command.append(".* from ");
685           _data_DB->command.append(_data_DB->tablename);
686           _data_DB->command.append(" where Tag=='");
687           _data_DB->command.append(miss[activeMission].input[t]);
688           _data_DB->command.append("';");
689           sqlite3_stmt * pStatement;
690
691           int32_t rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), \
692                   -1, &pStatement, NULL);
693           if(rc == SQLITE_OK) {
694               if(sqlite3_step(pStatement) == SQLITE_ROW)
695                   data=((const char*) sqlite3_column_text(pStatement, 1));
696               else {
697                   LOG("3data_DB:: Data not yet in DB., %s\n", _data_DB->command.c_str());
698                   rc = 31337;
699               }
700           }
701           else {
702               WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
703                       rc,_data_DB->command.c_str());
704           }
705
706           sqlite3_finalize(pStatement);
707           token = strtok((char *) data.c_str(), "@");
708           token = strtok(NULL, "@");
709           SendMessage(shellSocketFD, token);
710           token = strtok(NULL, "@");
711           SendMessage(shellSocketFD, token);
712       }
713
714       return;
715   }
716
717    /* If this is a service command and not a shell command... */
718    /* Transmission starting messages */
719    SendMessage(miss[activeMission].services[sourceID].socketFD, "request_optimization_service");
720    SendMessage(miss[activeMission].services[sourceID].socketFD, \
721            miss[activeMission].services[sourceID].name.c_str());
722
723   /* If the service takes a parameter, feed that parameter in
724   if(!miss[activeMission].services[sourceID].parameter.empty()){
725    //printf("sending parameter!\n");
726    SendMessage(miss[activeMission].services[sourceID].socketFD, "1");
727    SendMessage(miss[activeMission].services[sourceID].socketFD, "parameter");
728    SendMessage(miss[activeMission].services[sourceID].socketFD, miss[activeMission].services[sourceID].parameter.c_str());
729    }
730
731
732    //Load and transmit the input data
733    int i = 0;
734    while(i < 10 && !miss[activeMission].services[sourceID].input[i].empty()){
735        _data_DB->command="select ";
736       _data_DB->command.append(_data_DB->tablename);
737        _data_DB->command.append(".* from ");
738        _data_DB->command.append(_data_DB->tablename);
739        _data_DB->command.append(" where Tag=='");
740    _data_DB->command.append(miss[activeMission].services[sourceID].input[i]);
741        _data_DB->command.append("';");
742        sqlite3_stmt * pStatement;
743        int rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
744        if (rc == SQLITE_OK){
745            if (sqlite3_step(pStatement) == SQLITE_ROW)
746             data.append((const char*) sqlite3_column_text(pStatement, 1));
747            else {
748                printf("2data_DB:: Data not yet in DB.\n");
749            rc=31337;
750        }
751        }
752    else {
753        printf("data_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_data_DB->command.c_str());
754        }
755        sqlite3_finalize(pStatement);
756
757        token = strtok((char *)data.c_str(), "@");
758    while(token){
759        SendMessage(miss[activeMission].services[sourceID].socketFD, token);
760        token = strtok(NULL, "@");
761    }
762    i++;
763    data.clear();
764    }
765    int32_t j = 0;
766   
767    //Receive and store the output data
768    while(j < 10 && !miss[activeMission].services[sourceID].output[j].empty()){
769    int rc;
770    memset(buffer, 0, 256);
771    //Read the number of datapairs for this output
772    ReadMessage(miss[activeMission].services[sourceID].socketFD, buffer);
773    data.append(buffer);
774    data.append("@");
775    int t = atoi(buffer);
776    for(int k = 0; k < t; k++){
777        //Read the datapairs incrementally and deliminate it with the "@" symbol
778        memset(buffer, 0, 256);
779        ReadMessage(miss[activeMission].services[sourceID].socketFD, buffer);
780        data.append(buffer);
781        data.append("@");
782        memset(buffer, 0, 256);
783        ReadMessage(miss[activeMission].services[sourceID].socketFD, buffer);
784        data.append(buffer);
785        data.append("@");
786    }
787
788    _data_DB->command="insert or replace into ";
789        _data_DB->command.append(_data_DB->tablename);
790          _data_DB->command.append(" (");
791        _data_DB->command.append(cols[0]);
792        _data_DB->command.append(", ");
793    _data_DB->command.append(cols[1]);
794    _data_DB->command.append(") ");
795    _data_DB->command.append(" values('");   
796    _data_DB->command.append(miss[activeMission].services[sourceID].output[j]);
797    _data_DB->command.append("', '");
798    _data_DB->command.append(data);
799    _data_DB->command.append("');");
800        char *errorMsg;
801       rc = sqlite3_exec(_data_DB->db, _data_DB->command.c_str(), callback, 0, &errorMsg);
802       if( rc!=SQLITE_OK && rc!=101 )
803           fprintf(stderr, "SQL error: %s\n", errorMsg);
804        //printf("S: done putting ouptut data into DB for ID#='%d', data=%s\n", sourceID, data.c_str());
805    j++;
806        data.clear();
807    }
808    //printf("done transact data!\n");
809   // LOG("ServiceManagementLayer:: Finished with data transaction.\n");
810
811
812    /*printf("\n\n\n");
813    // generate commandi
814    strcpy(_data_DB->command, "select ");
815    strcat(_data_DB->command, _data_DB->tablename);
816    strcat(_data_DB->command, ".* from ");
817    strcat(_data_DB->command, _data_DB->tablename);
818    strcat(_data_DB->command, ";");
819
820    // execute print (select all)  command   
821    char *errorMsg;
822    int rc = sqlite3_exec(_data_DB->db, _data_DB->command, callback, 0, &errorMsg);
823    if( rc!=SQLITE_OK && rc!=101 )
824        fprintf(stderr, "SQL error: %s\n", errorMsg);
825    printf("database %s, table %s:\n", _data_DB->filename, _data_DB->tablename);
826    printf("\n\n\n");*/
827}
828
829
830
831/* CALLED BY: MessageHandler
832 * INPUTS: <none>
833 * OUTPUTS: <none>
834 *
835 * DESCRIPTION: This function works by first sending the inputs from the shell to the appropriate components
836 * The first service should begin immeadiately, as should any others who have all of their input parameters
837 * When they complete, the output path is found and the data is transfered as it becomes available
838 * Presumably at this point the second function has all of it's parameters, so it begins to compute, and the cycle repeats
839 * 
840 *
841 * Rules for active missions (currently)
842 * -Five inputs/outputs per service and per mission
843 * -All ordering constraints have been relaxed in this version; all data is stored locally and only sent when requested
844 * -If and while support fully implemented - up to three levels (if's can be nested, but dowhiles cannot)
845 * -For dowhiles, assumes loop condition determined on last line
846 * -IMPORTANT: DB uses '@' to seperate individual statements; using '@' in the data stream will result in incorrect behavior
847 */
848
849//WHILE
850void
851ServiceManagementLayer::PerformActiveMission()
852{
853    //printf("start PAM\n");
854    uint16_t i = 0, t;
855    shellFound = false;
856    std::string data_param, data_obsv, data;
857    std::string input;
858    std::string check;
859    char buffer[256];
860    char buffer1[256];
861    std::string token, token2;
862    std::string data2;
863    int rc;
864    char *errorMsg;
865    char* cols[] = {(char *)"Tag", (char *)"Data"};
866
867    LOG("ServiceManagementLayer:: Received PerformActiveMission command.\n");
868
869    //Get the inputs
870    memset(buffer, 0, 256);
871    //Needed to read the zero passed in by GOP as the number of observables
872    ReadMessage(shellSocketFD, buffer);
873
874
875    /* Receive Set of Parameters */
876    memset(buffer, 0, 256);
877    ReadMessage(shellSocketFD, buffer);
878    t=atoi(buffer);
879    for(int m = 0; m < t; m++) {
880        memset(buffer1, 0, 256);
881        ReadMessage(shellSocketFD, buffer1);
882    _data_DB->command="insert into ";
883        _data_DB->command.append(_data_DB->tablename);
884          _data_DB->command.append(" (");
885        _data_DB->command.append(cols[0]);
886        _data_DB->command.append(", ");
887    _data_DB->command.append(cols[1]);
888    _data_DB->command.append(") ");
889        memset(buffer, 0, 256);
890        ReadMessage(shellSocketFD, buffer);
891    _data_DB->command.append(" values('");
892    _data_DB->command.append(buffer1);
893    _data_DB->command.append("', '1@");
894    _data_DB->command.append(buffer1);
895    _data_DB->command.append("@");
896    _data_DB->command.append(buffer);
897    _data_DB->command.append("');");
898       rc = sqlite3_exec(_data_DB->db, _data_DB->command.c_str(), callback, 0, &errorMsg);
899       if( rc!=SQLITE_OK && rc!=101 )
900           fprintf(stderr, "SQL error: %s\n", errorMsg);
901    }
902
903    //Useful for spotchecking what's in the database
904    /*printf("\n\n\n");
905    // generate commandi
906    strcpy(_data_DB->command, "select ");
907    strcat(_data_DB->command, _data_DB->tablename);
908    strcat(_data_DB->command, ".* from ");
909    strcat(_data_DB->command, _data_DB->tablename);
910    strcat(_data_DB->command, ";");
911
912    // execute print (select all)  command 
913    rc = sqlite3_exec(_data_DB->db, _data_DB->command, callback, 0, &errorMsg);
914    if( rc!=SQLITE_OK && rc!=101 )
915        fprintf(stderr, "SQL error: %s\n", errorMsg);
916    printf("database %s, table %s:\n", _data_DB->filename, _data_DB->tablename);
917    printf("\n\n\n");*/
918
919
920    i=0;
921    int32_t numstatements[3] = {0,0,0};
922    while(i < miss[activeMission].numServices)
923    {
924    if(miss[activeMission].services[i].name.compare("if")==0)
925    {
926       //printf("L0:if detected\n");
927        input.clear();
928        check.clear();
929        int t;
930        for(t = 0; t < 10; t++){
931        if(!miss[activeMission].services[i].output[t].empty()){
932            input=miss[activeMission].services[i-numstatements[0]-1].output[t];
933            _data_DB->command="SELECT ";
934            _data_DB->command.append(_data_DB->tablename);
935            _data_DB->command.append(".* from ");
936            _data_DB->command.append(_data_DB->tablename);
937            _data_DB->command.append(" where Tag=='");
938            _data_DB->command.append(input);
939            _data_DB->command.append("';");
940            sqlite3_stmt * pStatement;
941            rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
942            if (rc == SQLITE_OK){
943            if (sqlite3_step(pStatement) == SQLITE_ROW)
944                 data = (const char *) sqlite3_column_text(pStatement, 1);
945            else {
946                printf("1 data_DB:: Data not yet in DB.\n");
947                rc=31337;
948            }
949            } else {
950            printf("data_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_data_DB->command.c_str());
951            }
952                sqlite3_finalize(pStatement);
953            //printf("data=%s\n", data.c_str());
954            int pos = data.find_last_of("@", data.length()-2);
955            token = data.substr(pos+1);
956            token.erase(token.length()-1);
957            //printf("token=%s, %d\n", token.c_str(), pos);
958        data.clear();
959            break;
960        }
961        }
962        //printf("Level 0:--- %s  %s---\n", miss[activeMission].services[i].output[t].c_str(), token);
963        bool doit = false;
964        if(miss[activeMission].services[i].output[t].find(">") != string::npos){
965            std::string data2;
966            _data_DB->command="SELECT ";
967            _data_DB->command.append(_data_DB->tablename);
968            _data_DB->command.append(".* from ");
969            _data_DB->command.append(_data_DB->tablename);
970            _data_DB->command.append(" where Tag=='");
971            _data_DB->command.append(miss[activeMission].services[i].output[t].erase(0, 1));
972            _data_DB->command.append("';");
973            sqlite3_stmt * pStatement;
974            rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
975            if (rc == SQLITE_OK){
976            if (sqlite3_step(pStatement) == SQLITE_ROW)
977                 data2 = (const char *) sqlite3_column_text(pStatement, 1);
978            else {
979                printf("2 data_DB:: Data not yet in DB.\n");
980                rc=31337;
981            }
982            } else {
983            printf("data_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_data_DB->command.c_str());
984            }
985            sqlite3_finalize(pStatement);
986
987            int pos = data2.find_last_of("@", data2.length()-2);
988            token2 = data2.substr(pos+1);
989            token2.erase(token2.length()-1);
990            //printf("token2=%s, %d\n", token2.c_str(), pos);
991            if(atof(token.c_str()) > atof(token2.c_str()))
992            doit=true;
993            //printf("%s %s\n", buffer, token);
994        }
995        else if (miss[activeMission].services[i].output[t].find(token) != string::npos)
996                doit=true;
997        if(doit){
998        //printf("Level 0:if taken\n");
999        for(uint16_t k = i+1; k <= i+miss[activeMission].services[i].num_conds; k++){
1000            if(miss[activeMission].services[k].name.compare("if")==0){
1001            //printf("Level 1:if detected\n");
1002                input.clear();
1003                check.clear();
1004                for(t = 0; t < 10; t++){
1005                if(!miss[activeMission].services[k].output[t].empty()){
1006                    input=miss[activeMission].services[k-numstatements[1]-1].output[t];
1007                        _data_DB->command="SELECT ";
1008                        _data_DB->command.append(_data_DB->tablename);
1009                       _data_DB->command.append(".* from ");
1010                       _data_DB->command.append(_data_DB->tablename);
1011                      _data_DB->command.append(" where Tag=='");
1012                     _data_DB->command.append(input);
1013                    _data_DB->command.append("';");
1014                    sqlite3_stmt * pStatement;
1015                    rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
1016                    if (rc == SQLITE_OK){
1017                    if (sqlite3_step(pStatement) == SQLITE_ROW)
1018                         data = (const char *) sqlite3_column_text(pStatement, 1);
1019                    else {
1020                        printf("3 data_DB:: Data not yet in DB.\n");
1021                        rc=31337;
1022                    }
1023                    } else {
1024                    printf("data_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_data_DB->command.c_str());
1025                    }
1026                        sqlite3_finalize(pStatement);
1027                    //printf("Level 1:data=%s\n", data.c_str());
1028                    int pos = data.find_last_of("@", data.length()-2);
1029                    token = data.substr(pos+1);
1030                    token.erase(token.length()-1);
1031                    //printf("Level 1:token=%s\n", token);
1032                    break;
1033                }
1034                }
1035                bool doit = false;
1036                if(miss[activeMission].services[k].output[t].find(">") != string::npos){
1037                    std::string data2;
1038                        _data_DB->command="SELECT ";
1039                    _data_DB->command.append(_data_DB->tablename);
1040                    _data_DB->command.append(".* from ");
1041                    _data_DB->command.append(_data_DB->tablename);
1042                    _data_DB->command.append(" where Tag=='");
1043                    _data_DB->command.append(miss[activeMission].services[k].output[t].erase(0, 1));
1044                    _data_DB->command.append("';");
1045                    sqlite3_stmt * pStatement;
1046                    rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
1047                    if (rc == SQLITE_OK){
1048                    if (sqlite3_step(pStatement) == SQLITE_ROW)
1049                         data2 = (const char *) sqlite3_column_text(pStatement, 1);
1050                    else {
1051                        printf("4 data_DB:: Data not yet in DB.\n");
1052                        rc=31337;
1053                    }
1054                    } else {
1055                    printf("data_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_data_DB->command.c_str());
1056                    }
1057                        sqlite3_finalize(pStatement);
1058                    int pos = data2.find_last_of("@", data2.length()-2);
1059                    token2 = data2.substr(pos+1);
1060                    token2.erase(token2.length()-1);
1061                        //printf("token=%s token2=%s\n", token.c_str(), token2.c_str());
1062                        if(atof(token.c_str()) > atof(token2.c_str()))
1063                        doit=true;
1064                }
1065                else if (miss[activeMission].services[k].output[t].find(token) != string::npos)
1066                doit=true;
1067                if(doit){
1068                //printf("Level 1:if taken\n");
1069                for(uint16_t j = k+1; j <= k+miss[activeMission].services[k].num_conds; j++){
1070                    if(miss[activeMission].services[j].name.compare("if")==0){
1071                    //printf("Level 2:if detected\n");
1072                        input.clear();
1073                        check.clear();
1074                        for(t = 0; t < 10; t++){
1075                        if(!miss[activeMission].services[j].output[t].empty()){
1076                            input=miss[activeMission].services[j-numstatements[2]-1].output[t];
1077                                   _data_DB->command="SELECT ";
1078                                _data_DB->command.append(_data_DB->tablename);
1079                               _data_DB->command.append(".* from ");
1080                               _data_DB->command.append(_data_DB->tablename);
1081                              _data_DB->command.append(" where Tag=='");
1082                             _data_DB->command.append(input);
1083                            _data_DB->command.append("';");
1084                            sqlite3_stmt * pStatement;
1085                            rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
1086                            if (rc == SQLITE_OK){
1087                            if (sqlite3_step(pStatement) == SQLITE_ROW)
1088                                 data = (const char *) sqlite3_column_text(pStatement, 1);
1089                            else {
1090                                printf("5 data_DB:: Data not yet in DB.\n");
1091                                rc=31337;
1092                            }
1093                            } else {
1094                            printf("data_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_data_DB->command.c_str());
1095                            }
1096                                sqlite3_finalize(pStatement);
1097                            //printf("data=%s\n", data.c_str());
1098                            int pos = data.find_last_of("@", data.length()-2);
1099                            token = data.substr(pos+1);
1100                            token.erase(token.length()-1);
1101                            //printf("token=%s\n", token.c_str());
1102                            data.clear();
1103                            break;
1104                        }
1105                        }
1106                        bool doit = false;
1107                        if(miss[activeMission].services[j].output[t].find(">") != string::npos){
1108                            _data_DB->command="SELECT ";
1109                            _data_DB->command.append(_data_DB->tablename);
1110                            _data_DB->command.append(".* from ");
1111                           _data_DB->command.append(_data_DB->tablename);
1112                            _data_DB->command.append(" where Tag=='");
1113                            _data_DB->command.append(miss[activeMission].services[j].output[t].erase(0, 1));
1114                            _data_DB->command.append("';");
1115                            sqlite3_stmt * pStatement;
1116                            rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
1117                            if (rc == SQLITE_OK){
1118                            if (sqlite3_step(pStatement) == SQLITE_ROW)
1119                                 data2 = (const char *) sqlite3_column_text(pStatement, 1);
1120                            else {
1121                                printf("6 data_DB:: Data not yet in DB.\n");
1122                                rc=31337;
1123                            }
1124                            } else {
1125                            printf("data_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_data_DB->command.c_str());
1126                            }
1127                                sqlite3_finalize(pStatement);
1128
1129                        int pos = data2.find_last_of("@", data2.length()-2);
1130                        token2 = data2.substr(pos+1);
1131                        token2.erase(token2.length()-1);
1132                            if(atof(token.c_str()) > atof(token2.c_str()))
1133                            doit=true;
1134                        data.clear();
1135                        }
1136                        else if (miss[activeMission].services[j].output[t].find(token) != string::npos)
1137                        doit=true;
1138                        if(doit){
1139                        //printf("Level 1:if taken\n");
1140                        for(uint16_t l = j+1; l <= j+miss[activeMission].services[j].num_conds; l++){
1141                            TransactData(l);
1142                        }
1143                        }
1144                        else
1145                        //printf("Level 2:if not taken\n");
1146                        numstatements[2] +=miss[activeMission].services[j].num_conds+1;
1147                        j+=miss[activeMission].services[j].num_conds;
1148                        //printf("Level 2:doneif %d, %d, %d\n", numstatements, miss[activeMission].services[i].num_conds, i);
1149                    }else if(miss[activeMission].services[j].name.compare("dowhile")==0){
1150                        numstatements[0]=0;
1151                        //printf("Level 2:while detected\n");
1152                        while(true){
1153                            uint16_t l;
1154                            for(l = j+1; l <= j+miss[activeMission].services[j].num_conds; l++){
1155                            TransactData(l);
1156                            }
1157                            data.clear();
1158                            input.clear();
1159                            check.clear();
1160                            int t;
1161                            for(t = 0; t < 10; t++){
1162                            if(!miss[activeMission].services[j].output[t].empty()){
1163                                input=miss[activeMission].services[l-2].output[t];
1164                                //printf("%s\n",input.c_str());
1165                                _data_DB->command="SELECT ";
1166                                _data_DB->command.append(_data_DB->tablename);
1167                                _data_DB->command.append(".* from ");
1168                                _data_DB->command.append(_data_DB->tablename);
1169                                _data_DB->command.append(" where Tag=='");
1170                                _data_DB->command.append(input);
1171                                _data_DB->command.append("';");
1172                                sqlite3_stmt * pStatement;
1173
1174                                rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
1175                                if (rc == SQLITE_OK){
1176                                if (sqlite3_step(pStatement) == SQLITE_ROW)
1177                                     data = (const char *) sqlite3_column_text(pStatement, 1);
1178                                else {
1179                                    printf("7 data_DB:: Data not yet in DB.: %s\n", _data_DB->command.c_str());
1180                                    rc=31337;
1181                                }
1182                                } else {
1183                                  printf("data_DB:: SQL statement error. rc = %i\n%s\n",rc,_data_DB->command.c_str());
1184                                }
1185                                    sqlite3_finalize(pStatement);
1186                                //printf("Level 2:data=%s\n", data.c_str());
1187                                    int pos = data.find_last_of("@", data.length()-2);
1188                                    token = data.substr(pos+1);
1189                                    token.erase(token.length()-1);
1190                                //printf("Level 2:token=%s\n", token);
1191                                break;
1192                            }
1193                            }
1194                            //printf("Level 2:--- %s  %s---\n", miss[activeMission].services[j].output[t].c_str(), token);
1195                            if(miss[activeMission].services[j].output[t].find(token) != string::npos){
1196                            //printf("Level 2:while taken again!\n");
1197                            }
1198                            else {
1199                            //printf("Level 2:no more while\n");
1200                            break;}
1201                        }
1202                        j+=miss[activeMission].services[j].num_conds;
1203                    }
1204                    else{
1205                        //printf("Level 2: no conditional\n");
1206                        numstatements[2]=0;
1207                            TransactData(j);
1208                    }
1209                }
1210                }
1211                else{
1212                //printf("Level 1: if not taken\n");
1213                }
1214                    numstatements[1] +=miss[activeMission].services[k].num_conds+1;
1215                    k+=miss[activeMission].services[k].num_conds;
1216                //printf("doneif %d, %d, %d\n", numstatements, miss[activeMission].services[i].num_conds, i);
1217            }else if(miss[activeMission].services[k].name.compare("dowhile")==0){
1218                numstatements[0]=0;
1219                //printf("Level 1: while detected\n");
1220                while(true){
1221                uint16_t j;
1222                    for(j = k+1; j <= k+miss[activeMission].services[k].num_conds; j++){
1223                    TransactData(j);
1224                    }
1225                    data.clear();
1226                    input.clear();
1227                    check.clear();
1228                    int t;
1229                    for(t = 0; t < 10; t++){
1230                    if(!miss[activeMission].services[k].output[t].empty()){
1231                        input=miss[activeMission].services[j-1].output[t];
1232                        _data_DB->command="SELECT ";
1233                        _data_DB->command.append(_data_DB->tablename);
1234                        _data_DB->command.append(".* from ");
1235                        _data_DB->command.append(_data_DB->tablename);
1236                        _data_DB->command.append(" where Tag=='");
1237                        _data_DB->command.append(input);
1238                        _data_DB->command.append("';");
1239                        sqlite3_stmt * pStatement;
1240                        rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
1241                        if (rc == SQLITE_OK){
1242                        if (sqlite3_step(pStatement) == SQLITE_ROW)
1243                             data = (const char *) sqlite3_column_text(pStatement, 1);
1244                        else {
1245                            printf("8 data_DB:: Data not yet in DB.\n");
1246                            rc=31337;
1247                        }
1248                        } else {
1249                        printf("data_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_data_DB->command.c_str());
1250                        }
1251                            sqlite3_finalize(pStatement);
1252                        //printf("Level 1:data=%s\n", data.c_str());
1253                        int pos = data.find_last_of("@", data.length()-2);
1254                        token = data.substr(pos+1);
1255                        token.erase(token.length()-1);
1256                        //printf("Level 1:token=%s\n", token);
1257                        break;
1258                    }
1259                    }
1260                    //printf("Level 1:--- %s  %s---\n", miss[activeMission].services[k].output[t].c_str(), token);
1261                    if(miss[activeMission].services[k].output[t].find(token) != string::npos){
1262                    //printf("Level 1:while taken again!\n");
1263                    }
1264                    else {
1265                    //printf("Level 1:While finished\n");
1266                    break;}
1267                }
1268                k+=miss[activeMission].services[k].num_conds;
1269            }
1270            else{
1271                //printf("Level1:No conditional\n");
1272                numstatements[1]=0;
1273                    TransactData(k);
1274            }
1275        }
1276        }
1277            numstatements[0] +=miss[activeMission].services[i].num_conds+1;
1278        i+=miss[activeMission].services[i].num_conds;
1279    }
1280    else if(miss[activeMission].services[i].name.compare("dowhile")==0)
1281    {
1282        numstatements[0]=0;
1283        //printf("Level 0: while detected\n");
1284        while(true){
1285        uint16_t k;
1286            for(k = i+1; k <= i+miss[activeMission].services[i].num_conds; k++){
1287            TransactData(k);
1288            }
1289            data.clear();
1290            input.clear();
1291            check.clear();
1292            int t;
1293            for(t = 0; t < 10; t++){
1294            if(!miss[activeMission].services[i].output[t].empty()){
1295                input=miss[activeMission].services[k-1].output[t];
1296                _data_DB->command="SELECT ";
1297                _data_DB->command.append(_data_DB->tablename);
1298                _data_DB->command.append(".* from ");
1299                _data_DB->command.append(_data_DB->tablename);
1300                _data_DB->command.append(" where Tag=='");
1301                _data_DB->command.append(input);
1302                _data_DB->command.append("';");
1303                sqlite3_stmt * pStatement;
1304                rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
1305                if (rc == SQLITE_OK){
1306                if (sqlite3_step(pStatement) == SQLITE_ROW)
1307                     data = (const char *) sqlite3_column_text(pStatement, 1);
1308                else {
1309                    printf("10data_DB:: Data not yet in DB.\n");
1310                    rc=31337;
1311                }
1312                } else {
1313                printf("data_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_data_DB->command.c_str());
1314                }
1315                    sqlite3_finalize(pStatement);
1316                int pos = data.find_last_of("@", data.length()-2);
1317                token = data.substr(pos+1);
1318                token.erase(token.length()-1);
1319                break;
1320            }
1321            }
1322            //printf("Level 0:--- %s  %s---\n", miss[activeMission].services[i].output[t].c_str(), token);
1323            if(miss[activeMission].services[i].output[t].find(token) != string::npos){
1324            //printf("Level 0:while taken again!\n");
1325            }
1326            else {
1327            //printf("Level 0:no more while\n");
1328            break;}
1329        }
1330        i+=miss[activeMission].services[i].num_conds;
1331    }
1332    else{
1333        numstatements[0]=0;
1334        //printf("Level 0: No conditional\n");
1335        TransactData(i);}
1336    i++;
1337    }
1338    i=0;
1339    data.clear();
1340
1341    if(!shellFound)
1342    {
1343    int k = 0;
1344    while(k<10 && !miss[activeMission].input[k].empty()){
1345        k++;
1346    }
1347    sprintf(buffer, "%d", k);
1348    SendMessage(shellSocketFD, buffer);
1349    for(int t = 0; t < k; t++){
1350        SendMessage(shellSocketFD, miss[activeMission].input[t].c_str());
1351        SendMessage(shellSocketFD, "0");
1352    }
1353    }
1354
1355
1356    LOG("ServiceManagementLayer:: Done performing active mission.\n");
1357    /*
1358    strcpy(_data_DB->command, "select ");
1359    strcat(_data_DB->command, _data_DB->tablename);
1360    strcat(_data_DB->command, ".* from ");
1361    strcat(_data_DB->command, _data_DB->tablename);
1362    strcat(_data_DB->command, ";");
1363
1364    // execute print (select all)  command 
1365    rc = sqlite3_exec(_data_DB->db, _data_DB->command, callback, 0, &errorMsg);
1366    if( rc!=SQLITE_OK && rc!=101 )
1367        fprintf(stderr, "SQL error: %s\n", errorMsg);
1368    printf("database %s, table %s:\n", _data_DB->filename, _data_DB->tablename);
1369    printf("\n\n\n");*/
1370}
1371
1372
1373/* CALLED BY: MessageHandler
1374 * INPUTS: <none>
1375 * OUTPUTS: <none>
1376 *
1377 * DESCRIPTION: Print a list of the services currently registered and the ID's of the components that registered them
1378 */
1379void
1380ServiceManagementLayer::ListServices()
1381{
1382    _services_DB->command="select ";
1383    _services_DB->command.append(_services_DB->tablename);
1384    _services_DB->command.append(".* from ");
1385    _services_DB->command.append(_services_DB->tablename);
1386    _services_DB->command.append(";");
1387
1388    // execute print (select all)  command   
1389    char *errorMsg;
1390    int rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg);
1391    if( rc!=SQLITE_OK && rc!=101 )
1392        fprintf(stderr, "SQL error: %s\n", errorMsg);
1393    printf("database %s, table %s:\n", _services_DB->filename.c_str(), _services_DB->tablename.c_str());
1394}
1395
1396/* CALLED BY: Reset
1397 * INPUTS: <none>
1398 * OUTPUTS: <none>
1399 *
1400 * DESCRIPTION: Clear and reinitialize the mission array, then reload the configuration file
1401 */
1402void
1403ServiceManagementLayer::ReloadConfiguration()
1404{
1405    LOG("ServiceManagementLayer:: Reloading Configuration.\n");
1406    free(miss);
1407    miss = new Mission[10];
1408    for(int i = 0; i < 10; i++)
1409        miss[i].services = new Service[30];
1410    LoadConfiguration(_SML_Config.c_str(), miss);
1411}
1412
1413/* CALLED BY: constructor
1414 * INPUTS: |SML_Config| Address (either relitive or full) of the XML file containing mission data
1415 *        |mList| Mission array to be modified
1416 * OUTPUTS: <none>
1417 *
1418 * DESCRIPTION: IMPORTANT - See formatting instructions for correct parsing of data
1419 * Can currently handle 10 inputs and 10 outputs per service, but easily expandable
1420 * Also, can handle two layer of nested conditional statements, but could
1421 * be expanded to meet additional needs.
1422 *
1423 * Components assigned to mission during "set active mission" stage so that
1424 * components can still continue to register after the configuration is loaded
1425 */
1426void
1427ServiceManagementLayer::LoadConfiguration(const char *SML_Config, Mission* &mList)
1428{
1429    TiXmlElement *pMission;
1430    TiXmlElement *pService;
1431    TiXmlElement *pChild0, *pChild1, *pChild2, *pChild3, *pChild4;
1432    TiXmlHandle hRoot(0);
1433    printf("ServiceManagementLayer:: Loading Configuration.\n");
1434    TiXmlDocument doc(".");
1435    doc.LoadFile(SML_Config);
1436    bool loadOkay = doc.LoadFile();
1437    if(!loadOkay)
1438        printf("Loading SML configuration failed: %s\n", SML_Config);
1439
1440    TiXmlHandle hDoc(&doc);
1441   
1442    pMission = hDoc.FirstChildElement().Element();
1443
1444    if(!pMission)
1445        printf("No valid root!");
1446
1447    hRoot = TiXmlHandle(pMission);
1448    pService = pMission->FirstChildElement();
1449    int32_t mission_num = 0;
1450    //Iterate through the missions
1451    for(pChild0 = pMission->FirstChildElement(); pChild0 ; \
1452        pChild0 = pChild0->NextSiblingElement())
1453    {
1454    int32_t service_num = 0;
1455    uint16_t cond_array[] = {0, 0, 0};
1456   
1457        for(pChild1  = (pChild0->FirstChildElement())->FirstChildElement(); pChild1; \
1458              pChild1  = pChild1->NextSiblingElement())
1459        {
1460        int32_t conditional_0 = service_num;
1461        for(pChild2 = pChild1->FirstChildElement(); \
1462        pChild2; pChild2 = pChild2->NextSiblingElement())
1463        {
1464        service_num++;
1465        int32_t conditional_1 = service_num;
1466        for(pChild3 = pChild2->FirstChildElement(); \
1467            pChild3; pChild3 = pChild3->NextSiblingElement())
1468        {
1469            service_num++;
1470            int32_t conditional_2 = service_num;
1471            for(pChild4 = pChild3->FirstChildElement(); \
1472                pChild4; pChild4 = pChild4->NextSiblingElement())
1473            {
1474                   service_num++;
1475                if(pChild4->Attribute("name"))
1476                    mList[mission_num].services[service_num].name = pChild4->Attribute("name");
1477                else
1478                    mList[mission_num].services[service_num].name = pChild4->Value();
1479                for(int i = 1; i <= 10; i++) {
1480                    char buffer[9]="input";
1481                    sprintf(buffer, "%s%d", buffer, i);
1482                    //printf("buffer=%s\n", buffer);
1483                    if(pChild4->Attribute(buffer))
1484                        mList[mission_num].services[service_num].input[i-1] = pChild4->Attribute(buffer);
1485                    char buffer2[9]="output";
1486                    sprintf(buffer2, "%s%d", buffer2, i);
1487                    if(pChild4->Attribute(buffer2))
1488                    mList[mission_num].services[service_num].output[i-1] = pChild4->Attribute(buffer2);
1489                }
1490                if(pChild4->Attribute("parameter"))
1491                mList[mission_num].services[service_num].parameter = pChild4->Attribute("parameter");
1492                cond_array[2]++;
1493            }
1494            if(!strcmp(pChild3->Value(), "shell") || conditional_2 != service_num) {
1495                mList[mission_num].services[conditional_2].name = pChild3->Value();
1496            }
1497            else{
1498                mList[mission_num].services[service_num].name = pChild3->Attribute("name");
1499            }
1500            for(int i = 1; i <= 10; i++) {
1501                char buffer[9]="input";
1502                sprintf(buffer, "%s%d", buffer, i);
1503                if(pChild3->Attribute(buffer))
1504                mList[mission_num].services[conditional_2].input[i-1] = pChild3->Attribute(buffer);
1505                char buffer2[9]="output";
1506                sprintf(buffer2, "%s%d", buffer2, i);
1507                if(pChild3->Attribute(buffer2))
1508                mList[mission_num].services[conditional_2].output[i-1] = pChild3->Attribute(buffer2);
1509            }
1510                if(pChild3->Attribute("parameter"))
1511                mList[mission_num].services[conditional_2].parameter = pChild3->Attribute("parameter");
1512            mList[mission_num].services[conditional_2].num_conds = cond_array[2];
1513            cond_array[1]+=cond_array[2]+1;
1514            cond_array[2] = 0;
1515
1516
1517        }
1518        if(!strcmp(pChild2->Value(), "shell") || conditional_1 != service_num) {
1519            mList[mission_num].services[conditional_1].name = pChild2->Value();
1520        }
1521        else{
1522            mList[mission_num].services[service_num].name = pChild2->Attribute("name");
1523        }
1524           for(int i = 1; i <= 10; i++) {
1525            char buffer[9]="input";
1526            sprintf(buffer, "%s%d", buffer, i);
1527            if(pChild2->Attribute(buffer))
1528                mList[mission_num].services[conditional_1].input[i-1] = pChild2->Attribute(buffer);
1529            char buffer2[9]="output";
1530            sprintf(buffer2, "%s%d", buffer2, i);
1531            if(pChild2->Attribute(buffer2))
1532            mList[mission_num].services[conditional_1].output[i-1] = pChild2->Attribute(buffer2);
1533        }
1534            if(pChild2->Attribute("parameter"))
1535            mList[mission_num].services[conditional_1].parameter = pChild2->Attribute("parameter");
1536
1537        mList[mission_num].services[conditional_1].num_conds = cond_array[1];
1538        cond_array[0]+=cond_array[1]+1;
1539        cond_array[1] = 0;
1540        }
1541       
1542        if(!strcmp(pChild1->Value(), "shell") || conditional_0 != service_num) {
1543            mList[mission_num].services[conditional_0].name = pChild1->Value();
1544        }
1545        else{
1546        mList[mission_num].services[conditional_0].name = pChild1->Attribute("name");
1547        }
1548        for(int i = 1; i <= 10; i++) {
1549            char buffer[9]="input";
1550            sprintf(buffer, "%s%d", buffer, i);
1551            if(pChild1->Attribute(buffer))
1552                mList[mission_num].services[conditional_0].input[i-1] = pChild1->Attribute(buffer);
1553            char buffer2[9]="output";
1554            sprintf(buffer2, "%s%d", buffer2, i);
1555            if(pChild1->Attribute(buffer2))
1556            mList[mission_num].services[conditional_0].output[i-1] = pChild1->Attribute(buffer2);
1557        }
1558        if(pChild1->Attribute("parameter"))
1559            mList[mission_num].services[conditional_0].parameter = pChild1->Attribute("parameter");
1560        mList[mission_num].services[conditional_0].num_conds = cond_array[0];
1561            cond_array[0] = 0;
1562        service_num++;
1563    }
1564   
1565    mList[mission_num].numServices = service_num;
1566    mList[mission_num].name = pChild0->Attribute("name");
1567        mList[mission_num].missionID = atoi(pChild0->Attribute("id"));
1568    for(int i = 1; i <= 10; i++) {
1569        char buffer[9]="param";
1570        sprintf(buffer, "%s%d", buffer, i);
1571        if(pChild0->Attribute(buffer)){
1572            mList[mission_num].input[i-1] = pChild0->Attribute(buffer);
1573        }
1574    }
1575    mission_num++;
1576    }
1577    LOG("ServiceManagementLayer:: Done Loading Configuration\n");
1578}
1579
1580/* CALLED BY: MessageHandler
1581 * INPUTS: |ID| The ID number of the engine to be registered
1582 * OUTPUTS: <none>
1583 *
1584 * DESCRIPTION: Sends a registration message onto the shell and sends the ACK back to the component
1585 */
1586void
1587ServiceManagementLayer::RegisterCognitiveEngine(int32_t ID)
1588{
1589    SendMessage(shellSocketFD, "register_engine_cognitive");
1590
1591    LOG("ServiceManagementLayer:: CE registration message forwarded to shell.\n");
1592    char buffer[256];
1593    memset(buffer, 0, 256);
1594    ReadMessage(shellSocketFD, buffer);
1595    SendMessage(CE_List[ID].FD, buffer);
1596
1597    TransferRadioConfiguration(ID);
1598    memset(buffer, 0, 256);
1599    TransferExperience(ID);
1600    memset(buffer, 0, 256);
1601    numberOfCognitiveEngines++;
1602    CE_Present = true;
1603}
1604
1605/* CALLED BY: MessageHandler
1606 * INPUTS: |ID| The ID number of the engine to have it's services deregistered
1607 * OUTPUTS: <none>
1608 *
1609 * DESCRIPTION: Deletes individual services from the DB
1610 * NOTE THAT this function only needs to be called if service deregistration is going
1611 * to be done at a different time than component deregistration; it is handled
1612 * more efficiently and directly during that deregistration process.
1613 */
1614void
1615ServiceManagementLayer::DeregisterServices(int32_t ID)
1616{
1617    char buffer[256];
1618    memset(buffer, 0, 256);
1619    ReadMessage(CE_List[ID].FD, buffer);
1620    _services_DB->command="DELETE FROM ";
1621    _services_DB->command.append(_services_DB->tablename);
1622    _services_DB->command.append(" WHERE ID_Num IN (SELECT ");
1623    char tmp[3];
1624    memset(tmp,0,3);
1625    sprintf(tmp, "%d", ID);
1626    _services_DB->command.append(tmp);
1627    _services_DB->command.append(" FROM ");
1628    _services_DB->command.append(_services_DB->tablename);
1629    _services_DB->command.append(" WHERE Service_Name");
1630    _services_DB->command.append("=='");
1631    _services_DB->command.append(buffer);
1632    _services_DB->command.append("');");
1633    char *errorMsg;
1634    int rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg);
1635    if( rc!=SQLITE_OK && rc!=101 )
1636        fprintf(stderr, "SQL error: %s\n", errorMsg);
1637}
1638
1639/* CALLED BY: MessageHandler
1640 * INPUTS: |ID| The ID number of the engine to have it's services deregistered
1641 * OUTPUTS: <none>
1642 *
1643 * DESCRIPTION: Deletes the contact info for the cognitive engine, forwards a deregistration message to the shell
1644 * Also, deletes the services from the DB
1645 */
1646void
1647ServiceManagementLayer::DeregisterCognitiveEngine(int32_t ID)
1648{
1649    LOG("ServiceManagementLayer:: CE deregistration message forwarded to shell.\n");
1650
1651    numberOfCognitiveEngines--;
1652    if(numberOfCognitiveEngines == 0)
1653        CE_Present = false;
1654
1655    SendMessage(shellSocketFD, "deregister_engine_cognitive");
1656    char buffer[256];
1657    memset(buffer, 0, 256);
1658    ReadMessage(shellSocketFD, buffer);
1659    SendMessage(CE_List[ID].FD, buffer);
1660    if(strcmp("deregister_ack", buffer) != 0) {
1661    ERROR(1, "SML:: Failed to close CE socket\n");
1662    }
1663
1664    //Deregister the services
1665    _services_DB->command="DELETE FROM ";
1666    _services_DB->command.append(_services_DB->tablename);
1667    _services_DB->command.append(" WHERE ");
1668    _services_DB->command.append("ID_Num");
1669    _services_DB->command.append("==");
1670    char tmp[3];
1671    memset(tmp,0,3);
1672    sprintf(tmp, "%d", ID);
1673    _services_DB->command.append(tmp);
1674    _services_DB->command.append(";");
1675    char *errorMsg;
1676    int rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg);
1677    if( rc!=SQLITE_OK && rc!=101 )
1678        fprintf(stderr, "SQL error: %s\n", errorMsg);
1679
1680
1681    CE_List[ID].FD = -1;
1682    CE_List[ID].ID_num = -1;
1683
1684    LOG("Cognitive Radio Shell:: CE Socket closed for engine #%d.\n", ID);
1685}
1686
1687
1688/* CALLED BY: test class
1689 * INPUTS: <none>
1690 * OUTPUTS: <none>
1691 *
1692 * DESCRIPTION: Sets up a server socket and listens for communication on either that or the shell socket
1693 */
1694void
1695ServiceManagementLayer::StartSMLServer()
1696{
1697    //printf("Ready for CE Signal! (registration done)\n");
1698    struct timeval selTimeout;
1699    int32_t running = 1;
1700    int32_t port, rc, new_sd = 1;
1701    int32_t desc_ready = 1;
1702        //If there is, call the MessageHandler with the Shell_Msg code of -1
1703    fd_set sockSet, shellSet;
1704
1705    cogEngSrv = CreateTCPServerSocket(SMLport);
1706    int32_t maxDescriptor = cogEngSrv;
1707
1708    if(InitializeTCPServerPort(cogEngSrv) == -1)
1709        ERROR(1,"Error initializing primary port\n");
1710
1711   
1712    while (running) {
1713        /* Zero socket descriptor vector and set for server sockets */
1714        /* This must be reset every time select() is called */
1715        FD_ZERO(&sockSet);
1716        FD_SET(cogEngSrv, &sockSet);
1717       
1718        for(uint16_t k = 0; k < Current_ID; k++){
1719        if(CE_List[k].ID_num != -1)
1720            FD_SET(CE_List[k].FD, &sockSet);
1721    }
1722        //printf("k=%d, CID=%d\n", k, CE_List[k].FD);
1723
1724        /* Timeout specification */
1725        /* This must be reset every time select() is called */
1726        selTimeout.tv_sec = 0;       /* timeout (secs.) */
1727        selTimeout.tv_usec = 0;            /* 0 microseconds */
1728    //Changed both to zero so that select will check messages from the shell instead of blocking
1729    //when there is no command from the CE's to be processed
1730
1731        //Check if there is a message on the socket waiting to be read
1732    rc = select(maxDescriptor + 1, &sockSet, NULL, NULL, &selTimeout);
1733        //printf("rc=%d\n", rc);
1734        if(rc == 0){
1735            //LOG("No echo requests for %i secs...Server still alive\n", 10);
1736   
1737            FD_ZERO(&shellSet);
1738            FD_SET(shellSocketFD, &shellSet);
1739            selTimeout.tv_sec = 0;
1740            selTimeout.tv_usec = 0;
1741            //Check if there is a message on the shell socket ready to be processed
1742            select(shellSocketFD + 1, &shellSet, NULL, NULL, &selTimeout);
1743            //printf("rc2=%d\n", rc2);
1744        //If there is, call the MessageHandler with the Shell_Msg code of -1
1745        if(FD_ISSET(shellSocketFD, &shellSet)){
1746        //printf("shell_msg, %d\n", rc2);
1747            MessageHandler(-1);}
1748    }
1749        else {
1750            desc_ready = rc;
1751            for(port = 0; port <= maxDescriptor && desc_ready > 0; port++) {
1752                if(FD_ISSET(port, &sockSet)) {
1753                    desc_ready -= 1;
1754
1755                    //Check if request is new or on an existing open descriptor
1756                    if(port == cogEngSrv) {
1757            //If new, assign it a descriptor and give it an ID
1758                        new_sd = AcceptTCPConnection(port);
1759             
1760                        if(new_sd < 0)
1761                            break;
1762
1763                        CE_List[Current_ID].FD = new_sd;
1764            CE_List[Current_ID].ID_num = Current_ID;
1765                        MessageHandler(Current_ID);
1766            Current_ID++;
1767   
1768            FD_SET(new_sd,&sockSet);
1769                        if(new_sd > maxDescriptor)
1770                           maxDescriptor = new_sd;
1771                    }
1772                    else {
1773            //If old, figure out which ID it coresponds to and handle it accordingly
1774            for(uint16_t z = 0; z < Current_ID; z++)
1775            {
1776                if(CE_List[z].FD == port){
1777                    MessageHandler(z);}
1778            }
1779                    }
1780                }
1781            }
1782    }
1783    }       
1784
1785    /* Close sockets */
1786    close(cogEngSrv);
1787
1788    //delete &cogEngSrv;
1789    return;
1790}
Note: See TracBrowser for help on using the browser.