root/vtcross/trunk/src/service_management_layer/ServiceManagementLayer.cpp @ 445

Revision 445, 67.1 KB (checked in by bhilburn, 15 years ago)

55% done with code cleanup. Discovered a variable scoping issue that is
likely affecting code behavior - leaving unchanged for now (see var 't'
on 827).

Line 
1/*
2 Copyright 2009 Virginia Polytechnic Institute and State University 
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7 
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17/* Inter-component communication handled by sockets and FD's. 
18 * Server support has been completely implemented and tested.
19 *
20 * Services are stored in a SQLite DB by the ID of the CE that registered them.  Service
21 * support has been completely implemented and tested.
22 *
23 * Missions are loaded from an XML file, connected with services provided by components,
24 * and run.  See the documentation for the "PerformActiveMission" below for important
25 * info.
26 */
27
28
29#include <cmath>
30#include <cstdio>
31#include <cstdlib>
32#include <cstring>
33#include <stdint.h>
34
35#include <arpa/inet.h>
36#include <iostream>
37#include <netinet/in.h>
38#include <netdb.h>
39#include <fcntl.h>
40#include <sqlite3.h>
41#include <string>
42#include <sys/ioctl.h>
43#include <sys/mman.h>
44#include <sys/socket.h>
45#include <sys/types.h>
46#include <sys/wait.h>
47
48#include "tinyxml/tinyxml.h"
49#include "tinyxml/tinystr.h"
50
51#include "vtcross/debug.h"
52#include "vtcross/error.h"
53#include "vtcross/common.h"
54#include "vtcross/components.h"
55#include "vtcross/containers.h"
56#include "vtcross/socketcomm.h"
57
58
59typedef struct services_s *services_DB;
60typedef struct data_s *data_DB;
61
62using namespace std;
63
64struct services_s {
65    string filename;
66    string tablename;
67    string command;
68    sqlite3 *db;
69    unsigned int num_columns;
70};
71
72struct data_s {
73    string filename;
74    string tablename;
75    string command;
76    sqlite3 *db;
77    unsigned int num_columns;
78};
79
80services_DB _services_DB;
81data_DB _data_DB;
82string _SML_Config;
83bool shellFound;
84
85/* Callback function used internally by some of the SQLite3 commands */
86int32_t
87callback(void *notUsed, int32_t argc, char **argv, char **azColName)
88{
89    for(size_t i = 0; i < argc; i++) {
90        LOG("%s = %s\n", azColName[i], argv[i] ? argv[i] : "NULL");
91    }
92
93    LOG("\n");
94    return 0;
95}
96
/* Debugging aid meant for spot-checking what is stored in the data DB.
 *
 * The function is currently a no-op: its dump code is disabled.  The
 * disabled snippet is kept below for reference, but it is stale -- it
 * treats the string members of data_s as C character arrays (strcpy/strcat)
 * and uses 'rc' and 'errorMsg' without declaring them, so it cannot simply
 * be re-enabled as written. */
void
printDatabase()
{
    /* Disabled dump code:
     *
     *   LOG("\n\n\n");
     *   strcpy(_data_DB->command, "select ");
     *   strcat(_data_DB->command, _data_DB->tablename);
     *   strcat(_data_DB->command, ".* from ");
     *   strcat(_data_DB->command, _data_DB->tablename);
     *   strcat(_data_DB->command, ";");
     *
     *   rc = sqlite3_exec(_data_DB->db, _data_DB->command, callback, 0, &errorMsg);
     *   if((rc != SQLITE_OK) && (rc != 101))
     *       WARNING("SQL error: %s\n", errorMsg);
     *
     *   LOG("database %s, table %s:\n", _data_DB->filename, _data_DB->tablename);
     *   LOG("\n\n\n");
     */
}
117
118ServiceManagementLayer::ServiceManagementLayer()
119{
120    LOG("Creating Service Management Layer.\n");
121
122    shellSocketFD = -1;
123    numberOfCognitiveEngines = 0;
124    CE_Present = false;
125    cogEngSrv = 1;
126}
127
128/* Free and clear the DB's associated with this SML in the destructor.
129 *
130 * Note that exiting with an error condition will cause SML to not be destructed,
131 * resulting in the DB's staying in memory until the destructor is encountered in
132 * future executions. */
133ServiceManagementLayer::~ServiceManagementLayer()
134{
135    char *errorMsg;
136    int32_t rc;         /* sqlite command return code */
137
138    _services_DB->command = "drop table ";
139    _services_DB->command.append(_services_DB->tablename);
140    rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg);
141    if((rc != SQLITE_OK) && (rc != 101))
142        WARNING("ServiceManagementLayer::Destructor services 'drop table' error: %s\n", errorMsg);
143
144    _services_DB->command = "vacuum";
145    rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg);
146    if((rc != SQLITE_OK) && (rc != 101))
147        WARNING("ServiceManagementLayer::Destructor services 'vacuum' error: %s\n", errorMsg);
148
149    free(_services_DB);
150
151    _data_DB->command = "drop table ";
152    _data_DB->command.append(_data_DB->tablename);
153    rc = sqlite3_exec(_data_DB->db, _data_DB->command.c_str(), callback, 0, &errorMsg);
154    if((rc != SQLITE_OK) && (rc != 101))
155        WARNING("ServiceManagementLayer::Destructor data 'drop table' error: %s\n", errorMsg);
156
157    _data_DB->command = "vacuum";
158    rc = sqlite3_exec(_data_DB->db, _data_DB->command.c_str(), callback, 0, &errorMsg);
159    if((rc != SQLITE_OK) && (rc != 101))
160        WARNING("ServiceManagementLayer::Destructor data 'vacuum' error: %s\n", errorMsg);
161
162    free(_data_DB);
163}
164
165/* Note that sizes of CE_List, miss, and service are hardcoded for now.
166 * Also, their sizes are hardcoded into the code in various places; a fix
167 * for a future version. */
168ServiceManagementLayer::ServiceManagementLayer(const char* SML_Config, \
169    const char* serverName, const char* serverPort, int16_t clientPort)
170{
171    LOG("Creating Service Management Layer.\n");
172
173    _SML_Config = string(SML_Config);
174    SMLport = clientPort;
175
176    ConnectToShell(serverName, serverPort);
177    CE_List = new CE_Reg[10];
178
179    miss = new Mission[10];
180    for(size_t i = 0; i < 10; i++) {
181        miss[i].services = new Service[30];
182    }
183
184    Current_ID = 0;
185
186    LoadConfiguration(SML_Config, miss);
187
188    CreateServicesDB();
189    CreateDataDB();
190}
191
192/* CALLED BY: constructor
193 * INPUTS: <none>
194 * OUTPUTS: <none>
195 *
196 * DESCRIPTION: Create and initialize a DB to hold the services registered by components
197 */
198void
199ServiceManagementLayer::CreateServicesDB()
200{
201    sqlite3_stmt *ppStmt;   /* OUT: Statement handle */
202    const char *pzTail;     /* OUT: Pointer to unused portion of zSql */
203    int32_t rc;             /* sqlite command return code */
204
205    _services_DB = new services_s;
206    _services_DB->filename="Services_Table";
207    sqlite3_open(_services_DB->filename.c_str(), &(_services_DB->db));
208
209    char *cols[] = {(char *)"ID_Num", (char *)"Service_Name"};
210
211    _services_DB->tablename="Services";
212
213    /* If program execution ends in anything other than a ordered shutdown, DB's will still
214     * be there for next run. Need to get rid of it so that old data isn't inadvertantly
215     * used in the next execution cycle. */
216    _services_DB->command = "DROP TABLE IF EXISTS Services;";     
217
218    rc = sqlite3_prepare_v2(_services_DB->db, _services_DB->command.c_str(), 128, &ppStmt, &pzTail);
219    if((rc != SQLITE_OK) && (rc != 101))
220        WARNING("ServiceManagementLayer::CreateServicesDB 'prepare_stmt' error %d\n", rc);
221
222    rc = sqlite3_step(ppStmt);
223    if((rc != SQLITE_OK) && (rc != 101))
224        WARNING("ServiceManagementLayer::CreateServicesDB 'step' error\n");
225
226    _services_DB->num_columns = 2;
227
228    /* Generate command */
229    _services_DB->command="CREATE TABLE ";
230    _services_DB->command.append(_services_DB->tablename);
231    _services_DB->command.append("(");
232    _services_DB->command.append(cols[0]);
233    _services_DB->command.append(" INT, ");
234    _services_DB->command.append(cols[1]);
235    _services_DB->command.append(" TEXT);");
236
237    /* Execute create table command */
238    rc = sqlite3_prepare_v2(_services_DB->db, _services_DB->command.c_str(), 128, &ppStmt, &pzTail);
239    if((rc != SQLITE_OK) && (rc != 101))
240        WARNING("ServiceManagementLayer::CreateServicesDB 'prepare_stmt' error %d\n", rc);
241
242    rc = sqlite3_step(ppStmt);
243    if((rc != SQLITE_OK) && (rc != 101))
244        WARNING("ServiceManagementLayer::CreateServicesDB 'step' error\n");
245}
246
247/* CALLED BY: constructor
248 * INPUTS: <none>
249 * OUTPUTS: <none>
250 *
251 * DESCRIPTION: Create and initialize a DB to hold the data sent by components
252 */
253void
254ServiceManagementLayer::CreateDataDB()
255{
256    sqlite3_stmt *ppStmt;   /* OUT: Statement handle */
257    const char *pzTail;     /* OUT: Pointer to unused portion of zSql */
258    int32_t rc;             /* sqlite command return code */
259
260    _data_DB = new data_s;
261
262    _data_DB->filename="Data_Table";
263    sqlite3_open(_data_DB->filename.c_str(), &(_data_DB->db));
264
265    char *cols[] = {(char *)"Tag", (char *)"Data"};
266
267    _data_DB->tablename = "Data";
268    _data_DB->command = "DROP TABLE IF EXISTS Data;";     
269
270    rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), 128, &ppStmt, &pzTail);
271    if((rc != SQLITE_OK) && (rc != 101))
272        WARNING("ServiceManagementLayer::CreateDataDB 'prepare_stmt' error %d\n", rc);
273
274    rc = sqlite3_step(ppStmt);
275    if((rc != SQLITE_OK) && (rc != 101))
276        WARNING("ServiceManagementLayer::CreateDataDB 'step' error\n");
277
278    _data_DB->num_columns = 2;
279
280    /* Generate command */
281    _data_DB->command = "CREATE TABLE ";
282    _data_DB->command.append(_data_DB->tablename);
283    _data_DB->command.append("(");
284    _data_DB->command.append(cols[0]);
285
286    /* First column is the name of the data (corresponding to the name of the output/input pair)
287     * It is the primary key so any subsequent data with the same name will replace the row. */
288    _data_DB->command.append(" TEXT PRIMARY KEY ON CONFLICT REPLACE, ");
289    _data_DB->command.append(cols[1]);
290    _data_DB->command.append(" TEXT);");
291
292    rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), 128, &ppStmt, &pzTail);
293    if((rc != SQLITE_OK) && (rc != 101))
294        WARNING("ServiceManagementLayer::CreateDataDB 'prepare_stmt' error %d\n", rc);
295
296    rc = sqlite3_step(ppStmt);
297    if((rc != SQLITE_OK) && (rc != 101))
298        WARNING("ServiceManagementLayer::CreateDataDB 'step' error\n");
299}
300
301/* CALLED BY: MessageHandler
302 * INPUTS: <none>
303 * OUTPUTS: <none>
304 *
305 * DESCRIPTION: Sends a message identifying this component as an SML to the Shell
306 */
307void
308ServiceManagementLayer::SendComponentType()
309{
310    SendMessage(shellSocketFD, "response_sml");
311    LOG("SML responded to GetRemoteComponentType query.\n");
312}
313
314/* CALLED BY: constructor
315 * INPUTS: |serverName| the IPv4 name of the server (127.0.0.1 for localhost)
316 *         |serverPort| the port on the server to connect to
317 * OUTPUTS: <none>
318 *
319 * DESCRIPTION: Connecting to the shell takes 2 steps
320 * 1) Establish a client socket for communication
321 * 2) Run the initial Registration/handshake routine
322 */
323void
324ServiceManagementLayer::ConnectToShell(const char* serverName, \
325        const char* serverPort)
326{
327    shellSocketFD = ClientSocket(serverName, serverPort);
328    RegisterComponent();
329}
330
331/* CALLED BY: StartSMLServer
332 * INPUTS: |ID| The ID number of the CE that has a message wating
333 * OUTPUTS: <none>
334 *
335 * DESCRIPTION: Called whenever a socket is identified as being ready for communication
336 *              This funciton reads the message and calls the appropriate helper
337 */
338void
339ServiceManagementLayer::MessageHandler(int32_t ID)
340{
341    char buffer[256];   
342    memset(buffer, 0, 256); 
343    int32_t _FD; 
344   
345    if(ID != -1)
346        _FD = CE_List[ID].FD;
347    else
348        _FD = shellSocketFD;
349
350    ReadMessage(_FD, buffer);
351   
352    //--------Policy Engine Stuff - no policy engine support in this version-------//
353
354    //printf("********* %s **********\n", buffer);
355    // TODO
356    // If we send integer op codes rather than strings, this process will be
357    // MUCH faster since instead of donig string compares we can simply
358    // switch on the integer value...
359    /*if(strcmp(buffer, "register_service") == 0) {
360        if(strcmp(buffer, "policy_geo") == 0) {
361        }
362        else if(strcmp(buffer, "policy_time") == 0) {
363        }
364        else if(strcmp(buffer, "policy_spectrum") == 0) {
365        }
366        else if(strcmp(buffer, "policy_spacial") == 0) {
367        }
368    }
369    else if(strcmp(buffer, "deregister_service") == 0) {
370        if(strcmp(buffer, "policy_geo") == 0) {
371        }
372        else if(strcmp(buffer, "policy_time") == 0) {
373        }
374        else if(strcmp(buffer, "policy_spectrum") == 0) {
375        }
376        else if(strcmp(buffer, "policy_spacial") == 0) {
377        }
378    }*/
379
380    //Go down the list to call the appropriate function
381    if(strcmp(buffer, "query_component_type") == 0) {
382        SendComponentType();
383    }
384    else if(strcmp(buffer, "reset_sml") == 0) {
385        Reset();
386    }
387    else if(strcmp(buffer, "shutdown_sml") == 0) {
388        Shutdown();
389    }
390    else if(strcmp(buffer, "register_engine_cognitive") == 0) {
391        RegisterCognitiveEngine(ID);
392    }
393    else if(strcmp(buffer, "register_service") == 0) {
394        ReceiveServices(ID);
395    }
396    else if(strcmp(buffer, "send_component_type") == 0) {
397        SendComponentType();
398    }
399    else if(strcmp(buffer, "list_services") == 0) {
400        ListServices();
401    }
402    else if(strcmp(buffer, "set_active_mission") == 0) {
403        SetActiveMission();
404    }
405    else if(strcmp(buffer, "request_optimization") == 0) {
406        PerformActiveMission();
407    }
408    else if(strcmp(buffer, "deregister_engine_cognitive") == 0) {
409        DeregisterCognitiveEngine(ID);
410    }
411    else if(strcmp(buffer, "deregister_service") == 0) {
412        DeregisterServices(ID);
413    }
414}
415
416//TODO Finish
417/* CALLED BY: MessageHandler
418 * INPUTS: <none>
419 * OUTPUTS: <none>
420 *
421 * DESCRIPTION: Deregisters the component from the Shell.
422 */
423void
424ServiceManagementLayer::Shutdown()
425{
426    DeregisterComponent();
427}
428
429//TODO Finish
430/* CALLED BY: MessageHandler
431 * INPUTS: <none>
432 * OUTPUTS: <none>
433 *
434 * DESCRIPTION: Deregisters the component from the Shell
435 */
436void
437ServiceManagementLayer::Reset()
438{
439    DeregisterComponent();
440    ReloadConfiguration();
441}
442
443/* CALLED BY: ConnectToShell
444 * INPUTS: <none>
445 * OUTPUTS: <none>
446 *
447 * DESCRIPTION: Sends the registration message to the Shell
448 */
449void
450ServiceManagementLayer::RegisterComponent()
451{
452    SendMessage(shellSocketFD, "register_sml");
453    LOG("ServiceManagementLayer:: Registration message sent.\n");
454}
455
456/* CALLED BY: Shutdown
457 * INPUTS: <none>
458 * OUTPUTS: <none>
459 *
460 * DESCRIPTION: Closes the client socket with the shell, sends a deregstration message
461 */
462void
463ServiceManagementLayer::DeregisterComponent()
464{
465    SendMessage(shellSocketFD, "deregister_sml");
466    LOG("ServiceManagementLayer:: Deregistration message sent.\n");
467
468    shutdown(shellSocketFD, 2);
469    close(shellSocketFD);
470    shellSocketFD = -1;
471    LOG("ServiceManagementLayer:: Shell socket closed.\n");
472}
473
474
475/* CALLED BY: RegisterCognitiveEngine
476 * INPUTS: |ID| The ID number of the component where the data is to be transfered to
477 * OUTPUTS: <none>
478 *
479 * DESCRIPTION: Streams config data directly from the shell to the CE, and checks
480 * for an "ack" message from the CE after every sent message
481 * to know when to stop communication.
482 *
483 * NOTE: Modified to check the incoming message buffer rather than the outgoing
484 * message buffer to avoid a portion of the delay. May change this again to handle
485 * data more inteligently, taking advantage of it's properties.
486 */
487void
488ServiceManagementLayer::TransferRadioConfiguration(int32_t ID)
489{
490    struct timeval selTimeout;
491    fd_set sockSet;
492    int32_t rc = 1;
493    char buffer[256];
494
495    /* Send data until the CE sends an ACK message back */
496    while(rc != 0) {
497        memset(buffer, 0, 256);
498
499        /* Receive data from Shell */
500        ReadMessage(shellSocketFD, buffer);
501
502        /* Send data to CE */
503        SendMessage(CE_List[ID].FD, buffer);
504        FD_ZERO(&sockSet);
505        FD_SET(shellSocketFD, &sockSet);
506        selTimeout.tv_sec = 0;
507        selTimeout.tv_usec = 5000;
508
509        /* Check if there is a message on the shell ready to be processed */
510        rc = select(shellSocketFD + 1, &sockSet, NULL, NULL, &selTimeout);
511    }
512
513    memset(buffer, 0, 256);
514    ReadMessage(CE_List[ID].FD, buffer);
515    SendMessage(shellSocketFD, buffer);
516}
517
518
519/* CALLED BY: RegisterCognitiveEngine
520 * INPUTS: |ID| The ID number of the component where the data is to be transfered to
521 * OUTPUTS: <none>
522 *
523 * DESCRIPTION: Simmilar to TransferRadioConfig, just with Experience data
524 *
525 * NOTE: Modified to check the incoming message buffer rather than the outgoing
526 * message buffer to avoid a portion of the delay. May change this again to handle
527 * data more inteligently, taking advantage of it's properties.
528 */
529void
530ServiceManagementLayer::TransferExperience(int32_t ID)
531{
532    struct timeval selTimeout;
533    fd_set sockSet;
534    int32_t rc = 1;
535    char buffer[256];
536    /* Send data until the CE sends an ACK message back */
537    while(rc != 0) {
538        memset(buffer, 0, 256);
539
540        /* Receive data from Shell */
541        ReadMessage(shellSocketFD, buffer);
542
543        /* Send data to CE */
544        SendMessage(CE_List[ID].FD, buffer);
545        FD_ZERO(&sockSet);
546        FD_SET(shellSocketFD, &sockSet);
547        selTimeout.tv_sec = 0;
548        selTimeout.tv_usec = 5000;
549
550        /* Check if there is a message on the shell ready to be processed */
551        rc = select(shellSocketFD + 1, &sockSet, NULL, NULL, &selTimeout);
552    }
553
554    memset(buffer, 0, 256);
555    ReadMessage(CE_List[ID].FD, buffer);
556    SendMessage(shellSocketFD, buffer);
557}
558
559/* CALLED BY: MessageHandler
560 * INPUTS: |ID| The ID number of the component where service is located
561 * OUTPUTS: <none>
562 *
563 * DESCRIPTION: Inserts a service into the DB with the ID of the component where it exists
564 */
565void
566ServiceManagementLayer::ReceiveServices(int32_t ID)
567{
568    char buffer[256];
569    memset(buffer, 0, 256);
570    ReadMessage(CE_List[ID].FD, buffer);
571
572    char *cols[] = {(char *) "ID_Num", (char *) "Service_Name"};
573
574    /* Generate command */
575    _services_DB->command = "insert into ";
576    _services_DB->command.append(_services_DB->tablename);
577    _services_DB->command.append(" (");
578    _services_DB->command.append(cols[0]);
579    _services_DB->command.append(", ");
580    _services_DB->command.append(cols[1]);
581    _services_DB->command.append(") ");
582    _services_DB->command.append(" values(");
583
584    char temp[3];
585    memset(temp, 0, 3);
586    sprintf(temp, "%d", ID);
587
588    _services_DB->command.append(temp);
589    _services_DB->command.append(", '");
590    _services_DB->command.append(buffer);
591    _services_DB->command.append("');");
592   
593    /* Execute add command */
594    char *errorMsg;
595    int rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg);
596    if((rc != SQLITE_OK) && (rc != 101))
597        WARNING("ServiceManagementLayer::RecieveServices DB Error %s\n", errorMsg);
598}
599
600/* CALLED BY: MessageHandler
601 * INPUTS: <none>
602 * OUTPUTS: <none>
603 *
604 * DESCRIPTION: This method associates the services that components provide with the
605 * services that are requested in the mission. Each service in the mission is given
606 * the ID and FD of a component that has registered to provide that service. Deregistration
607 * is okay until this method is called without a reload, but if deregistration occurs after this
608 * method is called it needs to be called again even if other engines also provide the services
609 */
610void
611ServiceManagementLayer::SetActiveMission()
612{
613    char buffer[256];
614    memset(buffer, 0, 256);
615    ReadMessage(shellSocketFD, buffer);
616
617    uint32_t missID = atoi(buffer);
618    for(activeMission = 0; activeMission < 10; activeMission++) {
619        /* Find the active mission by comparing mission ID's */
620        if(miss[activeMission].missionID == missID)
621            break;
622    }
623
624    LOG("ServiceManagementLayer:: Received Set Active Mission command: %i.\n", missID);
625
626    /* For each service in the mission */
627    for(size_t i = 0; i < miss[activeMission].numServices; i++) {
628        /* Check whether the current service is an actual service or a conditional */
629        if(miss[activeMission].services[i].name.compare("if") && \
630                miss[activeMission].services[i].name.compare("dowhile") && \
631                miss[activeMission].services[i].name.compare("shell")) {
632                /* If it is a service, search the database of registered services to find
633                 * the ID of the component that registered it */
634                _services_DB->command="select ";
635                _services_DB->command.append(_services_DB->tablename);
636                _services_DB->command.append(".* from ");
637                _services_DB->command.append( _services_DB->tablename);
638                _services_DB->command.append(" where Service_Name=='");
639                _services_DB->command.append(miss[activeMission].services[i].name);
640                _services_DB->command.append("';");
641   
642                sqlite3_stmt * pStatement;
643                int32_t rc = sqlite3_prepare_v2(_services_DB->db, _services_DB->command.c_str(), \
644                        -1, &pStatement, NULL);
645                if(rc == SQLITE_OK) {
646                    if (sqlite3_step(pStatement) == SQLITE_ROW)
647                        miss[activeMission].services[i].componentID = sqlite3_column_int(pStatement, 0);
648                    else {
649                        WARNING("services_DB:: Mission requires service %s ", \
650                                miss[activeMission].services[i].name.c_str());
651                        WARNING("not provided by any connected component.\n");
652                        rc = 31337;
653                    }
654                } else {
655                    WARNING("services_DB:: Error executing SQL statement. rc = %i\n%s\n", \
656                            rc, _services_DB->command.c_str());
657                }
658
659                sqlite3_finalize(pStatement);
660                miss[activeMission].services[i].socketFD = \
661                    CE_List[miss[activeMission].services[i].componentID].FD;
662        }
663        /* TODO Nothing to be done for conditionals at this stage */
664    }
665 
666    SendMessage(shellSocketFD, "ack");
667    LOG("ServiceManagementLayer:: Done setting active mission.\n");
668}
669
670/* CALLED BY: PerformActiveMission
671 * INPUTS: |sourceID| ID of the service that is being processed
672 * OUTPUTS: <none>
673 *
674 * DESCRIPTION: This is a helper method for the "PerformActiveMission" function
675 * NOTE: This function has changed drastically from the previous implementation
676 *
677 * Takes an ID of a service. For that service, finds inputs in DB and forwords
678 * those on to the engine after sending comm-starting messages. Afterwords, listenes
679 * for the outputs so that it can store those in the database for future services or
680 * the overall output
681 */
682void
683ServiceManagementLayer::TransactData(int32_t sourceID)
684{
685    char buffer[256];
686    std::string data;
687    char *cols[] = {(char *) "Tag", (char *) "Data"};
688    char *token;
689
690   /* Send a message directly to the shell */
691   if(miss[activeMission].services[sourceID].name.find("shell") != string::npos) {
692       shellFound=true;
693
694       int32_t k = 0;
695       while((k < 10) && (!miss[activeMission].input[k].empty())) {
696           k++;
697       }
698
699       sprintf(buffer, "%d", k);
700       SendMessage(shellSocketFD, buffer);
701       for(int32_t t = 0; t < k; t++) {
702           memset(buffer, 0 , 256);
703           _data_DB->command="select ";
704           _data_DB->command.append(_data_DB->tablename);
705           _data_DB->command.append(".* from ");
706           _data_DB->command.append(_data_DB->tablename);
707           _data_DB->command.append(" where Tag=='");
708           _data_DB->command.append(miss[activeMission].input[t]);
709           _data_DB->command.append("';");
710           sqlite3_stmt * pStatement;
711
712           int32_t rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), \
713                   -1, &pStatement, NULL);
714           if(rc == SQLITE_OK) {
715               if(sqlite3_step(pStatement) == SQLITE_ROW)
716                   data=((const char*) sqlite3_column_text(pStatement, 1));
717               else {
718                   LOG("3data_DB:: Data not yet in DB., %s\n", _data_DB->command.c_str());
719                   rc = 31337;
720               }
721           }
722           else {
723               WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
724                       rc,_data_DB->command.c_str());
725           }
726
727           sqlite3_finalize(pStatement);
728           token = strtok((char *) data.c_str(), "@");
729           token = strtok(NULL, "@");
730           SendMessage(shellSocketFD, token);
731           token = strtok(NULL, "@");
732           SendMessage(shellSocketFD, token);
733       }
734
735       return;
736   }
737
738    /* If this is a service command and not a shell command... */
739    /* Transmission starting messages */
740    SendMessage(miss[activeMission].services[sourceID].socketFD, "request_optimization_service");
741    SendMessage(miss[activeMission].services[sourceID].socketFD, \
742            miss[activeMission].services[sourceID].name.c_str());
743}
744
745/* CALLED BY: MessageHandler
746 * INPUTS: <none>
747 * OUTPUTS: <none>
748 *
749 * DESCRIPTION: This function works by first sending the inputs from the shell to
750 * the appropriate components. The first service should begin immediately, as
751 * should any others that have all of their input parameters. When they complete,
752 * the output path is found and the data is transferred as it becomes available.
753 * Presumably at this point the second function has all of its parameters, so it
754 * begins to compute, and the cycle repeats.
755 *
756 * Rules for active missions (currently)
757 * -Five inputs/outputs per service and per mission
758 * -All ordering constraints have been relaxed in this version; all data is stored
759 *  locally and only sent when requested
760 * -If and while support fully implemented - up to three levels (if's can be nested, but dowhiles cannot)
761 * -For dowhiles, assumes loop condition determined on last line
762 *
763 * -IMPORTANT: DB uses '@' to separate individual statements; using '@' in the data
764 *  stream will result in incorrect behavior
765 */
766void
767ServiceManagementLayer::PerformActiveMission()
768{
769    shellFound = false;
770    std::string data_param;
771    std::string data_obsv;
772    std::string data;
773    std::string input;
774    std::string check;
775
776    char buffer[256];
777    char buffer1[256];
778    std::string token, token2;
779    std::string data2;
780
781    int32_t rc;
782    char *errorMsg;
783    char *cols[] = {(char *) "Tag", (char *) "Data"};
784
785    LOG("ServiceManagementLayer:: Received PerformActiveMission command.\n");
786
787    /* Get the inputs */
788    memset(buffer, 0, 256);
789    ReadMessage(shellSocketFD, buffer);
790
791    /* Receive Set of Parameters */
792    memset(buffer, 0, 256);
793    ReadMessage(shellSocketFD, buffer);
794    int32_t t = atoi(buffer);
795    for(size_t m = 0; m < t; m++) {
796        memset(buffer1, 0, 256);
797        ReadMessage(shellSocketFD, buffer1);
798        _data_DB->command="insert into ";
799        _data_DB->command.append(_data_DB->tablename);
800        _data_DB->command.append(" (");
801        _data_DB->command.append(cols[0]);
802        _data_DB->command.append(", ");
803        _data_DB->command.append(cols[1]);
804        _data_DB->command.append(") ");
805
806        memset(buffer, 0, 256);
807        ReadMessage(shellSocketFD, buffer);
808        _data_DB->command.append(" values('");
809        _data_DB->command.append(buffer1);
810        _data_DB->command.append("', '1@");
811        _data_DB->command.append(buffer1);
812        _data_DB->command.append("@");
813        _data_DB->command.append(buffer);
814        _data_DB->command.append("');");
815
816        rc = sqlite3_exec(_data_DB->db, _data_DB->command.c_str(), callback, 0, &errorMsg);
817        if((rc != SQLITE_OK) && (rc != 101))
818           WARNING("SQL error: %s\n", errorMsg);
819    }
820
821    int32_t numstatements[3] = {0 ,0 ,0};
822    for(size_t i; i < miss[activeMission].numServices; i++) {
823        if(miss[activeMission].services[i].name.compare("if") == 0) {
824            input.clear();
825            check.clear();
826
827            for(size_t t = 0; t < 10; t++) {
828                if(!miss[activeMission].services[i].output[t].empty()) {
829                    input = miss[activeMission].services[i - numstatements[0] - 1].output[t];
830
831                    _data_DB->command="SELECT ";
832                    _data_DB->command.append(_data_DB->tablename);
833                    _data_DB->command.append(".* from ");
834                    _data_DB->command.append(_data_DB->tablename);
835                    _data_DB->command.append(" where Tag=='");
836                    _data_DB->command.append(input);
837                    _data_DB->command.append("';");
838
839                    sqlite3_stmt *pStatement;
840                    rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), \
841                            -1, &pStatement, NULL);
842                    if(rc == SQLITE_OK) {
843                        if(sqlite3_step(pStatement) == SQLITE_ROW)
844                             data = (const char *) sqlite3_column_text(pStatement, 1);
845                        else {
846                            LOG("1 data_DB:: Data not yet in DB.\n");
847                            rc=31337;
848                        }
849                    } else {
850                        LOG("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
851                                rc,_data_DB->command.c_str());
852                    }
853
854                    sqlite3_finalize(pStatement);
855                    int32_t pos = data.find_last_of("@", data.length() - 2);
856                    token = data.substr(pos + 1);
857                    token.erase(token.length() - 1);
858                    data.clear();
859                    break;
860                }
861            }
862
863            bool doit = false;
864            if(miss[activeMission].services[i].output[t].find(">") != string::npos) {
865                std::string data2;
866                _data_DB->command="SELECT ";
867                _data_DB->command.append(_data_DB->tablename);
868                _data_DB->command.append(".* from ");
869                _data_DB->command.append(_data_DB->tablename);
870                _data_DB->command.append(" where Tag=='");
871                _data_DB->command.append(miss[activeMission].services[i].output[t].erase(0, 1));
872                _data_DB->command.append("';");
873                sqlite3_stmt *pStatement;
874
875                rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), \
876                        -1, &pStatement, NULL);
877                if(rc == SQLITE_OK) {
878                    if(sqlite3_step(pStatement) == SQLITE_ROW)
879                         data2 = (const char *) sqlite3_column_text(pStatement, 1);
880                    else {
881                        LOG("2 data_DB:: Data not yet in DB.\n");
882                        rc = 31337;
883                    }
884                } else {
885                    LOG("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
886                            rc, _data_DB->command.c_str());
887                }
888
889                sqlite3_finalize(pStatement);
890
891                int32_t pos = data2.find_last_of("@", data2.length() - 2);
892                token2 = data2.substr(pos + 1);
893                token2.erase(token2.length() - 1);
894                if(atof(token.c_str()) > atof(token2.c_str()))
895                    doit = true;
896            }
897            else if(miss[activeMission].services[i].output[t].find(token) != string::npos)
898                doit = true;
899
900            if(doit) {
901                for(size_t k = i + 1; k <= i+miss[activeMission].services[i].num_conds; k++) {
902                    if(miss[activeMission].services[k].name.compare("if") == 0) {
903                        input.clear();
904                        check.clear();
905                        for(size_t t = 0; t < 10; t++) {
906                            if(!miss[activeMission].services[k].output[t].empty()) {
907                                input = miss[activeMission].services[k - numstatements[1] - 1].output[t];
908                                _data_DB->command="SELECT ";
909                                _data_DB->command.append(_data_DB->tablename);
910                                _data_DB->command.append(".* from ");
911                                _data_DB->command.append(_data_DB->tablename);
912                                _data_DB->command.append(" where Tag=='");
913                                _data_DB->command.append(input);
914                                _data_DB->command.append("';");
915
916                                sqlite3_stmt *pStatement;
917                                rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), \
918                                        -1, &pStatement, NULL);
919                                if(rc == SQLITE_OK) {
920                                    if(sqlite3_step(pStatement) == SQLITE_ROW)
921                                         data = (const char *) sqlite3_column_text(pStatement, 1);
922                                    else {
923                                        LOG("3 data_DB:: Data not yet in DB.\n");
924                                        rc = 31337;
925                                    }
926                                } else {
927                                    LOG("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
928                                            rc,_data_DB->command.c_str());
929                                }
930
931                                sqlite3_finalize(pStatement);
932                                int32_t pos = data.find_last_of("@", data.length() - 2);
933                                token = data.substr(pos + 1);
934                                token.erase(token.length() - 1);
935                                break;
936                            }
937                        }
938
939                        bool doit = false;
940                        if(miss[activeMission].services[k].output[t].find(">") != string::npos) {
941                            std::string data2;
942                            _data_DB->command="SELECT ";
943                            _data_DB->command.append(_data_DB->tablename);
944                            _data_DB->command.append(".* from ");
945                            _data_DB->command.append(_data_DB->tablename);
946                            _data_DB->command.append(" where Tag=='");
947                            _data_DB->command.append(miss[activeMission].services[k].output[t].erase(0, 1));
948                            _data_DB->command.append("';");
949                            sqlite3_stmt * pStatement;
950                            rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
951                            if (rc == SQLITE_OK){
952                            if (sqlite3_step(pStatement) == SQLITE_ROW)
953                                 data2 = (const char *) sqlite3_column_text(pStatement, 1);
954                            else {
955                                printf("4 data_DB:: Data not yet in DB.\n");
956                                rc=31337;
957                            }
958                            } else {
959                            printf("data_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_data_DB->command.c_str());
960                            }
961                                sqlite3_finalize(pStatement);
962                            int pos = data2.find_last_of("@", data2.length()-2);
963                            token2 = data2.substr(pos+1);
964                            token2.erase(token2.length()-1);
965                                //printf("token=%s token2=%s\n", token.c_str(), token2.c_str());
966                                if(atof(token.c_str()) > atof(token2.c_str()))
967                                doit=true;
968                        }
969                        else if (miss[activeMission].services[k].output[t].find(token) != string::npos)
970                        doit=true;
971                        if(doit){
972                        //printf("Level 1:if taken\n");
973                        for(uint16_t j = k+1; j <= k+miss[activeMission].services[k].num_conds; j++){
974                            if(miss[activeMission].services[j].name.compare("if")==0){
975                            //printf("Level 2:if detected\n");
976                                input.clear();
977                                check.clear();
978                                for(t = 0; t < 10; t++){
979                                if(!miss[activeMission].services[j].output[t].empty()){
980                                    input=miss[activeMission].services[j-numstatements[2]-1].output[t];
981                                           _data_DB->command="SELECT ";
982                                        _data_DB->command.append(_data_DB->tablename);
983                                       _data_DB->command.append(".* from ");
984                                       _data_DB->command.append(_data_DB->tablename);
985                                      _data_DB->command.append(" where Tag=='");
986                                     _data_DB->command.append(input);
987                                    _data_DB->command.append("';");
988                                    sqlite3_stmt * pStatement;
989                                    rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
990                                    if (rc == SQLITE_OK){
991                                    if (sqlite3_step(pStatement) == SQLITE_ROW)
992                                         data = (const char *) sqlite3_column_text(pStatement, 1);
993                                    else {
994                                        printf("5 data_DB:: Data not yet in DB.\n");
995                                        rc=31337;
996                                    }
997                                    } else {
998                                    printf("data_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_data_DB->command.c_str());
999                                    }
1000                                        sqlite3_finalize(pStatement);
1001                                    //printf("data=%s\n", data.c_str());
1002                                    int pos = data.find_last_of("@", data.length()-2);
1003                                    token = data.substr(pos+1);
1004                                    token.erase(token.length()-1);
1005                                    //printf("token=%s\n", token.c_str());
1006                                    data.clear();
1007                                    break;
1008                                }
1009                                }
1010                                bool doit = false;
1011                                if(miss[activeMission].services[j].output[t].find(">") != string::npos){
1012                                    _data_DB->command="SELECT ";
1013                                    _data_DB->command.append(_data_DB->tablename);
1014                                    _data_DB->command.append(".* from ");
1015                                   _data_DB->command.append(_data_DB->tablename);
1016                                    _data_DB->command.append(" where Tag=='");
1017                                    _data_DB->command.append(miss[activeMission].services[j].output[t].erase(0, 1));
1018                                    _data_DB->command.append("';");
1019                                    sqlite3_stmt * pStatement;
1020                                    rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
1021                                    if (rc == SQLITE_OK){
1022                                    if (sqlite3_step(pStatement) == SQLITE_ROW)
1023                                         data2 = (const char *) sqlite3_column_text(pStatement, 1);
1024                                    else {
1025                                        printf("6 data_DB:: Data not yet in DB.\n");
1026                                        rc=31337;
1027                                    }
1028                                    } else {
1029                                    printf("data_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_data_DB->command.c_str());
1030                                    }
1031                                        sqlite3_finalize(pStatement);
1032
1033                                int pos = data2.find_last_of("@", data2.length()-2);
1034                                token2 = data2.substr(pos+1);
1035                                token2.erase(token2.length()-1);
1036                                    if(atof(token.c_str()) > atof(token2.c_str()))
1037                                    doit=true;
1038                                data.clear();
1039                                }
1040                                else if (miss[activeMission].services[j].output[t].find(token) != string::npos)
1041                                doit=true;
1042                                if(doit){
1043                                //printf("Level 1:if taken\n");
1044                                for(uint16_t l = j+1; l <= j+miss[activeMission].services[j].num_conds; l++){
1045                                    TransactData(l);
1046                                }
1047                                }
1048                                else
1049                                //printf("Level 2:if not taken\n");
1050                                numstatements[2] +=miss[activeMission].services[j].num_conds+1;
1051                                j+=miss[activeMission].services[j].num_conds;
1052                                //printf("Level 2:doneif %d, %d, %d\n", numstatements, miss[activeMission].services[i].num_conds, i);
1053                            }else if(miss[activeMission].services[j].name.compare("dowhile")==0){
1054                                numstatements[0]=0;
1055                                //printf("Level 2:while detected\n");
1056                                while(true){
1057                                    uint16_t l;
1058                                    for(l = j+1; l <= j+miss[activeMission].services[j].num_conds; l++){
1059                                    TransactData(l);
1060                                    }
1061                                    data.clear();
1062                                    input.clear();
1063                                    check.clear();
1064                                    int t;
1065                                    for(t = 0; t < 10; t++){
1066                                    if(!miss[activeMission].services[j].output[t].empty()){
1067                                        input=miss[activeMission].services[l-2].output[t];
1068                                        //printf("%s\n",input.c_str());
1069                                        _data_DB->command="SELECT ";
1070                                        _data_DB->command.append(_data_DB->tablename);
1071                                        _data_DB->command.append(".* from ");
1072                                        _data_DB->command.append(_data_DB->tablename);
1073                                        _data_DB->command.append(" where Tag=='");
1074                                        _data_DB->command.append(input);
1075                                        _data_DB->command.append("';");
1076                                        sqlite3_stmt * pStatement;
1077
1078                                        rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
1079                                        if (rc == SQLITE_OK){
1080                                        if (sqlite3_step(pStatement) == SQLITE_ROW)
1081                                             data = (const char *) sqlite3_column_text(pStatement, 1);
1082                                        else {
1083                                            printf("7 data_DB:: Data not yet in DB.: %s\n", _data_DB->command.c_str());
1084                                            rc=31337;
1085                                        }
1086                                        } else {
1087                                          printf("data_DB:: SQL statement error. rc = %i\n%s\n",rc,_data_DB->command.c_str());
1088                                        }
1089                                            sqlite3_finalize(pStatement);
1090                                        //printf("Level 2:data=%s\n", data.c_str());
1091                                            int pos = data.find_last_of("@", data.length()-2);
1092                                            token = data.substr(pos+1);
1093                                            token.erase(token.length()-1);
1094                                        //printf("Level 2:token=%s\n", token);
1095                                        break;
1096                                    }
1097                                    }
1098                                    //printf("Level 2:--- %s  %s---\n", miss[activeMission].services[j].output[t].c_str(), token);
1099                                    if(miss[activeMission].services[j].output[t].find(token) != string::npos){
1100                                    //printf("Level 2:while taken again!\n");
1101                                    }
1102                                    else {
1103                                    //printf("Level 2:no more while\n");
1104                                    break;}
1105                                }
1106                                j+=miss[activeMission].services[j].num_conds;
1107                            }
1108                            else{
1109                                //printf("Level 2: no conditional\n");
1110                                numstatements[2]=0;
1111                                    TransactData(j);
1112                            }
1113                        }
1114                        }
1115                        else{
1116                        //printf("Level 1: if not taken\n");
1117                        }
1118                            numstatements[1] +=miss[activeMission].services[k].num_conds+1;
1119                            k+=miss[activeMission].services[k].num_conds;
1120                        //printf("doneif %d, %d, %d\n", numstatements, miss[activeMission].services[i].num_conds, i);
1121                    }else if(miss[activeMission].services[k].name.compare("dowhile")==0){
1122                        numstatements[0]=0;
1123                        //printf("Level 1: while detected\n");
1124                        while(true){
1125                        uint16_t j;
1126                            for(j = k+1; j <= k+miss[activeMission].services[k].num_conds; j++){
1127                            TransactData(j);
1128                            }
1129                            data.clear();
1130                            input.clear();
1131                            check.clear();
1132                            int t;
1133                            for(t = 0; t < 10; t++){
1134                            if(!miss[activeMission].services[k].output[t].empty()){
1135                                input=miss[activeMission].services[j-1].output[t];
1136                                _data_DB->command="SELECT ";
1137                                _data_DB->command.append(_data_DB->tablename);
1138                                _data_DB->command.append(".* from ");
1139                                _data_DB->command.append(_data_DB->tablename);
1140                                _data_DB->command.append(" where Tag=='");
1141                                _data_DB->command.append(input);
1142                                _data_DB->command.append("';");
1143                                sqlite3_stmt * pStatement;
1144                                rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
1145                                if (rc == SQLITE_OK){
1146                                if (sqlite3_step(pStatement) == SQLITE_ROW)
1147                                     data = (const char *) sqlite3_column_text(pStatement, 1);
1148                                else {
1149                                    printf("8 data_DB:: Data not yet in DB.\n");
1150                                    rc=31337;
1151                                }
1152                                } else {
1153                                printf("data_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_data_DB->command.c_str());
1154                                }
1155                                    sqlite3_finalize(pStatement);
1156                                //printf("Level 1:data=%s\n", data.c_str());
1157                                int pos = data.find_last_of("@", data.length()-2);
1158                                token = data.substr(pos+1);
1159                                token.erase(token.length()-1);
1160                                //printf("Level 1:token=%s\n", token);
1161                                break;
1162                            }
1163                            }
1164                            //printf("Level 1:--- %s  %s---\n", miss[activeMission].services[k].output[t].c_str(), token);
1165                            if(miss[activeMission].services[k].output[t].find(token) != string::npos){
1166                            //printf("Level 1:while taken again!\n");
1167                            }
1168                            else {
1169                            //printf("Level 1:While finished\n");
1170                            break;}
1171                        }
1172                        k+=miss[activeMission].services[k].num_conds;
1173                    }
1174                    else{
1175                        //printf("Level1:No conditional\n");
1176                        numstatements[1]=0;
1177                            TransactData(k);
1178                    }
1179                }
1180            }
1181                numstatements[0] +=miss[activeMission].services[i].num_conds+1;
1182            i+=miss[activeMission].services[i].num_conds;
1183        }
1184        else if(miss[activeMission].services[i].name.compare("dowhile")==0)
1185        {
1186            numstatements[0]=0;
1187            //printf("Level 0: while detected\n");
1188            while(true){
1189            uint16_t k;
1190                for(k = i+1; k <= i+miss[activeMission].services[i].num_conds; k++){
1191                TransactData(k);
1192                }
1193                data.clear();
1194                input.clear();
1195                check.clear();
1196                int t;
1197                for(t = 0; t < 10; t++){
1198                if(!miss[activeMission].services[i].output[t].empty()){
1199                    input=miss[activeMission].services[k-1].output[t];
1200                    _data_DB->command="SELECT ";
1201                    _data_DB->command.append(_data_DB->tablename);
1202                    _data_DB->command.append(".* from ");
1203                    _data_DB->command.append(_data_DB->tablename);
1204                    _data_DB->command.append(" where Tag=='");
1205                    _data_DB->command.append(input);
1206                    _data_DB->command.append("';");
1207                    sqlite3_stmt * pStatement;
1208                    rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
1209                    if (rc == SQLITE_OK){
1210                    if (sqlite3_step(pStatement) == SQLITE_ROW)
1211                         data = (const char *) sqlite3_column_text(pStatement, 1);
1212                    else {
1213                        printf("10data_DB:: Data not yet in DB.\n");
1214                        rc=31337;
1215                    }
1216                    } else {
1217                    printf("data_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_data_DB->command.c_str());
1218                    }
1219                        sqlite3_finalize(pStatement);
1220                    int pos = data.find_last_of("@", data.length()-2);
1221                    token = data.substr(pos+1);
1222                    token.erase(token.length()-1);
1223                    break;
1224                }
1225                }
1226                //printf("Level 0:--- %s  %s---\n", miss[activeMission].services[i].output[t].c_str(), token);
1227                if(miss[activeMission].services[i].output[t].find(token) != string::npos){
1228                //printf("Level 0:while taken again!\n");
1229                }
1230                else {
1231                //printf("Level 0:no more while\n");
1232                break;}
1233            }
1234            i+=miss[activeMission].services[i].num_conds;
1235        }
1236        else{
1237            numstatements[0]=0;
1238            //printf("Level 0: No conditional\n");
1239            TransactData(i);
1240        }
1241    }
1242
1243    int32_t i = 0;
1244    data.clear();
1245
1246    if(!shellFound)
1247    {
1248    int k = 0;
1249    while(k<10 && !miss[activeMission].input[k].empty()){
1250        k++;
1251    }
1252    sprintf(buffer, "%d", k);
1253    SendMessage(shellSocketFD, buffer);
1254    for(int t = 0; t < k; t++){
1255        SendMessage(shellSocketFD, miss[activeMission].input[t].c_str());
1256        SendMessage(shellSocketFD, "0");
1257    }
1258    }
1259
1260
1261    LOG("ServiceManagementLayer:: Done performing active mission.\n");
1262    /*
1263    strcpy(_data_DB->command, "select ");
1264    strcat(_data_DB->command, _data_DB->tablename);
1265    strcat(_data_DB->command, ".* from ");
1266    strcat(_data_DB->command, _data_DB->tablename);
1267    strcat(_data_DB->command, ";");
1268
1269    // execute print (select all)  command 
1270    rc = sqlite3_exec(_data_DB->db, _data_DB->command, callback, 0, &errorMsg);
1271    if( rc!=SQLITE_OK && rc!=101 )
1272        fprintf(stderr, "SQL error: %s\n", errorMsg);
1273    printf("database %s, table %s:\n", _data_DB->filename, _data_DB->tablename);
1274    printf("\n\n\n");*/
1275}
1276
1277
1278/* CALLED BY: MessageHandler
1279 * INPUTS: <none>
1280 * OUTPUTS: <none>
1281 *
1282 * DESCRIPTION: Print a list of the services currently registered and the ID's of the components that registered them
1283 */
1284void
1285ServiceManagementLayer::ListServices()
1286{
1287    _services_DB->command="select ";
1288    _services_DB->command.append(_services_DB->tablename);
1289    _services_DB->command.append(".* from ");
1290    _services_DB->command.append(_services_DB->tablename);
1291    _services_DB->command.append(";");
1292
1293    // execute print (select all)  command   
1294    char *errorMsg;
1295    int rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg);
1296    if( rc!=SQLITE_OK && rc!=101 )
1297        fprintf(stderr, "SQL error: %s\n", errorMsg);
1298    printf("database %s, table %s:\n", _services_DB->filename.c_str(), _services_DB->tablename.c_str());
1299}
1300
1301/* CALLED BY: Reset
1302 * INPUTS: <none>
1303 * OUTPUTS: <none>
1304 *
1305 * DESCRIPTION: Clear and reinitialize the mission array, then reload the configuration file
1306 */
1307void
1308ServiceManagementLayer::ReloadConfiguration()
1309{
1310    LOG("ServiceManagementLayer:: Reloading Configuration.\n");
1311    free(miss);
1312    miss = new Mission[10];
1313    for(int i = 0; i < 10; i++)
1314        miss[i].services = new Service[30];
1315    LoadConfiguration(_SML_Config.c_str(), miss);
1316}
1317
1318/* CALLED BY: constructor
1319 * INPUTS: |SML_Config| Address (either relative or full) of the XML file containing mission data
1320 *        |mList| Mission array to be modified
1321 * OUTPUTS: <none>
1322 *
1323 * DESCRIPTION: IMPORTANT - See formatting instructions for correct parsing of data
1324 * Can currently handle 10 inputs and 10 outputs per service, but easily expandable
1325 * Also, can handle two layers of nested conditional statements, but could
1326 * be expanded to meet additional needs.
1327 *
1328 * Components assigned to mission during "set active mission" stage so that
1329 * components can still continue to register after the configuration is loaded
1330 */
1331void
1332ServiceManagementLayer::LoadConfiguration(const char *SML_Config, Mission* &mList)
1333{
1334    TiXmlElement *pMission;
1335    TiXmlElement *pService;
1336    TiXmlElement *pChild0, *pChild1, *pChild2, *pChild3, *pChild4;
1337    TiXmlHandle hRoot(0);
1338    printf("ServiceManagementLayer:: Loading Configuration.\n");
1339    TiXmlDocument doc(".");
1340    doc.LoadFile(SML_Config);
1341    bool loadOkay = doc.LoadFile();
1342    if(!loadOkay)
1343        printf("Loading SML configuration failed: %s\n", SML_Config);
1344
1345    TiXmlHandle hDoc(&doc);
1346   
1347    pMission = hDoc.FirstChildElement().Element();
1348
1349    if(!pMission)
1350        printf("No valid root!");
1351
1352    hRoot = TiXmlHandle(pMission);
1353    pService = pMission->FirstChildElement();
1354    int32_t mission_num = 0;
1355    //Iterate through the missions
1356    for(pChild0 = pMission->FirstChildElement(); pChild0 ; \
1357        pChild0 = pChild0->NextSiblingElement())
1358    {
1359    int32_t service_num = 0;
1360    uint16_t cond_array[] = {0, 0, 0};
1361   
1362        for(pChild1  = (pChild0->FirstChildElement())->FirstChildElement(); pChild1; \
1363              pChild1  = pChild1->NextSiblingElement())
1364        {
1365        int32_t conditional_0 = service_num;
1366        for(pChild2 = pChild1->FirstChildElement(); \
1367        pChild2; pChild2 = pChild2->NextSiblingElement())
1368        {
1369        service_num++;
1370        int32_t conditional_1 = service_num;
1371        for(pChild3 = pChild2->FirstChildElement(); \
1372            pChild3; pChild3 = pChild3->NextSiblingElement())
1373        {
1374            service_num++;
1375            int32_t conditional_2 = service_num;
1376            for(pChild4 = pChild3->FirstChildElement(); \
1377                pChild4; pChild4 = pChild4->NextSiblingElement())
1378            {
1379                   service_num++;
1380                if(pChild4->Attribute("name"))
1381                    mList[mission_num].services[service_num].name = pChild4->Attribute("name");
1382                else
1383                    mList[mission_num].services[service_num].name = pChild4->Value();
1384                for(int i = 1; i <= 10; i++) {
1385                    char buffer[9]="input";
1386                    sprintf(buffer, "%s%d", buffer, i);
1387                    //printf("buffer=%s\n", buffer);
1388                    if(pChild4->Attribute(buffer))
1389                        mList[mission_num].services[service_num].input[i-1] = pChild4->Attribute(buffer);
1390                    char buffer2[9]="output";
1391                    sprintf(buffer2, "%s%d", buffer2, i);
1392                    if(pChild4->Attribute(buffer2))
1393                    mList[mission_num].services[service_num].output[i-1] = pChild4->Attribute(buffer2);
1394                }
1395                if(pChild4->Attribute("parameter"))
1396                mList[mission_num].services[service_num].parameter = pChild4->Attribute("parameter");
1397                cond_array[2]++;
1398            }
1399            if(!strcmp(pChild3->Value(), "shell") || conditional_2 != service_num) {
1400                mList[mission_num].services[conditional_2].name = pChild3->Value();
1401            }
1402            else{
1403                mList[mission_num].services[service_num].name = pChild3->Attribute("name");
1404            }
1405            for(int i = 1; i <= 10; i++) {
1406                char buffer[9]="input";
1407                sprintf(buffer, "%s%d", buffer, i);
1408                if(pChild3->Attribute(buffer))
1409                mList[mission_num].services[conditional_2].input[i-1] = pChild3->Attribute(buffer);
1410                char buffer2[9]="output";
1411                sprintf(buffer2, "%s%d", buffer2, i);
1412                if(pChild3->Attribute(buffer2))
1413                mList[mission_num].services[conditional_2].output[i-1] = pChild3->Attribute(buffer2);
1414            }
1415                if(pChild3->Attribute("parameter"))
1416                mList[mission_num].services[conditional_2].parameter = pChild3->Attribute("parameter");
1417            mList[mission_num].services[conditional_2].num_conds = cond_array[2];
1418            cond_array[1]+=cond_array[2]+1;
1419            cond_array[2] = 0;
1420
1421
1422        }
1423        if(!strcmp(pChild2->Value(), "shell") || conditional_1 != service_num) {
1424            mList[mission_num].services[conditional_1].name = pChild2->Value();
1425        }
1426        else{
1427            mList[mission_num].services[service_num].name = pChild2->Attribute("name");
1428        }
1429           for(int i = 1; i <= 10; i++) {
1430            char buffer[9]="input";
1431            sprintf(buffer, "%s%d", buffer, i);
1432            if(pChild2->Attribute(buffer))
1433                mList[mission_num].services[conditional_1].input[i-1] = pChild2->Attribute(buffer);
1434            char buffer2[9]="output";
1435            sprintf(buffer2, "%s%d", buffer2, i);
1436            if(pChild2->Attribute(buffer2))
1437            mList[mission_num].services[conditional_1].output[i-1] = pChild2->Attribute(buffer2);
1438        }
1439            if(pChild2->Attribute("parameter"))
1440            mList[mission_num].services[conditional_1].parameter = pChild2->Attribute("parameter");
1441
1442        mList[mission_num].services[conditional_1].num_conds = cond_array[1];
1443        cond_array[0]+=cond_array[1]+1;
1444        cond_array[1] = 0;
1445        }
1446       
1447        if(!strcmp(pChild1->Value(), "shell") || conditional_0 != service_num) {
1448            mList[mission_num].services[conditional_0].name = pChild1->Value();
1449        }
1450        else{
1451        mList[mission_num].services[conditional_0].name = pChild1->Attribute("name");
1452        }
1453        for(int i = 1; i <= 10; i++) {
1454            char buffer[9]="input";
1455            sprintf(buffer, "%s%d", buffer, i);
1456            if(pChild1->Attribute(buffer))
1457                mList[mission_num].services[conditional_0].input[i-1] = pChild1->Attribute(buffer);
1458            char buffer2[9]="output";
1459            sprintf(buffer2, "%s%d", buffer2, i);
1460            if(pChild1->Attribute(buffer2))
1461            mList[mission_num].services[conditional_0].output[i-1] = pChild1->Attribute(buffer2);
1462        }
1463        if(pChild1->Attribute("parameter"))
1464            mList[mission_num].services[conditional_0].parameter = pChild1->Attribute("parameter");
1465        mList[mission_num].services[conditional_0].num_conds = cond_array[0];
1466            cond_array[0] = 0;
1467        service_num++;
1468    }
1469   
1470    mList[mission_num].numServices = service_num;
1471    mList[mission_num].name = pChild0->Attribute("name");
1472        mList[mission_num].missionID = atoi(pChild0->Attribute("id"));
1473    for(int i = 1; i <= 10; i++) {
1474        char buffer[9]="param";
1475        sprintf(buffer, "%s%d", buffer, i);
1476        if(pChild0->Attribute(buffer)){
1477            mList[mission_num].input[i-1] = pChild0->Attribute(buffer);
1478        }
1479    }
1480    mission_num++;
1481    }
1482    LOG("ServiceManagementLayer:: Done Loading Configuration\n");
1483}
1484
1485/* CALLED BY: MessageHandler
1486 * INPUTS: |ID| The ID number of the engine to be registered
1487 * OUTPUTS: <none>
1488 *
1489 * DESCRIPTION: Sends a registration message onto the shell and sends the ACK back to the component
1490 */
1491void
1492ServiceManagementLayer::RegisterCognitiveEngine(int32_t ID)
1493{
1494    SendMessage(shellSocketFD, "register_engine_cognitive");
1495
1496    LOG("ServiceManagementLayer:: CE registration message forwarded to shell.\n");
1497    char buffer[256];
1498    memset(buffer, 0, 256);
1499    ReadMessage(shellSocketFD, buffer);
1500    SendMessage(CE_List[ID].FD, buffer);
1501
1502    TransferRadioConfiguration(ID);
1503    memset(buffer, 0, 256);
1504    TransferExperience(ID);
1505    memset(buffer, 0, 256);
1506    numberOfCognitiveEngines++;
1507    CE_Present = true;
1508}
1509
1510/* CALLED BY: MessageHandler
1511 * INPUTS: |ID| The ID number of the engine to have it's services deregistered
1512 * OUTPUTS: <none>
1513 *
1514 * DESCRIPTION: Deletes individual services from the DB
1515 * NOTE THAT this function only needs to be called if service deregistration is going
1516 * to be done at a different time than component deregistration; it is handled
1517 * more efficiently and directly during that deregistration process.
1518 */
1519void
1520ServiceManagementLayer::DeregisterServices(int32_t ID)
1521{
1522    char buffer[256];
1523    memset(buffer, 0, 256);
1524    ReadMessage(CE_List[ID].FD, buffer);
1525    _services_DB->command="DELETE FROM ";
1526    _services_DB->command.append(_services_DB->tablename);
1527    _services_DB->command.append(" WHERE ID_Num IN (SELECT ");
1528    char tmp[3];
1529    memset(tmp,0,3);
1530    sprintf(tmp, "%d", ID);
1531    _services_DB->command.append(tmp);
1532    _services_DB->command.append(" FROM ");
1533    _services_DB->command.append(_services_DB->tablename);
1534    _services_DB->command.append(" WHERE Service_Name");
1535    _services_DB->command.append("=='");
1536    _services_DB->command.append(buffer);
1537    _services_DB->command.append("');");
1538    char *errorMsg;
1539    int rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg);
1540    if( rc!=SQLITE_OK && rc!=101 )
1541        fprintf(stderr, "SQL error: %s\n", errorMsg);
1542}
1543
1544/* CALLED BY: MessageHandler
1545 * INPUTS: |ID| The ID number of the engine to have it's services deregistered
1546 * OUTPUTS: <none>
1547 *
1548 * DESCRIPTION: Deletes the contact info for the cognitive engine, forwards a deregistration message to the shell
1549 * Also, deletes the services from the DB
1550 */
1551void
1552ServiceManagementLayer::DeregisterCognitiveEngine(int32_t ID)
1553{
1554    LOG("ServiceManagementLayer:: CE deregistration message forwarded to shell.\n");
1555
1556    numberOfCognitiveEngines--;
1557    if(numberOfCognitiveEngines == 0)
1558        CE_Present = false;
1559
1560    SendMessage(shellSocketFD, "deregister_engine_cognitive");
1561    char buffer[256];
1562    memset(buffer, 0, 256);
1563    ReadMessage(shellSocketFD, buffer);
1564    SendMessage(CE_List[ID].FD, buffer);
1565    if(strcmp("deregister_ack", buffer) != 0) {
1566    ERROR(1, "SML:: Failed to close CE socket\n");
1567    }
1568
1569    //Deregister the services
1570    _services_DB->command="DELETE FROM ";
1571    _services_DB->command.append(_services_DB->tablename);
1572    _services_DB->command.append(" WHERE ");
1573    _services_DB->command.append("ID_Num");
1574    _services_DB->command.append("==");
1575    char tmp[3];
1576    memset(tmp,0,3);
1577    sprintf(tmp, "%d", ID);
1578    _services_DB->command.append(tmp);
1579    _services_DB->command.append(";");
1580    char *errorMsg;
1581    int rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg);
1582    if( rc!=SQLITE_OK && rc!=101 )
1583        fprintf(stderr, "SQL error: %s\n", errorMsg);
1584
1585
1586    CE_List[ID].FD = -1;
1587    CE_List[ID].ID_num = -1;
1588
1589    LOG("Cognitive Radio Shell:: CE Socket closed for engine #%d.\n", ID);
1590}
1591
1592
1593/* CALLED BY: test class
1594 * INPUTS: <none>
1595 * OUTPUTS: <none>
1596 *
1597 * DESCRIPTION: Sets up a server socket and listens for communication on either that or the shell socket
1598 */
1599void
1600ServiceManagementLayer::StartSMLServer()
1601{
1602    //printf("Ready for CE Signal! (registration done)\n");
1603    struct timeval selTimeout;
1604    int32_t running = 1;
1605    int32_t port, rc, new_sd = 1;
1606    int32_t desc_ready = 1;
1607        //If there is, call the MessageHandler with the Shell_Msg code of -1
1608    fd_set sockSet, shellSet;
1609
1610    cogEngSrv = CreateTCPServerSocket(SMLport);
1611    int32_t maxDescriptor = cogEngSrv;
1612
1613    if(InitializeTCPServerPort(cogEngSrv) == -1)
1614        ERROR(1,"Error initializing primary port\n");
1615
1616   
1617    while (running) {
1618        /* Zero socket descriptor vector and set for server sockets */
1619        /* This must be reset every time select() is called */
1620        FD_ZERO(&sockSet);
1621        FD_SET(cogEngSrv, &sockSet);
1622       
1623        for(uint16_t k = 0; k < Current_ID; k++){
1624        if(CE_List[k].ID_num != -1)
1625            FD_SET(CE_List[k].FD, &sockSet);
1626    }
1627        //printf("k=%d, CID=%d\n", k, CE_List[k].FD);
1628
1629        /* Timeout specification */
1630        /* This must be reset every time select() is called */
1631        selTimeout.tv_sec = 0;       /* timeout (secs.) */
1632        selTimeout.tv_usec = 0;            /* 0 microseconds */
1633    //Changed both to zero so that select will check messages from the shell instead of blocking
1634    //when there is no command from the CE's to be processed
1635
1636        //Check if there is a message on the socket waiting to be read
1637    rc = select(maxDescriptor + 1, &sockSet, NULL, NULL, &selTimeout);
1638        //printf("rc=%d\n", rc);
1639        if(rc == 0){
1640            //LOG("No echo requests for %i secs...Server still alive\n", 10);
1641   
1642            FD_ZERO(&shellSet);
1643            FD_SET(shellSocketFD, &shellSet);
1644            selTimeout.tv_sec = 0;
1645            selTimeout.tv_usec = 0;
1646            //Check if there is a message on the shell socket ready to be processed
1647            select(shellSocketFD + 1, &shellSet, NULL, NULL, &selTimeout);
1648            //printf("rc2=%d\n", rc2);
1649        //If there is, call the MessageHandler with the Shell_Msg code of -1
1650        if(FD_ISSET(shellSocketFD, &shellSet)){
1651        //printf("shell_msg, %d\n", rc2);
1652            MessageHandler(-1);}
1653    }
1654        else {
1655            desc_ready = rc;
1656            for(port = 0; port <= maxDescriptor && desc_ready > 0; port++) {
1657                if(FD_ISSET(port, &sockSet)) {
1658                    desc_ready -= 1;
1659
1660                    //Check if request is new or on an existing open descriptor
1661                    if(port == cogEngSrv) {
1662            //If new, assign it a descriptor and give it an ID
1663                        new_sd = AcceptTCPConnection(port);
1664             
1665                        if(new_sd < 0)
1666                            break;
1667
1668                        CE_List[Current_ID].FD = new_sd;
1669            CE_List[Current_ID].ID_num = Current_ID;
1670                        MessageHandler(Current_ID);
1671            Current_ID++;
1672   
1673            FD_SET(new_sd,&sockSet);
1674                        if(new_sd > maxDescriptor)
1675                           maxDescriptor = new_sd;
1676                    }
1677                    else {
1678            //If old, figure out which ID it coresponds to and handle it accordingly
1679            for(uint16_t z = 0; z < Current_ID; z++)
1680            {
1681                if(CE_List[z].FD == port){
1682                    MessageHandler(z);}
1683            }
1684                    }
1685                }
1686            }
1687    }
1688    }       
1689
1690    /* Close sockets */
1691    close(cogEngSrv);
1692
1693    //delete &cogEngSrv;
1694    return;
1695}
Note: See TracBrowser for help on using the browser.