root/vtcross/trunk/src/service_management_layer/ServiceManagementLayer.cpp @ 448

Revision 448, 66.0 KB (checked in by bhilburn, 15 years ago)

Commenting out the printDatabase function since it isn't complete, and
just generates a ton of warnings currently.

Line 
1/*
2 Copyright 2009 Virginia Polytechnic Institute and State University 
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7 
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17/* Inter-component communication handled by sockets and FD's. 
18 * Server support has been completely implemented and tested.
19 *
20 * Services are stored in a SQLite DB by the ID of the CE that registered them.  Service
21 * support has been completely implemented and tested.
22 *
23 * Missions are loaded from an XML file, connected with services provided by components,
24 * and run.  See the documentation for the "PerformActiveMission" below for important
25 * info.
26 */
27
28
29#include <cmath>
30#include <cstdio>
31#include <cstdlib>
32#include <cstring>
33#include <stdint.h>
34
35#include <arpa/inet.h>
36#include <iostream>
37#include <netinet/in.h>
38#include <netdb.h>
39#include <fcntl.h>
40#include <sqlite3.h>
41#include <string>
42#include <sys/ioctl.h>
43#include <sys/mman.h>
44#include <sys/socket.h>
45#include <sys/types.h>
46#include <sys/wait.h>
47
48#include "tinyxml/tinyxml.h"
49#include "tinyxml/tinystr.h"
50
51#include "vtcross/debug.h"
52#include "vtcross/error.h"
53#include "vtcross/common.h"
54#include "vtcross/components.h"
55#include "vtcross/containers.h"
56#include "vtcross/socketcomm.h"
57
58
59typedef struct services_s *services_DB;
60typedef struct data_s *data_DB;
61
62using namespace std;
63
64struct services_s {
65    string filename;
66    string tablename;
67    string command;
68    sqlite3 *db;
69    unsigned int num_columns;
70};
71
72struct data_s {
73    string filename;
74    string tablename;
75    string command;
76    sqlite3 *db;
77    unsigned int num_columns;
78};
79
80services_DB _services_DB;
81data_DB _data_DB;
82string _SML_Config;
83bool shellFound;
84
85/* Callback function used internally by some of the SQLite3 commands */
86int32_t
87callback(void *notUsed, int32_t argc, char **argv, char **azColName)
88{
89    for(size_t i = 0; i < argc; i++) {
90        LOG("%s = %s\n", azColName[i], argv[i] ? argv[i] : "NULL");
91    }
92
93    LOG("\n");
94    return 0;
95}
96
97/* Useful for spotchecking what's in the database */
98void
99printDatabase()
100{
101    /*
102    LOG("\n\n\n");
103    _data_DB->command = "select ";
104    _data_DB->command.append(_data_DB->tablename);
105    _data_DB->command.append(".* from ");
106    _data_DB->command.append(_data_DB->tablename);
107    _data_DB->command.append(";");
108
109    char *errorMsg;
110    int32_t rc = sqlite3_exec(_data_DB->db, _data_DB->command.c_str(), callback, 0, &errorMsg);
111    if((rc != SQLITE_OK) && (rc != 101))
112        WARNING("SQL error: %s\n", errorMsg);
113
114    LOG("database %s, table %s:\n", _data_DB->filename, _data_DB->tablename);
115    LOG("\n\n\n");
116    */
117}
118
119ServiceManagementLayer::ServiceManagementLayer()
120{
121    LOG("Creating Service Management Layer.\n");
122
123    shellSocketFD = -1;
124    numberOfCognitiveEngines = 0;
125    CE_Present = false;
126    cogEngSrv = 1;
127}
128
129/* Free and clear the DB's associated with this SML in the destructor.
130 *
131 * Note that exiting with an error condition will cause SML to not be destructed,
132 * resulting in the DB's staying in memory until the destructor is encountered in
133 * future executions. */
134ServiceManagementLayer::~ServiceManagementLayer()
135{
136    char *errorMsg;
137    int32_t rc;         /* sqlite command return code */
138
139    _services_DB->command = "drop table ";
140    _services_DB->command.append(_services_DB->tablename);
141    rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg);
142    if((rc != SQLITE_OK) && (rc != 101))
143        WARNING("ServiceManagementLayer::Destructor services 'drop table' error: %s\n", errorMsg);
144
145    _services_DB->command = "vacuum";
146    rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg);
147    if((rc != SQLITE_OK) && (rc != 101))
148        WARNING("ServiceManagementLayer::Destructor services 'vacuum' error: %s\n", errorMsg);
149
150    free(_services_DB);
151
152    _data_DB->command = "drop table ";
153    _data_DB->command.append(_data_DB->tablename);
154    rc = sqlite3_exec(_data_DB->db, _data_DB->command.c_str(), callback, 0, &errorMsg);
155    if((rc != SQLITE_OK) && (rc != 101))
156        WARNING("ServiceManagementLayer::Destructor data 'drop table' error: %s\n", errorMsg);
157
158    _data_DB->command = "vacuum";
159    rc = sqlite3_exec(_data_DB->db, _data_DB->command.c_str(), callback, 0, &errorMsg);
160    if((rc != SQLITE_OK) && (rc != 101))
161        WARNING("ServiceManagementLayer::Destructor data 'vacuum' error: %s\n", errorMsg);
162
163    free(_data_DB);
164}
165
166/* Note that sizes of CE_List, miss, and service are hardcoded for now.
167 * Also, their sizes are hardcoded into the code in various places; a fix
168 * for a future version. */
169ServiceManagementLayer::ServiceManagementLayer(const char* SML_Config, \
170    const char* serverName, const char* serverPort, int16_t clientPort)
171{
172    LOG("Creating Service Management Layer.\n");
173
174    _SML_Config = string(SML_Config);
175    SMLport = clientPort;
176
177    ConnectToShell(serverName, serverPort);
178    CE_List = new CE_Reg[10];
179
180    miss = new Mission[10];
181    for(size_t i = 0; i < 10; i++) {
182        miss[i].services = new Service[30];
183    }
184
185    Current_ID = 0;
186
187    LoadConfiguration(SML_Config, miss);
188
189    CreateServicesDB();
190    CreateDataDB();
191}
192
193/* CALLED BY: constructor
194 * INPUTS: <none>
195 * OUTPUTS: <none>
196 *
197 * DESCRIPTION: Create and initialize a DB to hold the services registered by components
198 */
199void
200ServiceManagementLayer::CreateServicesDB()
201{
202    sqlite3_stmt *ppStmt;   /* OUT: Statement handle */
203    const char *pzTail;     /* OUT: Pointer to unused portion of zSql */
204    int32_t rc;             /* sqlite command return code */
205
206    _services_DB = new services_s;
207    _services_DB->filename="Services_Table";
208    sqlite3_open(_services_DB->filename.c_str(), &(_services_DB->db));
209
210    char *cols[] = {(char *)"ID_Num", (char *)"Service_Name"};
211
212    _services_DB->tablename="Services";
213
214    /* ifprogram execution ends in anything other than a ordered shutdown, DB's will still
215     * be there for next run. Need to get rid of it so that old data isn't inadvertantly
216     * used in the next execution cycle. */
217    _services_DB->command = "DROP TABLE ifEXISTS Services;";     
218
219    rc = sqlite3_prepare_v2(_services_DB->db, _services_DB->command.c_str(), 128, &ppStmt, &pzTail);
220    if((rc != SQLITE_OK) && (rc != 101))
221        WARNING("ServiceManagementLayer::CreateServicesDB 'prepare_stmt' error %d\n", rc);
222
223    rc = sqlite3_step(ppStmt);
224    if((rc != SQLITE_OK) && (rc != 101))
225        WARNING("ServiceManagementLayer::CreateServicesDB 'step' error\n");
226
227    _services_DB->num_columns = 2;
228
229    /* Generate command */
230    _services_DB->command="CREATE TABLE ";
231    _services_DB->command.append(_services_DB->tablename);
232    _services_DB->command.append("(");
233    _services_DB->command.append(cols[0]);
234    _services_DB->command.append(" INT, ");
235    _services_DB->command.append(cols[1]);
236    _services_DB->command.append(" TEXT);");
237
238    /* Execute create table command */
239    rc = sqlite3_prepare_v2(_services_DB->db, _services_DB->command.c_str(), 128, &ppStmt, &pzTail);
240    if((rc != SQLITE_OK) && (rc != 101))
241        WARNING("ServiceManagementLayer::CreateServicesDB 'prepare_stmt' error %d\n", rc);
242
243    rc = sqlite3_step(ppStmt);
244    if((rc != SQLITE_OK) && (rc != 101))
245        WARNING("ServiceManagementLayer::CreateServicesDB 'step' error\n");
246}
247
248/* CALLED BY: constructor
249 * INPUTS: <none>
250 * OUTPUTS: <none>
251 *
252 * DESCRIPTION: Create and initialize a DB to hold the data sent by components
253 */
254void
255ServiceManagementLayer::CreateDataDB()
256{
257    sqlite3_stmt *ppStmt;   /* OUT: Statement handle */
258    const char *pzTail;     /* OUT: Pointer to unused portion of zSql */
259    int32_t rc;             /* sqlite command return code */
260
261    _data_DB = new data_s;
262
263    _data_DB->filename="Data_Table";
264    sqlite3_open(_data_DB->filename.c_str(), &(_data_DB->db));
265
266    char *cols[] = {(char *)"Tag", (char *)"Data"};
267
268    _data_DB->tablename = "Data";
269    _data_DB->command = "DROP TABLE ifEXISTS Data;";     
270
271    rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), 128, &ppStmt, &pzTail);
272    if((rc != SQLITE_OK) && (rc != 101))
273        WARNING("ServiceManagementLayer::CreateDataDB 'prepare_stmt' error %d\n", rc);
274
275    rc = sqlite3_step(ppStmt);
276    if((rc != SQLITE_OK) && (rc != 101))
277        WARNING("ServiceManagementLayer::CreateDataDB 'step' error\n");
278
279    _data_DB->num_columns = 2;
280
281    /* Generate command */
282    _data_DB->command = "CREATE TABLE ";
283    _data_DB->command.append(_data_DB->tablename);
284    _data_DB->command.append("(");
285    _data_DB->command.append(cols[0]);
286
287    /* First column is the name of the data (corresponding to the name of the output/input pair)
288     * It is the primary key so any subsequent data with the same name will replace the row. */
289    _data_DB->command.append(" TEXT PRIMARY KEY ON CONFLICT REPLACE, ");
290    _data_DB->command.append(cols[1]);
291    _data_DB->command.append(" TEXT);");
292
293    rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), 128, &ppStmt, &pzTail);
294    if((rc != SQLITE_OK) && (rc != 101))
295        WARNING("ServiceManagementLayer::CreateDataDB 'prepare_stmt' error %d\n", rc);
296
297    rc = sqlite3_step(ppStmt);
298    if((rc != SQLITE_OK) && (rc != 101))
299        WARNING("ServiceManagementLayer::CreateDataDB 'step' error\n");
300}
301
302/* CALLED BY: MessageHandler
303 * INPUTS: <none>
304 * OUTPUTS: <none>
305 *
306 * DESCRIPTION: Sends a message identifying this component as an SML to the Shell
307 */
308void
309ServiceManagementLayer::SendComponentType()
310{
311    SendMessage(shellSocketFD, "response_sml");
312    LOG("SML responded to GetRemoteComponentType query.\n");
313}
314
315/* CALLED BY: constructor
316 * INPUTS: |serverName| the IPv4 name of the server (127.0.0.1 for localhost)
317 *         |serverPort| the port on the server to connect to
318 * OUTPUTS: <none>
319 *
320 * DESCRIPTION: Connecting to the shell takes 2 steps
321 * 1) Establish a client socket for communication
322 * 2) Run the initial Registration/handshake routine
323 */
324void
325ServiceManagementLayer::ConnectToShell(const char* serverName, \
326        const char* serverPort)
327{
328    shellSocketFD = ClientSocket(serverName, serverPort);
329    RegisterComponent();
330}
331
332/* CALLED BY: StartSMLServer
333 * INPUTS: |ID| The ID number of the CE that has a message wating
334 * OUTPUTS: <none>
335 *
336 * DESCRIPTION: Called whenever a socket is identified as being ready for communication
337 *              This funciton reads the message and calls the appropriate helper
338 */
339void
340ServiceManagementLayer::MessageHandler(int32_t ID)
341{
342    char buffer[256];   
343    memset(buffer, 0, 256); 
344    int32_t _FD; 
345   
346    if(ID != -1)
347        _FD = CE_List[ID].FD;
348    else
349        _FD = shellSocketFD;
350
351    ReadMessage(_FD, buffer);
352   
353    //--------Policy Engine Stuff - no policy engine support in this version-------//
354
355    //printf("********* %s **********\n", buffer);
356    // TODO
357    // ifwe send integer op codes rather than strings, this process will be
358    // MUCH faster since instead of donig string compares we can simply
359    // switch on the integer value...
360    /*if(strcmp(buffer, "register_service") == 0) {
361        if(strcmp(buffer, "policy_geo") == 0) {
362        }
363        else if(strcmp(buffer, "policy_time") == 0) {
364        }
365        else if(strcmp(buffer, "policy_spectrum") == 0) {
366        }
367        else if(strcmp(buffer, "policy_spacial") == 0) {
368        }
369    }
370    else if(strcmp(buffer, "deregister_service") == 0) {
371        if(strcmp(buffer, "policy_geo") == 0) {
372        }
373        else if(strcmp(buffer, "policy_time") == 0) {
374        }
375        else if(strcmp(buffer, "policy_spectrum") == 0) {
376        }
377        else if(strcmp(buffer, "policy_spacial") == 0) {
378        }
379    }*/
380
381    //Go down the list to call the appropriate function
382    if(strcmp(buffer, "query_component_type") == 0) {
383        SendComponentType();
384    }
385    else if(strcmp(buffer, "reset_sml") == 0) {
386        Reset();
387    }
388    else if(strcmp(buffer, "shutdown_sml") == 0) {
389        Shutdown();
390    }
391    else if(strcmp(buffer, "register_engine_cognitive") == 0) {
392        RegisterCognitiveEngine(ID);
393    }
394    else if(strcmp(buffer, "register_service") == 0) {
395        ReceiveServices(ID);
396    }
397    else if(strcmp(buffer, "send_component_type") == 0) {
398        SendComponentType();
399    }
400    else if(strcmp(buffer, "list_services") == 0) {
401        ListServices();
402    }
403    else if(strcmp(buffer, "set_active_mission") == 0) {
404        SetActiveMission();
405    }
406    else if(strcmp(buffer, "request_optimization") == 0) {
407        PerformActiveMission();
408    }
409    else if(strcmp(buffer, "deregister_engine_cognitive") == 0) {
410        DeregisterCognitiveEngine(ID);
411    }
412    else if(strcmp(buffer, "deregister_service") == 0) {
413        DeregisterServices(ID);
414    }
415}
416
417//TODO Finish
418/* CALLED BY: MessageHandler
419 * INPUTS: <none>
420 * OUTPUTS: <none>
421 *
422 * DESCRIPTION: Deregisters the component from the Shell.
423 */
424void
425ServiceManagementLayer::Shutdown()
426{
427    DeregisterComponent();
428}
429
430//TODO Finish
431/* CALLED BY: MessageHandler
432 * INPUTS: <none>
433 * OUTPUTS: <none>
434 *
435 * DESCRIPTION: Deregisters the component from the Shell
436 */
437void
438ServiceManagementLayer::Reset()
439{
440    DeregisterComponent();
441    ReloadConfiguration();
442}
443
444/* CALLED BY: ConnectToShell
445 * INPUTS: <none>
446 * OUTPUTS: <none>
447 *
448 * DESCRIPTION: Sends the registration message to the Shell
449 */
450void
451ServiceManagementLayer::RegisterComponent()
452{
453    SendMessage(shellSocketFD, "register_sml");
454    LOG("ServiceManagementLayer:: Registration message sent.\n");
455}
456
457/* CALLED BY: Shutdown
458 * INPUTS: <none>
459 * OUTPUTS: <none>
460 *
461 * DESCRIPTION: Closes the client socket with the shell, sends a deregstration message
462 */
463void
464ServiceManagementLayer::DeregisterComponent()
465{
466    SendMessage(shellSocketFD, "deregister_sml");
467    LOG("ServiceManagementLayer:: Deregistration message sent.\n");
468
469    shutdown(shellSocketFD, 2);
470    close(shellSocketFD);
471    shellSocketFD = -1;
472    LOG("ServiceManagementLayer:: Shell socket closed.\n");
473}
474
475
476/* CALLED BY: RegisterCognitiveEngine
477 * INPUTS: |ID| The ID number of the component where the data is to be transfered to
478 * OUTPUTS: <none>
479 *
480 * DESCRIPTION: Streams config data directly from the shell to the CE, and checks
481 * for an "ack" message from the CE after every sent message
482 * to know when to stop communication.
483 *
484 * NOTE: Modified to check the incoming message buffer rather than the outgoing
485 * message buffer to avoid a portion of the delay. May change this again to handle
486 * data more inteligently, taking advantage of it's properties.
487 */
488void
489ServiceManagementLayer::TransferRadioConfiguration(int32_t ID)
490{
491    struct timeval selTimeout;
492    fd_set sockSet;
493    int32_t rc = 1;
494    char buffer[256];
495
496    /* Send data until the CE sends an ACK message back */
497    while(rc != 0) {
498        memset(buffer, 0, 256);
499
500        /* Receive data from Shell */
501        ReadMessage(shellSocketFD, buffer);
502
503        /* Send data to CE */
504        SendMessage(CE_List[ID].FD, buffer);
505        FD_ZERO(&sockSet);
506        FD_SET(shellSocketFD, &sockSet);
507        selTimeout.tv_sec = 0;
508        selTimeout.tv_usec = 5000;
509
510        /* Check ifthere is a message on the shell ready to be processed */
511        rc = select(shellSocketFD + 1, &sockSet, NULL, NULL, &selTimeout);
512    }
513
514    memset(buffer, 0, 256);
515    ReadMessage(CE_List[ID].FD, buffer);
516    SendMessage(shellSocketFD, buffer);
517}
518
519
520/* CALLED BY: RegisterCognitiveEngine
521 * INPUTS: |ID| The ID number of the component where the data is to be transfered to
522 * OUTPUTS: <none>
523 *
524 * DESCRIPTION: Simmilar to TransferRadioConfig, just with Experience data
525 *
526 * NOTE: Modified to check the incoming message buffer rather than the outgoing
527 * message buffer to avoid a portion of the delay. May change this again to handle
528 * data more inteligently, taking advantage of it's properties.
529 */
530void
531ServiceManagementLayer::TransferExperience(int32_t ID)
532{
533    struct timeval selTimeout;
534    fd_set sockSet;
535    int32_t rc = 1;
536    char buffer[256];
537    /* Send data until the CE sends an ACK message back */
538    while(rc != 0) {
539        memset(buffer, 0, 256);
540
541        /* Receive data from Shell */
542        ReadMessage(shellSocketFD, buffer);
543
544        /* Send data to CE */
545        SendMessage(CE_List[ID].FD, buffer);
546        FD_ZERO(&sockSet);
547        FD_SET(shellSocketFD, &sockSet);
548        selTimeout.tv_sec = 0;
549        selTimeout.tv_usec = 5000;
550
551        /* Check ifthere is a message on the shell ready to be processed */
552        rc = select(shellSocketFD + 1, &sockSet, NULL, NULL, &selTimeout);
553    }
554
555    memset(buffer, 0, 256);
556    ReadMessage(CE_List[ID].FD, buffer);
557    SendMessage(shellSocketFD, buffer);
558}
559
560/* CALLED BY: MessageHandler
561 * INPUTS: |ID| The ID number of the component where service is located
562 * OUTPUTS: <none>
563 *
564 * DESCRIPTION: Inserts a service into the DB with the ID of the component where it exists
565 */
566void
567ServiceManagementLayer::ReceiveServices(int32_t ID)
568{
569    char buffer[256];
570    memset(buffer, 0, 256);
571    ReadMessage(CE_List[ID].FD, buffer);
572
573    char *cols[] = {(char *) "ID_Num", (char *) "Service_Name"};
574
575    /* Generate command */
576    _services_DB->command = "insert into ";
577    _services_DB->command.append(_services_DB->tablename);
578    _services_DB->command.append(" (");
579    _services_DB->command.append(cols[0]);
580    _services_DB->command.append(", ");
581    _services_DB->command.append(cols[1]);
582    _services_DB->command.append(") ");
583    _services_DB->command.append(" values(");
584
585    char temp[3];
586    memset(temp, 0, 3);
587    sprintf(temp, "%d", ID);
588
589    _services_DB->command.append(temp);
590    _services_DB->command.append(", '");
591    _services_DB->command.append(buffer);
592    _services_DB->command.append("');");
593   
594    /* Execute add command */
595    char *errorMsg;
596    int rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg);
597    if((rc != SQLITE_OK) && (rc != 101))
598        WARNING("ServiceManagementLayer::RecieveServices DB Error %s\n", errorMsg);
599}
600
601/* CALLED BY: MessageHandler
602 * INPUTS: <none>
603 * OUTPUTS: <none>
604 *
605 * DESCRIPTION: This method associates the services that components provide with the
606 * services that are requested in the mission. Each service in the mission is given
607 * the ID and FD of a component that has registered to provide that service. Deregistration
608 * is okay until this method is called without a reload, but ifderegistration occurs after this
609 * method is called it needs to be called again even ifother engines also provide the services
610 */
611void
612ServiceManagementLayer::SetActiveMission()
613{
614    char buffer[256];
615    memset(buffer, 0, 256);
616    ReadMessage(shellSocketFD, buffer);
617
618    uint32_t missID = atoi(buffer);
619    for(activeMission = 0; activeMission < 10; activeMission++) {
620        /* Find the active mission by comparing mission ID's */
621        if(miss[activeMission].missionID == missID)
622            break;
623    }
624
625    LOG("ServiceManagementLayer:: Received Set Active Mission command: %i.\n", missID);
626
627    /* For each service in the mission */
628    for(size_t i = 0; i < miss[activeMission].numServices; i++) {
629        /* Check whether the current service is an actual service or a conditional */
630        if(miss[activeMission].services[i].name.compare("if") && \
631                miss[activeMission].services[i].name.compare("dowhile") && \
632                miss[activeMission].services[i].name.compare("shell")) {
633                /* ifit is a service, search the database of registered services to find
634                 * the ID of the component that registered it */
635                _services_DB->command="select ";
636                _services_DB->command.append(_services_DB->tablename);
637                _services_DB->command.append(".* from ");
638                _services_DB->command.append( _services_DB->tablename);
639                _services_DB->command.append(" where Service_Name=='");
640                _services_DB->command.append(miss[activeMission].services[i].name);
641                _services_DB->command.append("';");
642   
643                sqlite3_stmt * pStatement;
644                int32_t rc = sqlite3_prepare_v2(_services_DB->db, _services_DB->command.c_str(), \
645                        -1, &pStatement, NULL);
646                if(rc == SQLITE_OK) {
647                    if(sqlite3_step(pStatement) == SQLITE_ROW)
648                        miss[activeMission].services[i].componentID = sqlite3_column_int(pStatement, 0);
649                    else {
650                        WARNING("services_DB:: Mission requires service %s ", \
651                                miss[activeMission].services[i].name.c_str());
652                        WARNING("not provided by any connected component.\n");
653                        rc = 31337;
654                    }
655                } else {
656                    WARNING("services_DB:: Error executing SQL statement. rc = %i\n%s\n", \
657                            rc, _services_DB->command.c_str());
658                }
659
660                sqlite3_finalize(pStatement);
661                miss[activeMission].services[i].socketFD = \
662                    CE_List[miss[activeMission].services[i].componentID].FD;
663        }
664        /* TODO Nothing to be done for conditionals at this stage */
665    }
666 
667    SendMessage(shellSocketFD, "ack");
668    LOG("ServiceManagementLayer:: Done setting active mission.\n");
669}
670
671/* CALLED BY: PerformActiveMission
672 * INPUTS: |sourceID| ID of the service that is being processed
673 * OUTPUTS: <none>
674 *
675 * DESCRIPTION: This is a helper method for the "PerformActiveMission" function
676 * NOTE: This function has changed drastically from the previous implementation
677 *
678 * Takes an ID of a service. For that service, finds inputs in DB and forwords
679 * those on to the engine after sending comm-starting messages. Afterwords, listenes
680 * for the outputs so that it can store those in the database for future services or
681 * the overall output
682 */
683void
684ServiceManagementLayer::TransactData(int32_t sourceID)
685{
686    char buffer[256];
687    std::string data;
688    char *cols[] = {(char *) "Tag", (char *) "Data"};
689    char *token;
690
691   /* Send a message directly to the shell */
692   if(miss[activeMission].services[sourceID].name.find("shell") != string::npos) {
693       shellFound=true;
694
695       int32_t k = 0;
696       while((k < 10) && (!miss[activeMission].input[k].empty())) {
697           k++;
698       }
699
700       sprintf(buffer, "%d", k);
701       SendMessage(shellSocketFD, buffer);
702       for(int32_t t = 0; t < k; t++) {
703           memset(buffer, 0 , 256);
704           _data_DB->command="select ";
705           _data_DB->command.append(_data_DB->tablename);
706           _data_DB->command.append(".* from ");
707           _data_DB->command.append(_data_DB->tablename);
708           _data_DB->command.append(" where Tag=='");
709           _data_DB->command.append(miss[activeMission].input[t]);
710           _data_DB->command.append("';");
711           sqlite3_stmt * pStatement;
712
713           int32_t rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), \
714                   -1, &pStatement, NULL);
715           if(rc == SQLITE_OK) {
716               if(sqlite3_step(pStatement) == SQLITE_ROW)
717                   data=((const char*) sqlite3_column_text(pStatement, 1));
718               else {
719                   LOG("3data_DB:: Data not yet in DB., %s\n", _data_DB->command.c_str());
720                   rc = 31337;
721               }
722           }
723           else {
724               WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
725                       rc,_data_DB->command.c_str());
726           }
727
728           sqlite3_finalize(pStatement);
729           token = strtok((char *) data.c_str(), "@");
730           token = strtok(NULL, "@");
731           SendMessage(shellSocketFD, token);
732           token = strtok(NULL, "@");
733           SendMessage(shellSocketFD, token);
734       }
735
736       return;
737   }
738
739    /* ifthis is a service command and not a shell command... */
740    /* Transmission starting messages */
741    SendMessage(miss[activeMission].services[sourceID].socketFD, "request_optimization_service");
742    SendMessage(miss[activeMission].services[sourceID].socketFD, \
743            miss[activeMission].services[sourceID].name.c_str());
744}
745
746/* CALLED BY: MessageHandler
747 * INPUTS: <none>
748 * OUTPUTS: <none>
749 *
750 * DESCRIPTION: This function works by first sending the inputs from the shell to
751 * the appropriate components. The first service should begin immeadiately, as
752 * should any others who have all of their input parameters. When they complete,
753 * the output path is found and the data is transfered as it becomes available
754 * Presumably at this point the second function has all of it's parameters, so it
755 * begins to compute, and the cycle repeats.
756 *
757 * Rules for active missions (currently)
758 * -Five inputs/outputs per service and per mission
759 * -All ordering constraints have been relaxed in this version; all data is stored
760 *  locally and only sent when requested
761 * -ifand while support fully implemented - up to three levels (if's can be nested, but dowhiles cannot)
762 * -For dowhiles, assumes loop condition determined on last line
763 *
764 * -IMPORTANT: DB uses '@' to seperate individual statements; using '@' in the data
765 *  stream will result in incorrect behavior
766 */
767void
768ServiceManagementLayer::PerformActiveMission()
769{
770    shellFound = false;
771    std::string data_param;
772    std::string data_obsv;
773    std::string data;
774    std::string input;
775    std::string check;
776
777    char buffer[256];
778    char buffer1[256];
779    std::string token, token2;
780    std::string data2;
781
782    int32_t rc;
783    char *errorMsg;
784    char *cols[] = {(char *) "Tag", (char *) "Data"};
785
786    LOG("ServiceManagementLayer:: Received PerformActiveMission command.\n");
787
788    /* Get the inputs */
789    memset(buffer, 0, 256);
790    ReadMessage(shellSocketFD, buffer);
791
792    /* Receive Set of Parameters */
793    memset(buffer, 0, 256);
794    ReadMessage(shellSocketFD, buffer);
795    int32_t t = atoi(buffer);
796    for(size_t m = 0; m < t; m++) {
797        memset(buffer1, 0, 256);
798        ReadMessage(shellSocketFD, buffer1);
799        _data_DB->command="insert into ";
800        _data_DB->command.append(_data_DB->tablename);
801        _data_DB->command.append(" (");
802        _data_DB->command.append(cols[0]);
803        _data_DB->command.append(", ");
804        _data_DB->command.append(cols[1]);
805        _data_DB->command.append(") ");
806
807        memset(buffer, 0, 256);
808        ReadMessage(shellSocketFD, buffer);
809        _data_DB->command.append(" values('");
810        _data_DB->command.append(buffer1);
811        _data_DB->command.append("', '1@");
812        _data_DB->command.append(buffer1);
813        _data_DB->command.append("@");
814        _data_DB->command.append(buffer);
815        _data_DB->command.append("');");
816
817        rc = sqlite3_exec(_data_DB->db, _data_DB->command.c_str(), callback, 0, &errorMsg);
818        if((rc != SQLITE_OK) && (rc != 101))
819           WARNING("SQL error: %s\n", errorMsg);
820    }
821
822    int32_t numstatements[3] = {0 ,0 ,0};
823    for(size_t i; i < miss[activeMission].numServices; i++) {
824        if(miss[activeMission].services[i].name.compare("if") == 0) {
825            input.clear();
826            check.clear();
827
828            for(size_t t = 0; t < 10; t++) {
829                if(!miss[activeMission].services[i].output[t].empty()) {
830                    input = miss[activeMission].services[i - numstatements[0] - 1].output[t];
831
832                    _data_DB->command="SELECT ";
833                    _data_DB->command.append(_data_DB->tablename);
834                    _data_DB->command.append(".* from ");
835                    _data_DB->command.append(_data_DB->tablename);
836                    _data_DB->command.append(" where Tag=='");
837                    _data_DB->command.append(input);
838                    _data_DB->command.append("';");
839
840                    sqlite3_stmt *pStatement;
841                    rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), \
842                            -1, &pStatement, NULL);
843                    if(rc == SQLITE_OK) {
844                        if(sqlite3_step(pStatement) == SQLITE_ROW)
845                             data = (const char *) sqlite3_column_text(pStatement, 1);
846                        else {
847                            WARNING("1 data_DB:: Data not yet in DB.\n");
848                            rc=31337;
849                        }
850                    } else {
851                        WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
852                                rc,_data_DB->command.c_str());
853                    }
854
855                    sqlite3_finalize(pStatement);
856                    int32_t pos = data.find_last_of("@", data.length() - 2);
857                    token = data.substr(pos + 1);
858                    token.erase(token.length() - 1);
859                    data.clear();
860                    break;
861                }
862            }
863
864            bool doit = false;
865            if(miss[activeMission].services[i].output[t].find(">") != string::npos) {
866                std::string data2;
867                _data_DB->command="SELECT ";
868                _data_DB->command.append(_data_DB->tablename);
869                _data_DB->command.append(".* from ");
870                _data_DB->command.append(_data_DB->tablename);
871                _data_DB->command.append(" where Tag=='");
872                _data_DB->command.append(miss[activeMission].services[i].output[t].erase(0, 1));
873                _data_DB->command.append("';");
874                sqlite3_stmt *pStatement;
875
876                rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), \
877                        -1, &pStatement, NULL);
878                if(rc == SQLITE_OK) {
879                    if(sqlite3_step(pStatement) == SQLITE_ROW)
880                         data2 = (const char *) sqlite3_column_text(pStatement, 1);
881                    else {
882                        WARNING("2 data_DB:: Data not yet in DB.\n");
883                        rc = 31337;
884                    }
885                } else {
886                    WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
887                            rc, _data_DB->command.c_str());
888                }
889
890                sqlite3_finalize(pStatement);
891
892                int32_t pos = data2.find_last_of("@", data2.length() - 2);
893                token2 = data2.substr(pos + 1);
894                token2.erase(token2.length() - 1);
895                if(atof(token.c_str()) > atof(token2.c_str()))
896                    doit = true;
897            }
898            else if(miss[activeMission].services[i].output[t].find(token) != string::npos)
899                doit = true;
900
901            if(doit) {
902                for(size_t k = i + 1; k <= i+miss[activeMission].services[i].num_conds; k++) {
903                    if(miss[activeMission].services[k].name.compare("if") == 0) {
904                        input.clear();
905                        check.clear();
906                        for(size_t t = 0; t < 10; t++) {
907                            if(!miss[activeMission].services[k].output[t].empty()) {
908                                input = miss[activeMission].services[k - numstatements[1] - 1].output[t];
909                                _data_DB->command="SELECT ";
910                                _data_DB->command.append(_data_DB->tablename);
911                                _data_DB->command.append(".* from ");
912                                _data_DB->command.append(_data_DB->tablename);
913                                _data_DB->command.append(" where Tag=='");
914                                _data_DB->command.append(input);
915                                _data_DB->command.append("';");
916
917                                sqlite3_stmt *pStatement;
918                                rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), \
919                                        -1, &pStatement, NULL);
920                                if(rc == SQLITE_OK) {
921                                    if(sqlite3_step(pStatement) == SQLITE_ROW)
922                                         data = (const char *) sqlite3_column_text(pStatement, 1);
923                                    else {
924                                        WARNING("3 data_DB:: Data not yet in DB.\n");
925                                        rc = 31337;
926                                    }
927                                } else {
928                                    WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
929                                            rc,_data_DB->command.c_str());
930                                }
931
932                                sqlite3_finalize(pStatement);
933                                int32_t pos = data.find_last_of("@", data.length() - 2);
934                                token = data.substr(pos + 1);
935                                token.erase(token.length() - 1);
936                                break;
937                            }
938                        }
939
940                        bool doit = false;
941                        if(miss[activeMission].services[k].output[t].find(">") != string::npos) {
942                            std::string data2;
943                            _data_DB->command="SELECT ";
944                            _data_DB->command.append(_data_DB->tablename);
945                            _data_DB->command.append(".* from ");
946                            _data_DB->command.append(_data_DB->tablename);
947                            _data_DB->command.append(" where Tag=='");
948                            _data_DB->command.append(miss[activeMission].services[k].output[t].erase(0, 1));
949                            _data_DB->command.append("';");
950
951                            sqlite3_stmt *pStatement;
952                            rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), \
953                                    -1, &pStatement, NULL);
954                            if(rc == SQLITE_OK) {
955                                if(sqlite3_step(pStatement) == SQLITE_ROW)
956                                     data2 = (const char *) sqlite3_column_text(pStatement, 1);
957                                else {
958                                    WARNING("4 data_DB:: Data not yet in DB.\n");
959                                    rc = 31337;
960                                }
961                            } else {
962                                WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
963                                        rc, _data_DB->command.c_str());
964                            }
965
966                            sqlite3_finalize(pStatement);
967                            int32_t pos = data2.find_last_of("@", data2.length() - 2);
968                            token2 = data2.substr(pos + 1);
969                            token2.erase(token2.length() - 1);
970                            if(atof(token.c_str()) > atof(token2.c_str()))
971                                doit = true;
972                        }
973                        else if(miss[activeMission].services[k].output[t].find(token) != string::npos)
974                            doit=true;
975
976                        if(doit) {
977                            for(size_t j = k + 1; j <= k+miss[activeMission].services[k].num_conds; j++) {
978                                if(miss[activeMission].services[j].name.compare("if") == 0) {
979                                    input.clear();
980                                    check.clear();
981                                    for(t = 0; t < 10; t++) {
982                                        if(!miss[activeMission].services[j].output[t].empty()) {
983                                            input = miss[activeMission].services[j - numstatements[2] - 1].output[t];
984
985                                            _data_DB->command="SELECT ";
986                                            _data_DB->command.append(_data_DB->tablename);
987                                            _data_DB->command.append(".* from ");
988                                            _data_DB->command.append(_data_DB->tablename);
989                                            _data_DB->command.append(" where Tag=='");
990                                            _data_DB->command.append(input);
991                                            _data_DB->command.append("';");
992
993                                            sqlite3_stmt *pStatement;
994
995                                            rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
996                                            if(rc == SQLITE_OK) {
997                                                if(sqlite3_step(pStatement) == SQLITE_ROW)
998                                                     data = (const char *) sqlite3_column_text(pStatement, 1);
999                                                else {
1000                                                    WARNING("5 data_DB:: Data not yet in DB.\n");
1001                                                    rc = 31337;
1002                                                }
1003                                            } else {
1004                                                WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
1005                                                        rc, _data_DB->command.c_str());
1006                                            }
1007
1008                                            sqlite3_finalize(pStatement);
1009                                            int32_t pos = data.find_last_of("@", data.length()-2);
1010                                            token = data.substr(pos+1);
1011                                            token.erase(token.length()-1);
1012                                            data.clear();
1013                                            break;
1014                                        }
1015                                    }
1016
1017                                    bool doit = false;
1018                                    if(miss[activeMission].services[j].output[t].find(">") != string::npos) {
1019                                        _data_DB->command="SELECT ";
1020                                        _data_DB->command.append(_data_DB->tablename);
1021                                        _data_DB->command.append(".* from ");
1022                                        _data_DB->command.append(_data_DB->tablename);
1023                                        _data_DB->command.append(" where Tag=='");
1024                                        _data_DB->command.append(miss[activeMission].services[j].output[t].erase(0, 1));
1025                                        _data_DB->command.append("';");
1026
1027                                        sqlite3_stmt *pStatement;
1028                                        rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
1029                                        if(rc == SQLITE_OK) {
1030                                            if(sqlite3_step(pStatement) == SQLITE_ROW)
1031                                                 data2 = (const char *) sqlite3_column_text(pStatement, 1);
1032                                            else {
1033                                                WARNING("6 data_DB:: Data not yet in DB.\n");
1034                                                rc=31337;
1035                                            }
1036                                        } else {
1037                                            WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
1038                                                    rc, _data_DB->command.c_str());
1039                                        }
1040
1041                                        sqlite3_finalize(pStatement);
1042
1043                                        int32_t pos = data2.find_last_of("@", data2.length()-2);
1044                                        token2 = data2.substr(pos + 1);
1045                                        token2.erase(token2.length()-1);
1046
1047                                        if(atof(token.c_str()) > atof(token2.c_str()))
1048                                            doit=true;
1049
1050                                        data.clear();
1051                                    }
1052
1053                                    else if(miss[activeMission].services[j].output[t].find(token) != string::npos)
1054                                        doit = true;
1055
1056                                    if(doit) {
1057                                        for(size_t l = j + 1; l <= j + miss[activeMission].services[j].num_conds; l++) {
1058                                            TransactData(l);
1059                                        }
1060                                    }
1061
1062                                    numstatements[2] += miss[activeMission].services[j].num_conds + 1;
1063                                    j += miss[activeMission].services[j].num_conds;
1064                                } else if(miss[activeMission].services[j].name.compare("dowhile") == 0) {
1065                                    numstatements[0]=0;
1066
1067                                    while(true) {
1068                                        uint32_t l;
1069                                        for(l = j + 1; l <= j+miss[activeMission].services[j].num_conds; l++) {
1070                                            TransactData(l);
1071                                        }
1072
1073                                        data.clear();
1074                                        input.clear();
1075                                        check.clear();
1076
1077                                        int32_t t;
1078                                        for(t = 0; t < 10; t++) {
1079                                            if(!miss[activeMission].services[j].output[t].empty()){
1080                                                input=miss[activeMission].services[l-2].output[t];
1081                                                _data_DB->command="SELECT ";
1082                                                _data_DB->command.append(_data_DB->tablename);
1083                                                _data_DB->command.append(".* from ");
1084                                                _data_DB->command.append(_data_DB->tablename);
1085                                                _data_DB->command.append(" where Tag=='");
1086                                                _data_DB->command.append(input);
1087                                                _data_DB->command.append("';");
1088                                                sqlite3_stmt *pStatement;
1089
1090                                                rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
1091                                                if(rc == SQLITE_OK){
1092                                                    if(sqlite3_step(pStatement) == SQLITE_ROW)
1093                                                         data = (const char *) sqlite3_column_text(pStatement, 1);
1094                                                    else {
1095                                                        WARNING("7 data_DB:: Data not yet in DB.: %s\n", _data_DB->command.c_str());
1096                                                        rc = 31337;
1097                                                    }
1098                                                } else {
1099                                                    WARNING("data_DB:: SQL statement error. rc = %i\n%s\n", \
1100                                                            rc, _data_DB->command.c_str());
1101                                                }
1102
1103                                                sqlite3_finalize(pStatement);
1104                                                int32_t pos = data.find_last_of("@", data.length() - 2);
1105                                                token = data.substr(pos + 1);
1106                                                token.erase(token.length() - 1);
1107                                                break;
1108                                            }
1109                                        }
1110
1111                                        if(miss[activeMission].services[j].output[t].find(token) == string::npos) {
1112                                            break;
1113                                        }
1114                                    }
1115
1116                                    j += miss[activeMission].services[j].num_conds;
1117                                } else {
1118                                    numstatements[2] = 0;
1119                                    TransactData(j);
1120                                }
1121                            }
1122                        }
1123
1124                        numstatements[1] += miss[activeMission].services[k].num_conds + 1;
1125                        k += miss[activeMission].services[k].num_conds;
1126                    } else if(miss[activeMission].services[k].name.compare("dowhile") == 0) {
1127                        numstatements[0] = 0;
1128                        while(true) {
1129                            int32_t j;
1130                            for(j = k + 1; j <= k + miss[activeMission].services[k].num_conds; j++) {
1131                                TransactData(j);
1132                            }
1133
1134                            data.clear();
1135                            input.clear();
1136                            check.clear();
1137                            int32_t t;
1138                            for(t = 0; t < 10; t++) {
1139                                if(!miss[activeMission].services[k].output[t].empty()) {
1140                                    input = miss[activeMission].services[j - 1].output[t];
1141                                    _data_DB->command="SELECT ";
1142                                    _data_DB->command.append(_data_DB->tablename);
1143                                    _data_DB->command.append(".* from ");
1144                                    _data_DB->command.append(_data_DB->tablename);
1145                                    _data_DB->command.append(" where Tag=='");
1146                                    _data_DB->command.append(input);
1147                                    _data_DB->command.append("';");
1148
1149                                    sqlite3_stmt *pStatement;
1150                                    rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
1151                                    if(rc == SQLITE_OK) {
1152                                        if(sqlite3_step(pStatement) == SQLITE_ROW)
1153                                             data = (const char *) sqlite3_column_text(pStatement, 1);
1154                                        else {
1155                                            WARNING("8 data_DB:: Data not yet in DB.\n");
1156                                            rc = 31337;
1157                                        }
1158                                    } else {
1159                                        WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
1160                                                rc, _data_DB->command.c_str());
1161                                    }
1162
1163                                    sqlite3_finalize(pStatement);
1164                                    int32_t pos = data.find_last_of("@", data.length() - 2);
1165                                    token = data.substr(pos + 1);
1166                                    token.erase(token.length() - 1);
1167                                    break;
1168                                }
1169                            }
1170
1171                            if(miss[activeMission].services[k].output[t].find(token) == string::npos) {
1172                                break;
1173                            }
1174                        }
1175
1176                        k += miss[activeMission].services[k].num_conds;
1177                    } else{
1178                        numstatements[1] = 0;
1179                        TransactData(k);
1180                    }
1181                }
1182            }
1183
1184            numstatements[0] += miss[activeMission].services[i].num_conds + 1;
1185            i += miss[activeMission].services[i].num_conds;
1186        } else if(miss[activeMission].services[i].name.compare("dowhile") == 0) {
1187            numstatements[0] = 0;
1188            while(true) {
1189                uint32_t k;
1190                for(k = i + 1; k <= i + miss[activeMission].services[i].num_conds; k++){
1191                    TransactData(k);
1192                }
1193
1194                data.clear();
1195                input.clear();
1196                check.clear();
1197                int32_t t;
1198                for(t = 0; t < 10; t++){
1199                    if(!miss[activeMission].services[i].output[t].empty()) {
1200                        input = miss[activeMission].services[k - 1].output[t];
1201                        _data_DB->command="SELECT ";
1202                        _data_DB->command.append(_data_DB->tablename);
1203                        _data_DB->command.append(".* from ");
1204                        _data_DB->command.append(_data_DB->tablename);
1205                        _data_DB->command.append(" where Tag=='");
1206                        _data_DB->command.append(input);
1207                        _data_DB->command.append("';");
1208
1209                        sqlite3_stmt *pStatement;
1210                        rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
1211                        if(rc == SQLITE_OK) {
1212                            if(sqlite3_step(pStatement) == SQLITE_ROW)
1213                                 data = (const char *) sqlite3_column_text(pStatement, 1);
1214                            else {
1215                                WARNING("10data_DB:: Data not yet in DB.\n");
1216                                rc = 31337;
1217                            }
1218                        } else {
1219                            WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
1220                                    rc, _data_DB->command.c_str());
1221                        }
1222
1223                        sqlite3_finalize(pStatement);
1224                        int32_t pos = data.find_last_of("@", data.length()-2);
1225                        token = data.substr(pos + 1);
1226                        token.erase(token.length() - 1);
1227                        break;
1228                    }
1229                }
1230
1231                if(miss[activeMission].services[i].output[t].find(token) == string::npos) {
1232                    break;
1233                }
1234            }
1235
1236            i += miss[activeMission].services[i].num_conds;
1237        } else{
1238            numstatements[0] = 0;
1239            TransactData(i);
1240        }
1241    }
1242
1243    int32_t i = 0;
1244    data.clear();
1245
1246    if(!shellFound) {
1247        int k = 0;
1248        while(k < 10 && !miss[activeMission].input[k].empty()) {
1249            k++;
1250        }
1251
1252        sprintf(buffer, "%d", k);
1253        SendMessage(shellSocketFD, buffer);
1254
1255        for(size_t t = 0; t < k; t++) {
1256            SendMessage(shellSocketFD, miss[activeMission].input[t].c_str());
1257            SendMessage(shellSocketFD, "0");
1258        }
1259    }
1260
1261    LOG("ServiceManagementLayer:: Done performing active mission.\n");
1262}
1263
1264
1265/* CALLED BY: MessageHandler
1266 * INPUTS: <none>
1267 * OUTPUTS: <none>
1268 *
1269 * DESCRIPTION: Print a list of the services currently registered and the ID's of the components that registered them
1270 */
1271void
1272ServiceManagementLayer::ListServices()
1273{
1274    _services_DB->command="select ";
1275    _services_DB->command.append(_services_DB->tablename);
1276    _services_DB->command.append(".* from ");
1277    _services_DB->command.append(_services_DB->tablename);
1278    _services_DB->command.append(";");
1279
1280    // Execute print (select all) command
1281    char *errorMsg;
1282    int32_t rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg);
1283    if((rc != SQLITE_OK) && (rc != 101))
1284        WARNING("SQL error: %s\n", errorMsg);
1285
1286    LOG("database %s, table %s:\n", _services_DB->filename.c_str(), _services_DB->tablename.c_str());
1287}
1288
1289/* CALLED BY: Reset
1290 * INPUTS: <none>
1291 * OUTPUTS: <none>
1292 *
1293 * DESCRIPTION: Clear and reinitialize the mission array, then reload the configuration file
1294 */
1295void
1296ServiceManagementLayer::ReloadConfiguration()
1297{
1298    LOG("ServiceManagementLayer:: Reloading Configuration.\n");
1299
1300    free(miss);
1301
1302    miss = new Mission[10];
1303    for(size_t i = 0; i < 10; i++)
1304        miss[i].services = new Service[30];
1305
1306    LoadConfiguration(_SML_Config.c_str(), miss);
1307}
1308
1309/* CALLED BY: constructor
1310 * INPUTS: |SML_Config| Address (either relitive or full) of the XML file containing mission data
1311 *        |mList| Mission array to be modified
1312 * OUTPUTS: <none>
1313 *
1314 * DESCRIPTION: IMPORTANT - See formatting instructions for correct parsing of data
1315 * Can currently handle 10 inputs and 10 outputs per service, but easily expandable
1316 * Also, can handle two layer of nested conditional statements, but could
1317 * be expanded to meet additional needs.
1318 *
1319 * Components assigned to mission during "set active mission" stage so that
1320 * components can still continue to register after the configuration is loaded
1321 */
1322void
1323ServiceManagementLayer::LoadConfiguration(const char *SML_Config, Mission* &mList)
1324{
1325    TiXmlElement *pMission;
1326    TiXmlElement *pService;
1327    TiXmlElement *pChild0, *pChild1, *pChild2, *pChild3, *pChild4;
1328    TiXmlHandle hRoot(0);
1329
1330    LOG("ServiceManagementLayer:: Loading Configuration.\n");
1331
1332    TiXmlDocument doc(".");
1333    doc.LoadFile(SML_Config);
1334    bool loadOkay = doc.LoadFile();
1335    if(!loadOkay)
1336        WARNING("Loading SML configuration failed: %s\n", SML_Config);
1337
1338    TiXmlHandle hDoc(&doc);
1339   
1340    pMission = hDoc.FirstChildElement().Element();
1341
1342    if(!pMission)
1343        WARNING("No valid root!");
1344
1345    hRoot = TiXmlHandle(pMission);
1346    pService = pMission->FirstChildElement();
1347
1348    int32_t mission_num = 0;
1349
1350    /* Iterate through the missions */
1351    for(pChild0 = pMission->FirstChildElement(); pChild0 ; pChild0 = pChild0->NextSiblingElement())  {
1352        int32_t service_num = 0;
1353        uint16_t cond_array[] = {0, 0, 0};
1354   
1355        for(pChild1  = (pChild0->FirstChildElement())->FirstChildElement(); pChild1; \
1356                pChild1  = pChild1->NextSiblingElement()) {
1357
1358            int32_t conditional_0 = service_num;
1359            for(pChild2 = pChild1->FirstChildElement(); pChild2; pChild2 = pChild2->NextSiblingElement()) {
1360                service_num++;
1361
1362                int32_t conditional_1 = service_num;
1363                for(pChild3 = pChild2->FirstChildElement(); pChild3; pChild3 = pChild3->NextSiblingElement()) {
1364                    service_num++;
1365                    int32_t conditional_2 = service_num;
1366                    for(pChild4 = pChild3->FirstChildElement(); pChild4; pChild4 = pChild4->NextSiblingElement()) {
1367                        service_num++;
1368                        if(pChild4->Attribute("name"))
1369                            mList[mission_num].services[service_num].name = pChild4->Attribute("name");
1370                        else
1371                            mList[mission_num].services[service_num].name = pChild4->Value();
1372
1373                        for(size_t i = 1; i <= 10; i++) {
1374                            char buffer[9]="input";
1375                            sprintf(buffer, "%s%d", buffer, i);
1376                            if(pChild4->Attribute(buffer))
1377                                mList[mission_num].services[service_num].input[i - 1] = pChild4->Attribute(buffer);
1378
1379                            char buffer2[9]="output";
1380                            sprintf(buffer2, "%s%d", buffer2, i);
1381                            if(pChild4->Attribute(buffer2))
1382                                mList[mission_num].services[service_num].output[i - 1] = pChild4->Attribute(buffer2);
1383                        }
1384
1385                        if(pChild4->Attribute("parameter"))
1386                            mList[mission_num].services[service_num].parameter = pChild4->Attribute("parameter");
1387
1388                        cond_array[2]++;
1389                    }
1390
1391                    if(!strcmp(pChild3->Value(), "shell") || conditional_2 != service_num) {
1392                        mList[mission_num].services[conditional_2].name = pChild3->Value();
1393                    } else {
1394                        mList[mission_num].services[service_num].name = pChild3->Attribute("name");
1395                    }
1396
1397                    for(size_t i = 1; i <= 10; i++) {
1398                        char buffer[9]="input";
1399                        sprintf(buffer, "%s%d", buffer, i);
1400                        if(pChild3->Attribute(buffer))
1401                            mList[mission_num].services[conditional_2].input[i - 1] = pChild3->Attribute(buffer);
1402
1403                        char buffer2[9]="output";
1404                        sprintf(buffer2, "%s%d", buffer2, i);
1405                        if(pChild3->Attribute(buffer2))
1406                            mList[mission_num].services[conditional_2].output[i - 1] = pChild3->Attribute(buffer2);
1407                    }
1408
1409                    if(pChild3->Attribute("parameter"))
1410                        mList[mission_num].services[conditional_2].parameter = pChild3->Attribute("parameter");
1411
1412                    mList[mission_num].services[conditional_2].num_conds = cond_array[2];
1413                    cond_array[1] += cond_array[2] + 1;
1414                    cond_array[2] = 0;
1415                }
1416
1417                if(!strcmp(pChild2->Value(), "shell") || (conditional_1 != service_num)) {
1418                    mList[mission_num].services[conditional_1].name = pChild2->Value();
1419                } else{
1420                    mList[mission_num].services[service_num].name = pChild2->Attribute("name");
1421                }
1422
1423                for(int i = 1; i <= 10; i++) {
1424                    char buffer[9]="input";
1425                    sprintf(buffer, "%s%d", buffer, i);
1426                    if(pChild2->Attribute(buffer))
1427                        mList[mission_num].services[conditional_1].input[i - 1] = pChild2->Attribute(buffer);
1428
1429                    char buffer2[9]="output";
1430                    sprintf(buffer2, "%s%d", buffer2, i);
1431                    if(pChild2->Attribute(buffer2))
1432                        mList[mission_num].services[conditional_1].output[i - 1] = pChild2->Attribute(buffer2);
1433                }
1434
1435                if(pChild2->Attribute("parameter"))
1436                    mList[mission_num].services[conditional_1].parameter = pChild2->Attribute("parameter");
1437
1438                mList[mission_num].services[conditional_1].num_conds = cond_array[1];
1439                cond_array[0] += cond_array[1] + 1;
1440                cond_array[1] = 0;
1441            }
1442       
1443            if(!strcmp(pChild1->Value(), "shell") || conditional_0 != service_num) {
1444                mList[mission_num].services[conditional_0].name = pChild1->Value();
1445            } else{
1446                mList[mission_num].services[conditional_0].name = pChild1->Attribute("name");
1447            }
1448
1449            for(size_t i = 1; i <= 10; i++) {
1450                char buffer[9]="input";
1451                sprintf(buffer, "%s%d", buffer, i);
1452                if(pChild1->Attribute(buffer))
1453                    mList[mission_num].services[conditional_0].input[i-1] = pChild1->Attribute(buffer);
1454
1455                char buffer2[9]="output";
1456                sprintf(buffer2, "%s%d", buffer2, i);
1457                if(pChild1->Attribute(buffer2))
1458                    mList[mission_num].services[conditional_0].output[i-1] = pChild1->Attribute(buffer2);
1459            }
1460
1461            if(pChild1->Attribute("parameter"))
1462                mList[mission_num].services[conditional_0].parameter = pChild1->Attribute("parameter");
1463            mList[mission_num].services[conditional_0].num_conds = cond_array[0];
1464                cond_array[0] = 0;
1465
1466            service_num++;
1467        }
1468   
1469        mList[mission_num].numServices = service_num;
1470        mList[mission_num].name = pChild0->Attribute("name");
1471        mList[mission_num].missionID = atoi(pChild0->Attribute("id"));
1472
1473        for(size_t i = 1; i <= 10; i++) {
1474            char buffer[9]="param";
1475            sprintf(buffer, "%s%d", buffer, i);
1476            if(pChild0->Attribute(buffer)){
1477                mList[mission_num].input[i-1] = pChild0->Attribute(buffer);
1478            }
1479        }
1480
1481        mission_num++;
1482    }
1483
1484    LOG("ServiceManagementLayer:: Done Loading Configuration\n");
1485}
1486
1487/* CALLED BY: MessageHandler
1488 * INPUTS: |ID| The ID number of the engine to be registered
1489 * OUTPUTS: <none>
1490 *
1491 * DESCRIPTION: Sends a registration message onto the shell and sends the ACK back to the component
1492 */
1493void
1494ServiceManagementLayer::RegisterCognitiveEngine(int32_t ID)
1495{
1496    SendMessage(shellSocketFD, "register_engine_cognitive");
1497
1498    LOG("ServiceManagementLayer:: CE registration message forwarded to shell.\n");
1499    char buffer[256];
1500    memset(buffer, 0, 256);
1501    ReadMessage(shellSocketFD, buffer);
1502    SendMessage(CE_List[ID].FD, buffer);
1503
1504    TransferRadioConfiguration(ID);
1505    memset(buffer, 0, 256);
1506    TransferExperience(ID);
1507    memset(buffer, 0, 256);
1508    numberOfCognitiveEngines++;
1509    CE_Present = true;
1510}
1511
1512/* CALLED BY: MessageHandler
1513 * INPUTS: |ID| The ID number of the engine to have it's services deregistered
1514 * OUTPUTS: <none>
1515 *
1516 * DESCRIPTION: Deletes individual services from the DB
1517 * NOTE THAT this function only needs to be called ifservice deregistration is going
1518 * to be done at a different time than component deregistration; it is handled
1519 * more efficiently and directly during that deregistration process.
1520 */
1521void
1522ServiceManagementLayer::DeregisterServices(int32_t ID)
1523{
1524    char buffer[256];
1525    memset(buffer, 0, 256);
1526    ReadMessage(CE_List[ID].FD, buffer);
1527    _services_DB->command="DELETE FROM ";
1528    _services_DB->command.append(_services_DB->tablename);
1529    _services_DB->command.append(" WHERE ID_Num IN (SELECT ");
1530
1531    char tmp[3];
1532    memset(tmp,0,3);
1533    sprintf(tmp, "%d", ID);
1534    _services_DB->command.append(tmp);
1535    _services_DB->command.append(" FROM ");
1536    _services_DB->command.append(_services_DB->tablename);
1537    _services_DB->command.append(" WHERE Service_Name");
1538    _services_DB->command.append("=='");
1539    _services_DB->command.append(buffer);
1540    _services_DB->command.append("');");
1541
1542    char *errorMsg;
1543    int32_t rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg);
1544    if((rc != SQLITE_OK) && (rc != 101))
1545        WARNING("SQL error: %s\n", errorMsg);
1546}
1547
1548/* CALLED BY: MessageHandler
1549 * INPUTS: |ID| The ID number of the engine to have it's services deregistered
1550 * OUTPUTS: <none>
1551 *
1552 * DESCRIPTION: Deletes the contact info for the cognitive engine, forwards a deregistration message to the shell
1553 * Also, deletes the services from the DB
1554 */
1555void
1556ServiceManagementLayer::DeregisterCognitiveEngine(int32_t ID)
1557{
1558    LOG("ServiceManagementLayer:: CE deregistration message forwarded to shell.\n");
1559
1560    numberOfCognitiveEngines--;
1561    if(numberOfCognitiveEngines == 0)
1562        CE_Present = false;
1563
1564    SendMessage(shellSocketFD, "deregister_engine_cognitive");
1565    char buffer[256];
1566    memset(buffer, 0, 256);
1567    ReadMessage(shellSocketFD, buffer);
1568    SendMessage(CE_List[ID].FD, buffer);
1569    if(strcmp("deregister_ack", buffer) != 0) {
1570        ERROR(1, "SML:: Failed to close CE socket\n");
1571    }
1572
1573    //Deregister the services
1574    _services_DB->command="DELETE FROM ";
1575    _services_DB->command.append(_services_DB->tablename);
1576    _services_DB->command.append(" WHERE ");
1577    _services_DB->command.append("ID_Num");
1578    _services_DB->command.append("==");
1579
1580    char tmp[3];
1581    memset(tmp,0,3);
1582    sprintf(tmp, "%d", ID);
1583    _services_DB->command.append(tmp);
1584    _services_DB->command.append(";");
1585
1586    char *errorMsg;
1587    int32_t rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg);
1588    if((rc != SQLITE_OK) && (rc != 101))
1589        WARNING("SQL error: %s\n", errorMsg);
1590
1591    CE_List[ID].FD = -1;
1592    CE_List[ID].ID_num = -1;
1593
1594    LOG("Cognitive Radio Shell:: CE Socket closed for engine #%d.\n", ID);
1595}
1596
1597
1598/* CALLED BY: test class
1599 * INPUTS: <none>
1600 * OUTPUTS: <none>
1601 *
1602 * DESCRIPTION: Sets up a server socket and listens for communication on either that or the shell socket
1603 */
1604void
1605ServiceManagementLayer::StartSMLServer()
1606{
1607    struct timeval selTimeout;
1608    int32_t running = 1;
1609    int32_t port, rc, new_sd = 1;
1610    int32_t desc_ready = 1;
1611    fd_set sockSet, shellSet;
1612
1613    cogEngSrv = CreateTCPServerSocket(SMLport);
1614    int32_t maxDescriptor = cogEngSrv;
1615
1616    if(InitializeTCPServerPort(cogEngSrv) == -1)
1617        ERROR(1,"Error initializing primary port\n");
1618
1619    while (running) {
1620        /* Zero socket descriptor vector and set for server sockets */
1621        /* This must be reset every time select() is called */
1622        FD_ZERO(&sockSet);
1623        FD_SET(cogEngSrv, &sockSet);
1624       
1625        for(uint16_t k = 0; k < Current_ID; k++){
1626            if(CE_List[k].ID_num != -1)
1627                FD_SET(CE_List[k].FD, &sockSet);
1628        }
1629
1630        /* Timeout specification */
1631        /* This must be reset every time select() is called */
1632        selTimeout.tv_sec = 0;      /* timeout (secs.) */
1633        selTimeout.tv_usec = 0;     /* 0 microseconds */
1634
1635        /* Changed both to zero so that select will check messages from the shell
1636         * instead of blocking when there is no command from the CE's to be processed */
1637
1638        /* Check ifthere is a message on the socket waiting to be read */
1639        rc = select(maxDescriptor + 1, &sockSet, NULL, NULL, &selTimeout);
1640        if(rc == 0) {
1641            FD_ZERO(&shellSet);
1642            FD_SET(shellSocketFD, &shellSet);
1643            selTimeout.tv_sec = 0;
1644            selTimeout.tv_usec = 0;
1645
1646            /* Check if there is a message on the shell socket ready to be processed */
1647            select(shellSocketFD + 1, &shellSet, NULL, NULL, &selTimeout);
1648            if(FD_ISSET(shellSocketFD, &shellSet)){
1649                MessageHandler(-1);}
1650        } else {
1651            desc_ready = rc;
1652            for(port = 0; port <= maxDescriptor && desc_ready > 0; port++) {
1653                if(FD_ISSET(port, &sockSet)) {
1654                    desc_ready -= 1;
1655
1656                    /* Check ifrequest is new or on an existing open descriptor */
1657                    if(port == cogEngSrv) {
1658                        /* If new, assign it a descriptor and give it an ID */
1659                        new_sd = AcceptTCPConnection(port);
1660             
1661                        if(new_sd < 0)
1662                            break;
1663
1664                        CE_List[Current_ID].FD = new_sd;
1665                        CE_List[Current_ID].ID_num = Current_ID;
1666                        MessageHandler(Current_ID);
1667                        Current_ID++;
1668   
1669                        FD_SET(new_sd,&sockSet);
1670                        if(new_sd > maxDescriptor)
1671                           maxDescriptor = new_sd;
1672                    } else {
1673                        /* If old, figure out which ID it coresponds to and handle it accordingly */
1674                        for(size_t z = 0; z < Current_ID; z++) {
1675                            if(CE_List[z].FD == port) {
1676                                MessageHandler(z);
1677                            }
1678                        }
1679                    }
1680                }
1681            }
1682        }
1683    }       
1684
1685    /* Close sockets */
1686    close(cogEngSrv);
1687
1688    return;
1689}
1690
Note: See TracBrowser for help on using the browser.