root/vtcross/trunk/src/service_management_layer/ServiceManagementLayer.cpp @ 446

Revision 446, 66.0 KB (checked in by bhilburn, 15 years ago)

Cleaned up the code, but it still needs to be heavily refactored. Lots
of scoping issues that I think may be breaking functionality.

Line 
1/*
2 Copyright 2009 Virginia Polytechnic Institute and State University 
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7 
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17/* Inter-component communication handled by sockets and FD's. 
18 * Server support has been completely implemented and tested.
19 *
20 * Services are stored in a SQLite DB by the ID of the CE that registered them.  Service
21 * support has been completely implemented and tested.
22 *
23 * Missions are loaded from an XML file, connected with services provided by components,
24 * and run.  See the documentation for the "PerformActiveMission" below for important
25 * info.
26 */
27
28
29#include <cmath>
30#include <cstdio>
31#include <cstdlib>
32#include <cstring>
33#include <stdint.h>
34
35#include <arpa/inet.h>
36#include <iostream>
37#include <netinet/in.h>
38#include <netdb.h>
39#include <fcntl.h>
40#include <sqlite3.h>
41#include <string>
42#include <sys/ioctl.h>
43#include <sys/mman.h>
44#include <sys/socket.h>
45#include <sys/types.h>
46#include <sys/wait.h>
47
48#include "tinyxml/tinyxml.h"
49#include "tinyxml/tinystr.h"
50
51#include "vtcross/debug.h"
52#include "vtcross/error.h"
53#include "vtcross/common.h"
54#include "vtcross/components.h"
55#include "vtcross/containers.h"
56#include "vtcross/socketcomm.h"
57
58
59typedef struct services_s *services_DB;
60typedef struct data_s *data_DB;
61
62using namespace std;
63
64struct services_s {
65    string filename;
66    string tablename;
67    string command;
68    sqlite3 *db;
69    unsigned int num_columns;
70};
71
72struct data_s {
73    string filename;
74    string tablename;
75    string command;
76    sqlite3 *db;
77    unsigned int num_columns;
78};
79
80services_DB _services_DB;
81data_DB _data_DB;
82string _SML_Config;
83bool shellFound;
84
85/* Callback function used internally by some of the SQLite3 commands */
86int32_t
87callback(void *notUsed, int32_t argc, char **argv, char **azColName)
88{
89    for(size_t i = 0; i < argc; i++) {
90        LOG("%s = %s\n", azColName[i], argv[i] ? argv[i] : "NULL");
91    }
92
93    LOG("\n");
94    return 0;
95}
96
97/* Useful for spotchecking what's in the database */
98void
99printDatabase()
100{
101    /*
102    LOG("\n\n\n");
103    strcpy(_data_DB->command, "select ");
104    strcat(_data_DB->command, _data_DB->tablename);
105    strcat(_data_DB->command, ".* from ");
106    strcat(_data_DB->command, _data_DB->tablename);
107    strcat(_data_DB->command, ";");
108
109    rc = sqlite3_exec(_data_DB->db, _data_DB->command, callback, 0, &errorMsg);
110    if((rc != SQLITE_OK) && (rc != 101))
111        WARNING("SQL error: %s\n", errorMsg);
112
113    LOG("database %s, table %s:\n", _data_DB->filename, _data_DB->tablename);
114    LOG("\n\n\n");
115    */
116}
117
118ServiceManagementLayer::ServiceManagementLayer()
119{
120    LOG("Creating Service Management Layer.\n");
121
122    shellSocketFD = -1;
123    numberOfCognitiveEngines = 0;
124    CE_Present = false;
125    cogEngSrv = 1;
126}
127
128/* Free and clear the DB's associated with this SML in the destructor.
129 *
130 * Note that exiting with an error condition will cause SML to not be destructed,
131 * resulting in the DB's staying in memory until the destructor is encountered in
132 * future executions. */
133ServiceManagementLayer::~ServiceManagementLayer()
134{
135    char *errorMsg;
136    int32_t rc;         /* sqlite command return code */
137
138    _services_DB->command = "drop table ";
139    _services_DB->command.append(_services_DB->tablename);
140    rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg);
141    if((rc != SQLITE_OK) && (rc != 101))
142        WARNING("ServiceManagementLayer::Destructor services 'drop table' error: %s\n", errorMsg);
143
144    _services_DB->command = "vacuum";
145    rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg);
146    if((rc != SQLITE_OK) && (rc != 101))
147        WARNING("ServiceManagementLayer::Destructor services 'vacuum' error: %s\n", errorMsg);
148
149    free(_services_DB);
150
151    _data_DB->command = "drop table ";
152    _data_DB->command.append(_data_DB->tablename);
153    rc = sqlite3_exec(_data_DB->db, _data_DB->command.c_str(), callback, 0, &errorMsg);
154    if((rc != SQLITE_OK) && (rc != 101))
155        WARNING("ServiceManagementLayer::Destructor data 'drop table' error: %s\n", errorMsg);
156
157    _data_DB->command = "vacuum";
158    rc = sqlite3_exec(_data_DB->db, _data_DB->command.c_str(), callback, 0, &errorMsg);
159    if((rc != SQLITE_OK) && (rc != 101))
160        WARNING("ServiceManagementLayer::Destructor data 'vacuum' error: %s\n", errorMsg);
161
162    free(_data_DB);
163}
164
165/* Note that sizes of CE_List, miss, and service are hardcoded for now.
166 * Also, their sizes are hardcoded into the code in various places; a fix
167 * for a future version. */
168ServiceManagementLayer::ServiceManagementLayer(const char* SML_Config, \
169    const char* serverName, const char* serverPort, int16_t clientPort)
170{
171    LOG("Creating Service Management Layer.\n");
172
173    _SML_Config = string(SML_Config);
174    SMLport = clientPort;
175
176    ConnectToShell(serverName, serverPort);
177    CE_List = new CE_Reg[10];
178
179    miss = new Mission[10];
180    for(size_t i = 0; i < 10; i++) {
181        miss[i].services = new Service[30];
182    }
183
184    Current_ID = 0;
185
186    LoadConfiguration(SML_Config, miss);
187
188    CreateServicesDB();
189    CreateDataDB();
190}
191
192/* CALLED BY: constructor
193 * INPUTS: <none>
194 * OUTPUTS: <none>
195 *
196 * DESCRIPTION: Create and initialize a DB to hold the services registered by components
197 */
198void
199ServiceManagementLayer::CreateServicesDB()
200{
201    sqlite3_stmt *ppStmt;   /* OUT: Statement handle */
202    const char *pzTail;     /* OUT: Pointer to unused portion of zSql */
203    int32_t rc;             /* sqlite command return code */
204
205    _services_DB = new services_s;
206    _services_DB->filename="Services_Table";
207    sqlite3_open(_services_DB->filename.c_str(), &(_services_DB->db));
208
209    char *cols[] = {(char *)"ID_Num", (char *)"Service_Name"};
210
211    _services_DB->tablename="Services";
212
213    /* ifprogram execution ends in anything other than a ordered shutdown, DB's will still
214     * be there for next run. Need to get rid of it so that old data isn't inadvertantly
215     * used in the next execution cycle. */
216    _services_DB->command = "DROP TABLE ifEXISTS Services;";     
217
218    rc = sqlite3_prepare_v2(_services_DB->db, _services_DB->command.c_str(), 128, &ppStmt, &pzTail);
219    if((rc != SQLITE_OK) && (rc != 101))
220        WARNING("ServiceManagementLayer::CreateServicesDB 'prepare_stmt' error %d\n", rc);
221
222    rc = sqlite3_step(ppStmt);
223    if((rc != SQLITE_OK) && (rc != 101))
224        WARNING("ServiceManagementLayer::CreateServicesDB 'step' error\n");
225
226    _services_DB->num_columns = 2;
227
228    /* Generate command */
229    _services_DB->command="CREATE TABLE ";
230    _services_DB->command.append(_services_DB->tablename);
231    _services_DB->command.append("(");
232    _services_DB->command.append(cols[0]);
233    _services_DB->command.append(" INT, ");
234    _services_DB->command.append(cols[1]);
235    _services_DB->command.append(" TEXT);");
236
237    /* Execute create table command */
238    rc = sqlite3_prepare_v2(_services_DB->db, _services_DB->command.c_str(), 128, &ppStmt, &pzTail);
239    if((rc != SQLITE_OK) && (rc != 101))
240        WARNING("ServiceManagementLayer::CreateServicesDB 'prepare_stmt' error %d\n", rc);
241
242    rc = sqlite3_step(ppStmt);
243    if((rc != SQLITE_OK) && (rc != 101))
244        WARNING("ServiceManagementLayer::CreateServicesDB 'step' error\n");
245}
246
247/* CALLED BY: constructor
248 * INPUTS: <none>
249 * OUTPUTS: <none>
250 *
251 * DESCRIPTION: Create and initialize a DB to hold the data sent by components
252 */
253void
254ServiceManagementLayer::CreateDataDB()
255{
256    sqlite3_stmt *ppStmt;   /* OUT: Statement handle */
257    const char *pzTail;     /* OUT: Pointer to unused portion of zSql */
258    int32_t rc;             /* sqlite command return code */
259
260    _data_DB = new data_s;
261
262    _data_DB->filename="Data_Table";
263    sqlite3_open(_data_DB->filename.c_str(), &(_data_DB->db));
264
265    char *cols[] = {(char *)"Tag", (char *)"Data"};
266
267    _data_DB->tablename = "Data";
268    _data_DB->command = "DROP TABLE ifEXISTS Data;";     
269
270    rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), 128, &ppStmt, &pzTail);
271    if((rc != SQLITE_OK) && (rc != 101))
272        WARNING("ServiceManagementLayer::CreateDataDB 'prepare_stmt' error %d\n", rc);
273
274    rc = sqlite3_step(ppStmt);
275    if((rc != SQLITE_OK) && (rc != 101))
276        WARNING("ServiceManagementLayer::CreateDataDB 'step' error\n");
277
278    _data_DB->num_columns = 2;
279
280    /* Generate command */
281    _data_DB->command = "CREATE TABLE ";
282    _data_DB->command.append(_data_DB->tablename);
283    _data_DB->command.append("(");
284    _data_DB->command.append(cols[0]);
285
286    /* First column is the name of the data (corresponding to the name of the output/input pair)
287     * It is the primary key so any subsequent data with the same name will replace the row. */
288    _data_DB->command.append(" TEXT PRIMARY KEY ON CONFLICT REPLACE, ");
289    _data_DB->command.append(cols[1]);
290    _data_DB->command.append(" TEXT);");
291
292    rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), 128, &ppStmt, &pzTail);
293    if((rc != SQLITE_OK) && (rc != 101))
294        WARNING("ServiceManagementLayer::CreateDataDB 'prepare_stmt' error %d\n", rc);
295
296    rc = sqlite3_step(ppStmt);
297    if((rc != SQLITE_OK) && (rc != 101))
298        WARNING("ServiceManagementLayer::CreateDataDB 'step' error\n");
299}
300
301/* CALLED BY: MessageHandler
302 * INPUTS: <none>
303 * OUTPUTS: <none>
304 *
305 * DESCRIPTION: Sends a message identifying this component as an SML to the Shell
306 */
307void
308ServiceManagementLayer::SendComponentType()
309{
310    SendMessage(shellSocketFD, "response_sml");
311    LOG("SML responded to GetRemoteComponentType query.\n");
312}
313
314/* CALLED BY: constructor
315 * INPUTS: |serverName| the IPv4 name of the server (127.0.0.1 for localhost)
316 *         |serverPort| the port on the server to connect to
317 * OUTPUTS: <none>
318 *
319 * DESCRIPTION: Connecting to the shell takes 2 steps
320 * 1) Establish a client socket for communication
321 * 2) Run the initial Registration/handshake routine
322 */
323void
324ServiceManagementLayer::ConnectToShell(const char* serverName, \
325        const char* serverPort)
326{
327    shellSocketFD = ClientSocket(serverName, serverPort);
328    RegisterComponent();
329}
330
331/* CALLED BY: StartSMLServer
332 * INPUTS: |ID| The ID number of the CE that has a message wating
333 * OUTPUTS: <none>
334 *
335 * DESCRIPTION: Called whenever a socket is identified as being ready for communication
336 *              This funciton reads the message and calls the appropriate helper
337 */
338void
339ServiceManagementLayer::MessageHandler(int32_t ID)
340{
341    char buffer[256];   
342    memset(buffer, 0, 256); 
343    int32_t _FD; 
344   
345    if(ID != -1)
346        _FD = CE_List[ID].FD;
347    else
348        _FD = shellSocketFD;
349
350    ReadMessage(_FD, buffer);
351   
352    //--------Policy Engine Stuff - no policy engine support in this version-------//
353
354    //printf("********* %s **********\n", buffer);
355    // TODO
356    // ifwe send integer op codes rather than strings, this process will be
357    // MUCH faster since instead of donig string compares we can simply
358    // switch on the integer value...
359    /*if(strcmp(buffer, "register_service") == 0) {
360        if(strcmp(buffer, "policy_geo") == 0) {
361        }
362        else if(strcmp(buffer, "policy_time") == 0) {
363        }
364        else if(strcmp(buffer, "policy_spectrum") == 0) {
365        }
366        else if(strcmp(buffer, "policy_spacial") == 0) {
367        }
368    }
369    else if(strcmp(buffer, "deregister_service") == 0) {
370        if(strcmp(buffer, "policy_geo") == 0) {
371        }
372        else if(strcmp(buffer, "policy_time") == 0) {
373        }
374        else if(strcmp(buffer, "policy_spectrum") == 0) {
375        }
376        else if(strcmp(buffer, "policy_spacial") == 0) {
377        }
378    }*/
379
380    //Go down the list to call the appropriate function
381    if(strcmp(buffer, "query_component_type") == 0) {
382        SendComponentType();
383    }
384    else if(strcmp(buffer, "reset_sml") == 0) {
385        Reset();
386    }
387    else if(strcmp(buffer, "shutdown_sml") == 0) {
388        Shutdown();
389    }
390    else if(strcmp(buffer, "register_engine_cognitive") == 0) {
391        RegisterCognitiveEngine(ID);
392    }
393    else if(strcmp(buffer, "register_service") == 0) {
394        ReceiveServices(ID);
395    }
396    else if(strcmp(buffer, "send_component_type") == 0) {
397        SendComponentType();
398    }
399    else if(strcmp(buffer, "list_services") == 0) {
400        ListServices();
401    }
402    else if(strcmp(buffer, "set_active_mission") == 0) {
403        SetActiveMission();
404    }
405    else if(strcmp(buffer, "request_optimization") == 0) {
406        PerformActiveMission();
407    }
408    else if(strcmp(buffer, "deregister_engine_cognitive") == 0) {
409        DeregisterCognitiveEngine(ID);
410    }
411    else if(strcmp(buffer, "deregister_service") == 0) {
412        DeregisterServices(ID);
413    }
414}
415
416//TODO Finish
417/* CALLED BY: MessageHandler
418 * INPUTS: <none>
419 * OUTPUTS: <none>
420 *
421 * DESCRIPTION: Deregisters the component from the Shell.
422 */
423void
424ServiceManagementLayer::Shutdown()
425{
426    DeregisterComponent();
427}
428
429//TODO Finish
430/* CALLED BY: MessageHandler
431 * INPUTS: <none>
432 * OUTPUTS: <none>
433 *
434 * DESCRIPTION: Deregisters the component from the Shell
435 */
436void
437ServiceManagementLayer::Reset()
438{
439    DeregisterComponent();
440    ReloadConfiguration();
441}
442
443/* CALLED BY: ConnectToShell
444 * INPUTS: <none>
445 * OUTPUTS: <none>
446 *
447 * DESCRIPTION: Sends the registration message to the Shell
448 */
449void
450ServiceManagementLayer::RegisterComponent()
451{
452    SendMessage(shellSocketFD, "register_sml");
453    LOG("ServiceManagementLayer:: Registration message sent.\n");
454}
455
456/* CALLED BY: Shutdown
457 * INPUTS: <none>
458 * OUTPUTS: <none>
459 *
460 * DESCRIPTION: Closes the client socket with the shell, sends a deregstration message
461 */
462void
463ServiceManagementLayer::DeregisterComponent()
464{
465    SendMessage(shellSocketFD, "deregister_sml");
466    LOG("ServiceManagementLayer:: Deregistration message sent.\n");
467
468    shutdown(shellSocketFD, 2);
469    close(shellSocketFD);
470    shellSocketFD = -1;
471    LOG("ServiceManagementLayer:: Shell socket closed.\n");
472}
473
474
475/* CALLED BY: RegisterCognitiveEngine
476 * INPUTS: |ID| The ID number of the component where the data is to be transfered to
477 * OUTPUTS: <none>
478 *
479 * DESCRIPTION: Streams config data directly from the shell to the CE, and checks
480 * for an "ack" message from the CE after every sent message
481 * to know when to stop communication.
482 *
483 * NOTE: Modified to check the incoming message buffer rather than the outgoing
484 * message buffer to avoid a portion of the delay. May change this again to handle
485 * data more inteligently, taking advantage of it's properties.
486 */
487void
488ServiceManagementLayer::TransferRadioConfiguration(int32_t ID)
489{
490    struct timeval selTimeout;
491    fd_set sockSet;
492    int32_t rc = 1;
493    char buffer[256];
494
495    /* Send data until the CE sends an ACK message back */
496    while(rc != 0) {
497        memset(buffer, 0, 256);
498
499        /* Receive data from Shell */
500        ReadMessage(shellSocketFD, buffer);
501
502        /* Send data to CE */
503        SendMessage(CE_List[ID].FD, buffer);
504        FD_ZERO(&sockSet);
505        FD_SET(shellSocketFD, &sockSet);
506        selTimeout.tv_sec = 0;
507        selTimeout.tv_usec = 5000;
508
509        /* Check ifthere is a message on the shell ready to be processed */
510        rc = select(shellSocketFD + 1, &sockSet, NULL, NULL, &selTimeout);
511    }
512
513    memset(buffer, 0, 256);
514    ReadMessage(CE_List[ID].FD, buffer);
515    SendMessage(shellSocketFD, buffer);
516}
517
518
519/* CALLED BY: RegisterCognitiveEngine
520 * INPUTS: |ID| The ID number of the component where the data is to be transfered to
521 * OUTPUTS: <none>
522 *
523 * DESCRIPTION: Simmilar to TransferRadioConfig, just with Experience data
524 *
525 * NOTE: Modified to check the incoming message buffer rather than the outgoing
526 * message buffer to avoid a portion of the delay. May change this again to handle
527 * data more inteligently, taking advantage of it's properties.
528 */
529void
530ServiceManagementLayer::TransferExperience(int32_t ID)
531{
532    struct timeval selTimeout;
533    fd_set sockSet;
534    int32_t rc = 1;
535    char buffer[256];
536    /* Send data until the CE sends an ACK message back */
537    while(rc != 0) {
538        memset(buffer, 0, 256);
539
540        /* Receive data from Shell */
541        ReadMessage(shellSocketFD, buffer);
542
543        /* Send data to CE */
544        SendMessage(CE_List[ID].FD, buffer);
545        FD_ZERO(&sockSet);
546        FD_SET(shellSocketFD, &sockSet);
547        selTimeout.tv_sec = 0;
548        selTimeout.tv_usec = 5000;
549
550        /* Check ifthere is a message on the shell ready to be processed */
551        rc = select(shellSocketFD + 1, &sockSet, NULL, NULL, &selTimeout);
552    }
553
554    memset(buffer, 0, 256);
555    ReadMessage(CE_List[ID].FD, buffer);
556    SendMessage(shellSocketFD, buffer);
557}
558
559/* CALLED BY: MessageHandler
560 * INPUTS: |ID| The ID number of the component where service is located
561 * OUTPUTS: <none>
562 *
563 * DESCRIPTION: Inserts a service into the DB with the ID of the component where it exists
564 */
565void
566ServiceManagementLayer::ReceiveServices(int32_t ID)
567{
568    char buffer[256];
569    memset(buffer, 0, 256);
570    ReadMessage(CE_List[ID].FD, buffer);
571
572    char *cols[] = {(char *) "ID_Num", (char *) "Service_Name"};
573
574    /* Generate command */
575    _services_DB->command = "insert into ";
576    _services_DB->command.append(_services_DB->tablename);
577    _services_DB->command.append(" (");
578    _services_DB->command.append(cols[0]);
579    _services_DB->command.append(", ");
580    _services_DB->command.append(cols[1]);
581    _services_DB->command.append(") ");
582    _services_DB->command.append(" values(");
583
584    char temp[3];
585    memset(temp, 0, 3);
586    sprintf(temp, "%d", ID);
587
588    _services_DB->command.append(temp);
589    _services_DB->command.append(", '");
590    _services_DB->command.append(buffer);
591    _services_DB->command.append("');");
592   
593    /* Execute add command */
594    char *errorMsg;
595    int rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg);
596    if((rc != SQLITE_OK) && (rc != 101))
597        WARNING("ServiceManagementLayer::RecieveServices DB Error %s\n", errorMsg);
598}
599
600/* CALLED BY: MessageHandler
601 * INPUTS: <none>
602 * OUTPUTS: <none>
603 *
604 * DESCRIPTION: This method associates the services that components provide with the
605 * services that are requested in the mission. Each service in the mission is given
606 * the ID and FD of a component that has registered to provide that service. Deregistration
607 * is okay until this method is called without a reload, but ifderegistration occurs after this
608 * method is called it needs to be called again even ifother engines also provide the services
609 */
610void
611ServiceManagementLayer::SetActiveMission()
612{
613    char buffer[256];
614    memset(buffer, 0, 256);
615    ReadMessage(shellSocketFD, buffer);
616
617    uint32_t missID = atoi(buffer);
618    for(activeMission = 0; activeMission < 10; activeMission++) {
619        /* Find the active mission by comparing mission ID's */
620        if(miss[activeMission].missionID == missID)
621            break;
622    }
623
624    LOG("ServiceManagementLayer:: Received Set Active Mission command: %i.\n", missID);
625
626    /* For each service in the mission */
627    for(size_t i = 0; i < miss[activeMission].numServices; i++) {
628        /* Check whether the current service is an actual service or a conditional */
629        if(miss[activeMission].services[i].name.compare("if") && \
630                miss[activeMission].services[i].name.compare("dowhile") && \
631                miss[activeMission].services[i].name.compare("shell")) {
632                /* ifit is a service, search the database of registered services to find
633                 * the ID of the component that registered it */
634                _services_DB->command="select ";
635                _services_DB->command.append(_services_DB->tablename);
636                _services_DB->command.append(".* from ");
637                _services_DB->command.append( _services_DB->tablename);
638                _services_DB->command.append(" where Service_Name=='");
639                _services_DB->command.append(miss[activeMission].services[i].name);
640                _services_DB->command.append("';");
641   
642                sqlite3_stmt * pStatement;
643                int32_t rc = sqlite3_prepare_v2(_services_DB->db, _services_DB->command.c_str(), \
644                        -1, &pStatement, NULL);
645                if(rc == SQLITE_OK) {
646                    if(sqlite3_step(pStatement) == SQLITE_ROW)
647                        miss[activeMission].services[i].componentID = sqlite3_column_int(pStatement, 0);
648                    else {
649                        WARNING("services_DB:: Mission requires service %s ", \
650                                miss[activeMission].services[i].name.c_str());
651                        WARNING("not provided by any connected component.\n");
652                        rc = 31337;
653                    }
654                } else {
655                    WARNING("services_DB:: Error executing SQL statement. rc = %i\n%s\n", \
656                            rc, _services_DB->command.c_str());
657                }
658
659                sqlite3_finalize(pStatement);
660                miss[activeMission].services[i].socketFD = \
661                    CE_List[miss[activeMission].services[i].componentID].FD;
662        }
663        /* TODO Nothing to be done for conditionals at this stage */
664    }
665 
666    SendMessage(shellSocketFD, "ack");
667    LOG("ServiceManagementLayer:: Done setting active mission.\n");
668}
669
670/* CALLED BY: PerformActiveMission
671 * INPUTS: |sourceID| ID of the service that is being processed
672 * OUTPUTS: <none>
673 *
674 * DESCRIPTION: This is a helper method for the "PerformActiveMission" function
675 * NOTE: This function has changed drastically from the previous implementation
676 *
677 * Takes an ID of a service. For that service, finds inputs in DB and forwords
678 * those on to the engine after sending comm-starting messages. Afterwords, listenes
679 * for the outputs so that it can store those in the database for future services or
680 * the overall output
681 */
682void
683ServiceManagementLayer::TransactData(int32_t sourceID)
684{
685    char buffer[256];
686    std::string data;
687    char *cols[] = {(char *) "Tag", (char *) "Data"};
688    char *token;
689
690   /* Send a message directly to the shell */
691   if(miss[activeMission].services[sourceID].name.find("shell") != string::npos) {
692       shellFound=true;
693
694       int32_t k = 0;
695       while((k < 10) && (!miss[activeMission].input[k].empty())) {
696           k++;
697       }
698
699       sprintf(buffer, "%d", k);
700       SendMessage(shellSocketFD, buffer);
701       for(int32_t t = 0; t < k; t++) {
702           memset(buffer, 0 , 256);
703           _data_DB->command="select ";
704           _data_DB->command.append(_data_DB->tablename);
705           _data_DB->command.append(".* from ");
706           _data_DB->command.append(_data_DB->tablename);
707           _data_DB->command.append(" where Tag=='");
708           _data_DB->command.append(miss[activeMission].input[t]);
709           _data_DB->command.append("';");
710           sqlite3_stmt * pStatement;
711
712           int32_t rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), \
713                   -1, &pStatement, NULL);
714           if(rc == SQLITE_OK) {
715               if(sqlite3_step(pStatement) == SQLITE_ROW)
716                   data=((const char*) sqlite3_column_text(pStatement, 1));
717               else {
718                   LOG("3data_DB:: Data not yet in DB., %s\n", _data_DB->command.c_str());
719                   rc = 31337;
720               }
721           }
722           else {
723               WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
724                       rc,_data_DB->command.c_str());
725           }
726
727           sqlite3_finalize(pStatement);
728           token = strtok((char *) data.c_str(), "@");
729           token = strtok(NULL, "@");
730           SendMessage(shellSocketFD, token);
731           token = strtok(NULL, "@");
732           SendMessage(shellSocketFD, token);
733       }
734
735       return;
736   }
737
738    /* ifthis is a service command and not a shell command... */
739    /* Transmission starting messages */
740    SendMessage(miss[activeMission].services[sourceID].socketFD, "request_optimization_service");
741    SendMessage(miss[activeMission].services[sourceID].socketFD, \
742            miss[activeMission].services[sourceID].name.c_str());
743}
744
745/* CALLED BY: MessageHandler
746 * INPUTS: <none>
747 * OUTPUTS: <none>
748 *
749 * DESCRIPTION: This function works by first sending the inputs from the shell to
750 * the appropriate components. The first service should begin immeadiately, as
751 * should any others who have all of their input parameters. When they complete,
752 * the output path is found and the data is transfered as it becomes available
753 * Presumably at this point the second function has all of it's parameters, so it
754 * begins to compute, and the cycle repeats.
755 *
756 * Rules for active missions (currently)
757 * -Five inputs/outputs per service and per mission
758 * -All ordering constraints have been relaxed in this version; all data is stored
759 *  locally and only sent when requested
760 * -ifand while support fully implemented - up to three levels (if's can be nested, but dowhiles cannot)
761 * -For dowhiles, assumes loop condition determined on last line
762 *
763 * -IMPORTANT: DB uses '@' to seperate individual statements; using '@' in the data
764 *  stream will result in incorrect behavior
765 */
766void
767ServiceManagementLayer::PerformActiveMission()
768{
769    shellFound = false;
770    std::string data_param;
771    std::string data_obsv;
772    std::string data;
773    std::string input;
774    std::string check;
775
776    char buffer[256];
777    char buffer1[256];
778    std::string token, token2;
779    std::string data2;
780
781    int32_t rc;
782    char *errorMsg;
783    char *cols[] = {(char *) "Tag", (char *) "Data"};
784
785    LOG("ServiceManagementLayer:: Received PerformActiveMission command.\n");
786
787    /* Get the inputs */
788    memset(buffer, 0, 256);
789    ReadMessage(shellSocketFD, buffer);
790
791    /* Receive Set of Parameters */
792    memset(buffer, 0, 256);
793    ReadMessage(shellSocketFD, buffer);
794    int32_t t = atoi(buffer);
795    for(size_t m = 0; m < t; m++) {
796        memset(buffer1, 0, 256);
797        ReadMessage(shellSocketFD, buffer1);
798        _data_DB->command="insert into ";
799        _data_DB->command.append(_data_DB->tablename);
800        _data_DB->command.append(" (");
801        _data_DB->command.append(cols[0]);
802        _data_DB->command.append(", ");
803        _data_DB->command.append(cols[1]);
804        _data_DB->command.append(") ");
805
806        memset(buffer, 0, 256);
807        ReadMessage(shellSocketFD, buffer);
808        _data_DB->command.append(" values('");
809        _data_DB->command.append(buffer1);
810        _data_DB->command.append("', '1@");
811        _data_DB->command.append(buffer1);
812        _data_DB->command.append("@");
813        _data_DB->command.append(buffer);
814        _data_DB->command.append("');");
815
816        rc = sqlite3_exec(_data_DB->db, _data_DB->command.c_str(), callback, 0, &errorMsg);
817        if((rc != SQLITE_OK) && (rc != 101))
818           WARNING("SQL error: %s\n", errorMsg);
819    }
820
821    int32_t numstatements[3] = {0 ,0 ,0};
822    for(size_t i; i < miss[activeMission].numServices; i++) {
823        if(miss[activeMission].services[i].name.compare("if") == 0) {
824            input.clear();
825            check.clear();
826
827            for(size_t t = 0; t < 10; t++) {
828                if(!miss[activeMission].services[i].output[t].empty()) {
829                    input = miss[activeMission].services[i - numstatements[0] - 1].output[t];
830
831                    _data_DB->command="SELECT ";
832                    _data_DB->command.append(_data_DB->tablename);
833                    _data_DB->command.append(".* from ");
834                    _data_DB->command.append(_data_DB->tablename);
835                    _data_DB->command.append(" where Tag=='");
836                    _data_DB->command.append(input);
837                    _data_DB->command.append("';");
838
839                    sqlite3_stmt *pStatement;
840                    rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), \
841                            -1, &pStatement, NULL);
842                    if(rc == SQLITE_OK) {
843                        if(sqlite3_step(pStatement) == SQLITE_ROW)
844                             data = (const char *) sqlite3_column_text(pStatement, 1);
845                        else {
846                            WARNING("1 data_DB:: Data not yet in DB.\n");
847                            rc=31337;
848                        }
849                    } else {
850                        WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
851                                rc,_data_DB->command.c_str());
852                    }
853
854                    sqlite3_finalize(pStatement);
855                    int32_t pos = data.find_last_of("@", data.length() - 2);
856                    token = data.substr(pos + 1);
857                    token.erase(token.length() - 1);
858                    data.clear();
859                    break;
860                }
861            }
862
863            bool doit = false;
864            if(miss[activeMission].services[i].output[t].find(">") != string::npos) {
865                std::string data2;
866                _data_DB->command="SELECT ";
867                _data_DB->command.append(_data_DB->tablename);
868                _data_DB->command.append(".* from ");
869                _data_DB->command.append(_data_DB->tablename);
870                _data_DB->command.append(" where Tag=='");
871                _data_DB->command.append(miss[activeMission].services[i].output[t].erase(0, 1));
872                _data_DB->command.append("';");
873                sqlite3_stmt *pStatement;
874
875                rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), \
876                        -1, &pStatement, NULL);
877                if(rc == SQLITE_OK) {
878                    if(sqlite3_step(pStatement) == SQLITE_ROW)
879                         data2 = (const char *) sqlite3_column_text(pStatement, 1);
880                    else {
881                        WARNING("2 data_DB:: Data not yet in DB.\n");
882                        rc = 31337;
883                    }
884                } else {
885                    WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
886                            rc, _data_DB->command.c_str());
887                }
888
889                sqlite3_finalize(pStatement);
890
891                int32_t pos = data2.find_last_of("@", data2.length() - 2);
892                token2 = data2.substr(pos + 1);
893                token2.erase(token2.length() - 1);
894                if(atof(token.c_str()) > atof(token2.c_str()))
895                    doit = true;
896            }
897            else if(miss[activeMission].services[i].output[t].find(token) != string::npos)
898                doit = true;
899
900            if(doit) {
901                for(size_t k = i + 1; k <= i+miss[activeMission].services[i].num_conds; k++) {
902                    if(miss[activeMission].services[k].name.compare("if") == 0) {
903                        input.clear();
904                        check.clear();
905                        for(size_t t = 0; t < 10; t++) {
906                            if(!miss[activeMission].services[k].output[t].empty()) {
907                                input = miss[activeMission].services[k - numstatements[1] - 1].output[t];
908                                _data_DB->command="SELECT ";
909                                _data_DB->command.append(_data_DB->tablename);
910                                _data_DB->command.append(".* from ");
911                                _data_DB->command.append(_data_DB->tablename);
912                                _data_DB->command.append(" where Tag=='");
913                                _data_DB->command.append(input);
914                                _data_DB->command.append("';");
915
916                                sqlite3_stmt *pStatement;
917                                rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), \
918                                        -1, &pStatement, NULL);
919                                if(rc == SQLITE_OK) {
920                                    if(sqlite3_step(pStatement) == SQLITE_ROW)
921                                         data = (const char *) sqlite3_column_text(pStatement, 1);
922                                    else {
923                                        WARNING("3 data_DB:: Data not yet in DB.\n");
924                                        rc = 31337;
925                                    }
926                                } else {
927                                    WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
928                                            rc,_data_DB->command.c_str());
929                                }
930
931                                sqlite3_finalize(pStatement);
932                                int32_t pos = data.find_last_of("@", data.length() - 2);
933                                token = data.substr(pos + 1);
934                                token.erase(token.length() - 1);
935                                break;
936                            }
937                        }
938
939                        bool doit = false;
940                        if(miss[activeMission].services[k].output[t].find(">") != string::npos) {
941                            std::string data2;
942                            _data_DB->command="SELECT ";
943                            _data_DB->command.append(_data_DB->tablename);
944                            _data_DB->command.append(".* from ");
945                            _data_DB->command.append(_data_DB->tablename);
946                            _data_DB->command.append(" where Tag=='");
947                            _data_DB->command.append(miss[activeMission].services[k].output[t].erase(0, 1));
948                            _data_DB->command.append("';");
949
950                            sqlite3_stmt *pStatement;
951                            rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), \
952                                    -1, &pStatement, NULL);
953                            if(rc == SQLITE_OK) {
954                                if(sqlite3_step(pStatement) == SQLITE_ROW)
955                                     data2 = (const char *) sqlite3_column_text(pStatement, 1);
956                                else {
957                                    WARNING("4 data_DB:: Data not yet in DB.\n");
958                                    rc = 31337;
959                                }
960                            } else {
961                                WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
962                                        rc, _data_DB->command.c_str());
963                            }
964
965                            sqlite3_finalize(pStatement);
966                            int32_t pos = data2.find_last_of("@", data2.length() - 2);
967                            token2 = data2.substr(pos + 1);
968                            token2.erase(token2.length() - 1);
969                            if(atof(token.c_str()) > atof(token2.c_str()))
970                                doit = true;
971                        }
972                        else if(miss[activeMission].services[k].output[t].find(token) != string::npos)
973                            doit=true;
974
975                        if(doit) {
976                            for(size_t j = k + 1; j <= k+miss[activeMission].services[k].num_conds; j++) {
977                                if(miss[activeMission].services[j].name.compare("if") == 0) {
978                                    input.clear();
979                                    check.clear();
980                                    for(t = 0; t < 10; t++) {
981                                        if(!miss[activeMission].services[j].output[t].empty()) {
982                                            input = miss[activeMission].services[j - numstatements[2] - 1].output[t];
983
984                                            _data_DB->command="SELECT ";
985                                            _data_DB->command.append(_data_DB->tablename);
986                                            _data_DB->command.append(".* from ");
987                                            _data_DB->command.append(_data_DB->tablename);
988                                            _data_DB->command.append(" where Tag=='");
989                                            _data_DB->command.append(input);
990                                            _data_DB->command.append("';");
991
992                                            sqlite3_stmt *pStatement;
993
994                                            rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
995                                            if(rc == SQLITE_OK) {
996                                                if(sqlite3_step(pStatement) == SQLITE_ROW)
997                                                     data = (const char *) sqlite3_column_text(pStatement, 1);
998                                                else {
999                                                    WARNING("5 data_DB:: Data not yet in DB.\n");
1000                                                    rc = 31337;
1001                                                }
1002                                            } else {
1003                                                WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
1004                                                        rc, _data_DB->command.c_str());
1005                                            }
1006
1007                                            sqlite3_finalize(pStatement);
1008                                            int32_t pos = data.find_last_of("@", data.length()-2);
1009                                            token = data.substr(pos+1);
1010                                            token.erase(token.length()-1);
1011                                            data.clear();
1012                                            break;
1013                                        }
1014                                    }
1015
1016                                    bool doit = false;
1017                                    if(miss[activeMission].services[j].output[t].find(">") != string::npos) {
1018                                        _data_DB->command="SELECT ";
1019                                        _data_DB->command.append(_data_DB->tablename);
1020                                        _data_DB->command.append(".* from ");
1021                                        _data_DB->command.append(_data_DB->tablename);
1022                                        _data_DB->command.append(" where Tag=='");
1023                                        _data_DB->command.append(miss[activeMission].services[j].output[t].erase(0, 1));
1024                                        _data_DB->command.append("';");
1025
1026                                        sqlite3_stmt *pStatement;
1027                                        rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
1028                                        if(rc == SQLITE_OK) {
1029                                            if(sqlite3_step(pStatement) == SQLITE_ROW)
1030                                                 data2 = (const char *) sqlite3_column_text(pStatement, 1);
1031                                            else {
1032                                                WARNING("6 data_DB:: Data not yet in DB.\n");
1033                                                rc=31337;
1034                                            }
1035                                        } else {
1036                                            WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
1037                                                    rc, _data_DB->command.c_str());
1038                                        }
1039
1040                                        sqlite3_finalize(pStatement);
1041
1042                                        int32_t pos = data2.find_last_of("@", data2.length()-2);
1043                                        token2 = data2.substr(pos + 1);
1044                                        token2.erase(token2.length()-1);
1045
1046                                        if(atof(token.c_str()) > atof(token2.c_str()))
1047                                            doit=true;
1048
1049                                        data.clear();
1050                                    }
1051
1052                                    else if(miss[activeMission].services[j].output[t].find(token) != string::npos)
1053                                        doit = true;
1054
1055                                    if(doit) {
1056                                        for(size_t l = j + 1; l <= j + miss[activeMission].services[j].num_conds; l++) {
1057                                            TransactData(l);
1058                                        }
1059                                    }
1060
1061                                    numstatements[2] += miss[activeMission].services[j].num_conds + 1;
1062                                    j += miss[activeMission].services[j].num_conds;
1063                                } else if(miss[activeMission].services[j].name.compare("dowhile") == 0) {
1064                                    numstatements[0]=0;
1065
1066                                    while(true) {
1067                                        uint32_t l;
1068                                        for(l = j + 1; l <= j+miss[activeMission].services[j].num_conds; l++) {
1069                                            TransactData(l);
1070                                        }
1071
1072                                        data.clear();
1073                                        input.clear();
1074                                        check.clear();
1075
1076                                        int32_t t;
1077                                        for(t = 0; t < 10; t++) {
1078                                            if(!miss[activeMission].services[j].output[t].empty()){
1079                                                input=miss[activeMission].services[l-2].output[t];
1080                                                _data_DB->command="SELECT ";
1081                                                _data_DB->command.append(_data_DB->tablename);
1082                                                _data_DB->command.append(".* from ");
1083                                                _data_DB->command.append(_data_DB->tablename);
1084                                                _data_DB->command.append(" where Tag=='");
1085                                                _data_DB->command.append(input);
1086                                                _data_DB->command.append("';");
1087                                                sqlite3_stmt *pStatement;
1088
1089                                                rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
1090                                                if(rc == SQLITE_OK){
1091                                                    if(sqlite3_step(pStatement) == SQLITE_ROW)
1092                                                         data = (const char *) sqlite3_column_text(pStatement, 1);
1093                                                    else {
1094                                                        WARNING("7 data_DB:: Data not yet in DB.: %s\n", _data_DB->command.c_str());
1095                                                        rc = 31337;
1096                                                    }
1097                                                } else {
1098                                                    WARNING("data_DB:: SQL statement error. rc = %i\n%s\n", \
1099                                                            rc, _data_DB->command.c_str());
1100                                                }
1101
1102                                                sqlite3_finalize(pStatement);
1103                                                int32_t pos = data.find_last_of("@", data.length() - 2);
1104                                                token = data.substr(pos + 1);
1105                                                token.erase(token.length() - 1);
1106                                                break;
1107                                            }
1108                                        }
1109
1110                                        if(miss[activeMission].services[j].output[t].find(token) == string::npos) {
1111                                            break;
1112                                        }
1113                                    }
1114
1115                                    j += miss[activeMission].services[j].num_conds;
1116                                } else {
1117                                    numstatements[2] = 0;
1118                                    TransactData(j);
1119                                }
1120                            }
1121                        }
1122
1123                        numstatements[1] += miss[activeMission].services[k].num_conds + 1;
1124                        k += miss[activeMission].services[k].num_conds;
1125                    } else if(miss[activeMission].services[k].name.compare("dowhile") == 0) {
1126                        numstatements[0] = 0;
1127                        while(true) {
1128                            int32_t j;
1129                            for(j = k + 1; j <= k + miss[activeMission].services[k].num_conds; j++) {
1130                                TransactData(j);
1131                            }
1132
1133                            data.clear();
1134                            input.clear();
1135                            check.clear();
1136                            int32_t t;
1137                            for(t = 0; t < 10; t++) {
1138                                if(!miss[activeMission].services[k].output[t].empty()) {
1139                                    input = miss[activeMission].services[j - 1].output[t];
1140                                    _data_DB->command="SELECT ";
1141                                    _data_DB->command.append(_data_DB->tablename);
1142                                    _data_DB->command.append(".* from ");
1143                                    _data_DB->command.append(_data_DB->tablename);
1144                                    _data_DB->command.append(" where Tag=='");
1145                                    _data_DB->command.append(input);
1146                                    _data_DB->command.append("';");
1147
1148                                    sqlite3_stmt *pStatement;
1149                                    rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
1150                                    if(rc == SQLITE_OK) {
1151                                        if(sqlite3_step(pStatement) == SQLITE_ROW)
1152                                             data = (const char *) sqlite3_column_text(pStatement, 1);
1153                                        else {
1154                                            WARNING("8 data_DB:: Data not yet in DB.\n");
1155                                            rc = 31337;
1156                                        }
1157                                    } else {
1158                                        WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
1159                                                rc, _data_DB->command.c_str());
1160                                    }
1161
1162                                    sqlite3_finalize(pStatement);
1163                                    int32_t pos = data.find_last_of("@", data.length() - 2);
1164                                    token = data.substr(pos + 1);
1165                                    token.erase(token.length() - 1);
1166                                    break;
1167                                }
1168                            }
1169
1170                            if(miss[activeMission].services[k].output[t].find(token) == string::npos) {
1171                                break;
1172                            }
1173                        }
1174
1175                        k += miss[activeMission].services[k].num_conds;
1176                    } else{
1177                        numstatements[1] = 0;
1178                        TransactData(k);
1179                    }
1180                }
1181            }
1182
1183            numstatements[0] += miss[activeMission].services[i].num_conds + 1;
1184            i += miss[activeMission].services[i].num_conds;
1185        } else if(miss[activeMission].services[i].name.compare("dowhile") == 0) {
1186            numstatements[0] = 0;
1187            while(true) {
1188                uint32_t k;
1189                for(k = i + 1; k <= i + miss[activeMission].services[i].num_conds; k++){
1190                    TransactData(k);
1191                }
1192
1193                data.clear();
1194                input.clear();
1195                check.clear();
1196                int32_t t;
1197                for(t = 0; t < 10; t++){
1198                    if(!miss[activeMission].services[i].output[t].empty()) {
1199                        input = miss[activeMission].services[k - 1].output[t];
1200                        _data_DB->command="SELECT ";
1201                        _data_DB->command.append(_data_DB->tablename);
1202                        _data_DB->command.append(".* from ");
1203                        _data_DB->command.append(_data_DB->tablename);
1204                        _data_DB->command.append(" where Tag=='");
1205                        _data_DB->command.append(input);
1206                        _data_DB->command.append("';");
1207
1208                        sqlite3_stmt *pStatement;
1209                        rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
1210                        if(rc == SQLITE_OK) {
1211                            if(sqlite3_step(pStatement) == SQLITE_ROW)
1212                                 data = (const char *) sqlite3_column_text(pStatement, 1);
1213                            else {
1214                                WARNING("10data_DB:: Data not yet in DB.\n");
1215                                rc = 31337;
1216                            }
1217                        } else {
1218                            WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
1219                                    rc, _data_DB->command.c_str());
1220                        }
1221
1222                        sqlite3_finalize(pStatement);
1223                        int32_t pos = data.find_last_of("@", data.length()-2);
1224                        token = data.substr(pos + 1);
1225                        token.erase(token.length() - 1);
1226                        break;
1227                    }
1228                }
1229
1230                if(miss[activeMission].services[i].output[t].find(token) == string::npos) {
1231                    break;
1232                }
1233            }
1234
1235            i += miss[activeMission].services[i].num_conds;
1236        } else{
1237            numstatements[0] = 0;
1238            TransactData(i);
1239        }
1240    }
1241
1242    int32_t i = 0;
1243    data.clear();
1244
1245    if(!shellFound) {
1246        int k = 0;
1247        while(k < 10 && !miss[activeMission].input[k].empty()) {
1248            k++;
1249        }
1250
1251        sprintf(buffer, "%d", k);
1252        SendMessage(shellSocketFD, buffer);
1253
1254        for(size_t t = 0; t < k; t++) {
1255            SendMessage(shellSocketFD, miss[activeMission].input[t].c_str());
1256            SendMessage(shellSocketFD, "0");
1257        }
1258    }
1259
1260    LOG("ServiceManagementLayer:: Done performing active mission.\n");
1261}
1262
1263
1264/* CALLED BY: MessageHandler
1265 * INPUTS: <none>
1266 * OUTPUTS: <none>
1267 *
1268 * DESCRIPTION: Print a list of the services currently registered and the ID's of the components that registered them
1269 */
1270void
1271ServiceManagementLayer::ListServices()
1272{
1273    _services_DB->command="select ";
1274    _services_DB->command.append(_services_DB->tablename);
1275    _services_DB->command.append(".* from ");
1276    _services_DB->command.append(_services_DB->tablename);
1277    _services_DB->command.append(";");
1278
1279    // Execute print (select all) command
1280    char *errorMsg;
1281    int32_t rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg);
1282    if((rc != SQLITE_OK) && (rc != 101))
1283        WARNING("SQL error: %s\n", errorMsg);
1284
1285    LOG("database %s, table %s:\n", _services_DB->filename.c_str(), _services_DB->tablename.c_str());
1286}
1287
1288/* CALLED BY: Reset
1289 * INPUTS: <none>
1290 * OUTPUTS: <none>
1291 *
1292 * DESCRIPTION: Clear and reinitialize the mission array, then reload the configuration file
1293 */
1294void
1295ServiceManagementLayer::ReloadConfiguration()
1296{
1297    LOG("ServiceManagementLayer:: Reloading Configuration.\n");
1298
1299    free(miss);
1300
1301    miss = new Mission[10];
1302    for(size_t i = 0; i < 10; i++)
1303        miss[i].services = new Service[30];
1304
1305    LoadConfiguration(_SML_Config.c_str(), miss);
1306}
1307
1308/* CALLED BY: constructor
1309 * INPUTS: |SML_Config| Address (either relitive or full) of the XML file containing mission data
1310 *        |mList| Mission array to be modified
1311 * OUTPUTS: <none>
1312 *
1313 * DESCRIPTION: IMPORTANT - See formatting instructions for correct parsing of data
1314 * Can currently handle 10 inputs and 10 outputs per service, but easily expandable
1315 * Also, can handle two layer of nested conditional statements, but could
1316 * be expanded to meet additional needs.
1317 *
1318 * Components assigned to mission during "set active mission" stage so that
1319 * components can still continue to register after the configuration is loaded
1320 */
1321void
1322ServiceManagementLayer::LoadConfiguration(const char *SML_Config, Mission* &mList)
1323{
1324    TiXmlElement *pMission;
1325    TiXmlElement *pService;
1326    TiXmlElement *pChild0, *pChild1, *pChild2, *pChild3, *pChild4;
1327    TiXmlHandle hRoot(0);
1328
1329    LOG("ServiceManagementLayer:: Loading Configuration.\n");
1330
1331    TiXmlDocument doc(".");
1332    doc.LoadFile(SML_Config);
1333    bool loadOkay = doc.LoadFile();
1334    if(!loadOkay)
1335        WARNING("Loading SML configuration failed: %s\n", SML_Config);
1336
1337    TiXmlHandle hDoc(&doc);
1338   
1339    pMission = hDoc.FirstChildElement().Element();
1340
1341    if(!pMission)
1342        WARNING("No valid root!");
1343
1344    hRoot = TiXmlHandle(pMission);
1345    pService = pMission->FirstChildElement();
1346
1347    int32_t mission_num = 0;
1348
1349    /* Iterate through the missions */
1350    for(pChild0 = pMission->FirstChildElement(); pChild0 ; pChild0 = pChild0->NextSiblingElement())  {
1351        int32_t service_num = 0;
1352        uint16_t cond_array[] = {0, 0, 0};
1353   
1354        for(pChild1  = (pChild0->FirstChildElement())->FirstChildElement(); pChild1; \
1355                pChild1  = pChild1->NextSiblingElement()) {
1356
1357            int32_t conditional_0 = service_num;
1358            for(pChild2 = pChild1->FirstChildElement(); pChild2; pChild2 = pChild2->NextSiblingElement()) {
1359                service_num++;
1360
1361                int32_t conditional_1 = service_num;
1362                for(pChild3 = pChild2->FirstChildElement(); pChild3; pChild3 = pChild3->NextSiblingElement()) {
1363                    service_num++;
1364                    int32_t conditional_2 = service_num;
1365                    for(pChild4 = pChild3->FirstChildElement(); pChild4; pChild4 = pChild4->NextSiblingElement()) {
1366                        service_num++;
1367                        if(pChild4->Attribute("name"))
1368                            mList[mission_num].services[service_num].name = pChild4->Attribute("name");
1369                        else
1370                            mList[mission_num].services[service_num].name = pChild4->Value();
1371
1372                        for(size_t i = 1; i <= 10; i++) {
1373                            char buffer[9]="input";
1374                            sprintf(buffer, "%s%d", buffer, i);
1375                            if(pChild4->Attribute(buffer))
1376                                mList[mission_num].services[service_num].input[i - 1] = pChild4->Attribute(buffer);
1377
1378                            char buffer2[9]="output";
1379                            sprintf(buffer2, "%s%d", buffer2, i);
1380                            if(pChild4->Attribute(buffer2))
1381                                mList[mission_num].services[service_num].output[i - 1] = pChild4->Attribute(buffer2);
1382                        }
1383
1384                        if(pChild4->Attribute("parameter"))
1385                            mList[mission_num].services[service_num].parameter = pChild4->Attribute("parameter");
1386
1387                        cond_array[2]++;
1388                    }
1389
1390                    if(!strcmp(pChild3->Value(), "shell") || conditional_2 != service_num) {
1391                        mList[mission_num].services[conditional_2].name = pChild3->Value();
1392                    } else {
1393                        mList[mission_num].services[service_num].name = pChild3->Attribute("name");
1394                    }
1395
1396                    for(size_t i = 1; i <= 10; i++) {
1397                        char buffer[9]="input";
1398                        sprintf(buffer, "%s%d", buffer, i);
1399                        if(pChild3->Attribute(buffer))
1400                            mList[mission_num].services[conditional_2].input[i - 1] = pChild3->Attribute(buffer);
1401
1402                        char buffer2[9]="output";
1403                        sprintf(buffer2, "%s%d", buffer2, i);
1404                        if(pChild3->Attribute(buffer2))
1405                            mList[mission_num].services[conditional_2].output[i - 1] = pChild3->Attribute(buffer2);
1406                    }
1407
1408                    if(pChild3->Attribute("parameter"))
1409                        mList[mission_num].services[conditional_2].parameter = pChild3->Attribute("parameter");
1410
1411                    mList[mission_num].services[conditional_2].num_conds = cond_array[2];
1412                    cond_array[1] += cond_array[2] + 1;
1413                    cond_array[2] = 0;
1414                }
1415
1416                if(!strcmp(pChild2->Value(), "shell") || (conditional_1 != service_num)) {
1417                    mList[mission_num].services[conditional_1].name = pChild2->Value();
1418                } else{
1419                    mList[mission_num].services[service_num].name = pChild2->Attribute("name");
1420                }
1421
1422                for(int i = 1; i <= 10; i++) {
1423                    char buffer[9]="input";
1424                    sprintf(buffer, "%s%d", buffer, i);
1425                    if(pChild2->Attribute(buffer))
1426                        mList[mission_num].services[conditional_1].input[i - 1] = pChild2->Attribute(buffer);
1427
1428                    char buffer2[9]="output";
1429                    sprintf(buffer2, "%s%d", buffer2, i);
1430                    if(pChild2->Attribute(buffer2))
1431                        mList[mission_num].services[conditional_1].output[i - 1] = pChild2->Attribute(buffer2);
1432                }
1433
1434                if(pChild2->Attribute("parameter"))
1435                    mList[mission_num].services[conditional_1].parameter = pChild2->Attribute("parameter");
1436
1437                mList[mission_num].services[conditional_1].num_conds = cond_array[1];
1438                cond_array[0] += cond_array[1] + 1;
1439                cond_array[1] = 0;
1440            }
1441       
1442            if(!strcmp(pChild1->Value(), "shell") || conditional_0 != service_num) {
1443                mList[mission_num].services[conditional_0].name = pChild1->Value();
1444            } else{
1445                mList[mission_num].services[conditional_0].name = pChild1->Attribute("name");
1446            }
1447
1448            for(size_t i = 1; i <= 10; i++) {
1449                char buffer[9]="input";
1450                sprintf(buffer, "%s%d", buffer, i);
1451                if(pChild1->Attribute(buffer))
1452                    mList[mission_num].services[conditional_0].input[i-1] = pChild1->Attribute(buffer);
1453
1454                char buffer2[9]="output";
1455                sprintf(buffer2, "%s%d", buffer2, i);
1456                if(pChild1->Attribute(buffer2))
1457                    mList[mission_num].services[conditional_0].output[i-1] = pChild1->Attribute(buffer2);
1458            }
1459
1460            if(pChild1->Attribute("parameter"))
1461                mList[mission_num].services[conditional_0].parameter = pChild1->Attribute("parameter");
1462            mList[mission_num].services[conditional_0].num_conds = cond_array[0];
1463                cond_array[0] = 0;
1464
1465            service_num++;
1466        }
1467   
1468        mList[mission_num].numServices = service_num;
1469        mList[mission_num].name = pChild0->Attribute("name");
1470        mList[mission_num].missionID = atoi(pChild0->Attribute("id"));
1471
1472        for(size_t i = 1; i <= 10; i++) {
1473            char buffer[9]="param";
1474            sprintf(buffer, "%s%d", buffer, i);
1475            if(pChild0->Attribute(buffer)){
1476                mList[mission_num].input[i-1] = pChild0->Attribute(buffer);
1477            }
1478        }
1479
1480        mission_num++;
1481    }
1482
1483    LOG("ServiceManagementLayer:: Done Loading Configuration\n");
1484}
1485
1486/* CALLED BY: MessageHandler
1487 * INPUTS: |ID| The ID number of the engine to be registered
1488 * OUTPUTS: <none>
1489 *
1490 * DESCRIPTION: Sends a registration message onto the shell and sends the ACK back to the component
1491 */
1492void
1493ServiceManagementLayer::RegisterCognitiveEngine(int32_t ID)
1494{
1495    SendMessage(shellSocketFD, "register_engine_cognitive");
1496
1497    LOG("ServiceManagementLayer:: CE registration message forwarded to shell.\n");
1498    char buffer[256];
1499    memset(buffer, 0, 256);
1500    ReadMessage(shellSocketFD, buffer);
1501    SendMessage(CE_List[ID].FD, buffer);
1502
1503    TransferRadioConfiguration(ID);
1504    memset(buffer, 0, 256);
1505    TransferExperience(ID);
1506    memset(buffer, 0, 256);
1507    numberOfCognitiveEngines++;
1508    CE_Present = true;
1509}
1510
1511/* CALLED BY: MessageHandler
1512 * INPUTS: |ID| The ID number of the engine to have it's services deregistered
1513 * OUTPUTS: <none>
1514 *
1515 * DESCRIPTION: Deletes individual services from the DB
1516 * NOTE THAT this function only needs to be called ifservice deregistration is going
1517 * to be done at a different time than component deregistration; it is handled
1518 * more efficiently and directly during that deregistration process.
1519 */
1520void
1521ServiceManagementLayer::DeregisterServices(int32_t ID)
1522{
1523    char buffer[256];
1524    memset(buffer, 0, 256);
1525    ReadMessage(CE_List[ID].FD, buffer);
1526    _services_DB->command="DELETE FROM ";
1527    _services_DB->command.append(_services_DB->tablename);
1528    _services_DB->command.append(" WHERE ID_Num IN (SELECT ");
1529
1530    char tmp[3];
1531    memset(tmp,0,3);
1532    sprintf(tmp, "%d", ID);
1533    _services_DB->command.append(tmp);
1534    _services_DB->command.append(" FROM ");
1535    _services_DB->command.append(_services_DB->tablename);
1536    _services_DB->command.append(" WHERE Service_Name");
1537    _services_DB->command.append("=='");
1538    _services_DB->command.append(buffer);
1539    _services_DB->command.append("');");
1540
1541    char *errorMsg;
1542    int32_t rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg);
1543    if((rc != SQLITE_OK) && (rc != 101))
1544        WARNING("SQL error: %s\n", errorMsg);
1545}
1546
1547/* CALLED BY: MessageHandler
1548 * INPUTS: |ID| The ID number of the engine to have it's services deregistered
1549 * OUTPUTS: <none>
1550 *
1551 * DESCRIPTION: Deletes the contact info for the cognitive engine, forwards a deregistration message to the shell
1552 * Also, deletes the services from the DB
1553 */
1554void
1555ServiceManagementLayer::DeregisterCognitiveEngine(int32_t ID)
1556{
1557    LOG("ServiceManagementLayer:: CE deregistration message forwarded to shell.\n");
1558
1559    numberOfCognitiveEngines--;
1560    if(numberOfCognitiveEngines == 0)
1561        CE_Present = false;
1562
1563    SendMessage(shellSocketFD, "deregister_engine_cognitive");
1564    char buffer[256];
1565    memset(buffer, 0, 256);
1566    ReadMessage(shellSocketFD, buffer);
1567    SendMessage(CE_List[ID].FD, buffer);
1568    if(strcmp("deregister_ack", buffer) != 0) {
1569        ERROR(1, "SML:: Failed to close CE socket\n");
1570    }
1571
1572    //Deregister the services
1573    _services_DB->command="DELETE FROM ";
1574    _services_DB->command.append(_services_DB->tablename);
1575    _services_DB->command.append(" WHERE ");
1576    _services_DB->command.append("ID_Num");
1577    _services_DB->command.append("==");
1578
1579    char tmp[3];
1580    memset(tmp,0,3);
1581    sprintf(tmp, "%d", ID);
1582    _services_DB->command.append(tmp);
1583    _services_DB->command.append(";");
1584
1585    char *errorMsg;
1586    int32_t rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg);
1587    if((rc != SQLITE_OK) && (rc != 101))
1588        WARNING("SQL error: %s\n", errorMsg);
1589
1590    CE_List[ID].FD = -1;
1591    CE_List[ID].ID_num = -1;
1592
1593    LOG("Cognitive Radio Shell:: CE Socket closed for engine #%d.\n", ID);
1594}
1595
1596
1597/* CALLED BY: test class
1598 * INPUTS: <none>
1599 * OUTPUTS: <none>
1600 *
1601 * DESCRIPTION: Sets up a server socket and listens for communication on either that or the shell socket
1602 */
1603void
1604ServiceManagementLayer::StartSMLServer()
1605{
1606    struct timeval selTimeout;
1607    int32_t running = 1;
1608    int32_t port, rc, new_sd = 1;
1609    int32_t desc_ready = 1;
1610    fd_set sockSet, shellSet;
1611
1612    cogEngSrv = CreateTCPServerSocket(SMLport);
1613    int32_t maxDescriptor = cogEngSrv;
1614
1615    if(InitializeTCPServerPort(cogEngSrv) == -1)
1616        ERROR(1,"Error initializing primary port\n");
1617
1618    while (running) {
1619        /* Zero socket descriptor vector and set for server sockets */
1620        /* This must be reset every time select() is called */
1621        FD_ZERO(&sockSet);
1622        FD_SET(cogEngSrv, &sockSet);
1623       
1624        for(uint16_t k = 0; k < Current_ID; k++){
1625            if(CE_List[k].ID_num != -1)
1626                FD_SET(CE_List[k].FD, &sockSet);
1627        }
1628
1629        /* Timeout specification */
1630        /* This must be reset every time select() is called */
1631        selTimeout.tv_sec = 0;      /* timeout (secs.) */
1632        selTimeout.tv_usec = 0;     /* 0 microseconds */
1633
1634        /* Changed both to zero so that select will check messages from the shell
1635         * instead of blocking when there is no command from the CE's to be processed */
1636
1637        /* Check ifthere is a message on the socket waiting to be read */
1638        rc = select(maxDescriptor + 1, &sockSet, NULL, NULL, &selTimeout);
1639        if(rc == 0) {
1640            FD_ZERO(&shellSet);
1641            FD_SET(shellSocketFD, &shellSet);
1642            selTimeout.tv_sec = 0;
1643            selTimeout.tv_usec = 0;
1644            /* Check ifthere is a message on the shell socket ready to be processed */
1645            select(shellSocketFD + 1, &shellSet, NULL, NULL, &selTimeout);
1646            if(FD_ISSET(shellSocketFD, &shellSet)){
1647                MessageHandler(-1);}
1648        } else {
1649            desc_ready = rc;
1650            for(port = 0; port <= maxDescriptor && desc_ready > 0; port++) {
1651                if(FD_ISSET(port, &sockSet)) {
1652                    desc_ready -= 1;
1653
1654                    /* Check ifrequest is new or on an existing open descriptor */
1655                    if(port == cogEngSrv) {
1656                        /* If new, assign it a descriptor and give it an ID */
1657                        new_sd = AcceptTCPConnection(port);
1658             
1659                        if(new_sd < 0)
1660                            break;
1661
1662                        CE_List[Current_ID].FD = new_sd;
1663                        CE_List[Current_ID].ID_num = Current_ID;
1664                        MessageHandler(Current_ID);
1665                        Current_ID++;
1666   
1667                        FD_SET(new_sd,&sockSet);
1668                        if(new_sd > maxDescriptor)
1669                           maxDescriptor = new_sd;
1670                    } else {
1671                        /* If old, figure out which ID it coresponds to and handle it accordingly */
1672                        for(size_t z = 0; z < Current_ID; z++) {
1673                            if(CE_List[z].FD == port) {
1674                                MessageHandler(z);
1675                            }
1676                        }
1677                    }
1678                }
1679            }
1680        }
1681    }       
1682
1683    /* Close sockets */
1684    close(cogEngSrv);
1685
1686    return;
1687}
1688
Note: See TracBrowser for help on using the browser.