root/vtcross/branches/wrodgers/src/service_management_layer/ServiceManagementLayer_old_2.cpp @ 277

Revision 277, 16.5 KB (checked in by wrodgers, 15 years ago)

Updating SML functionality

Line 
1/* Virginia Tech Cognitive Radio Open Source Systems
2 * Virginia Tech, 2009
3 *
4 * LICENSE INFORMATION GOES HERE
5 */
6
7/* When a CE connects, the address of it's local socket is stored and it is assigned an ID number.
8 * Until it sends a message ending communication, any communications from it are processed with
9 * that ID number.  The "sendto" command uses the address data to send messages directly back to
10 * it's local socket, allowing us to get away with a single-socket implementation on the SML-side.
11 *
12 * Services are stored in a SQLite DB by the ID of the CE that registered them.
13 *
14 * Mission support has not been implemented yet, but will be soon.
15 */
16
17/* KNOWN ISSUE: Note that this implementation would be unstable for CE's on any sort of network
18 * since it relies on asyncronous communication occuring linearly and with one partner at a time.
19 * The breakdown occurs specifically when the CE has to send ack messages back to the SML;
20 * since there may be other messages queued on the socket since communication started,
21 * there is no guarentee that the message would be the first up and I have yet to discover a method
22 * for peaking more than 1 item into the message queue without starting to pop messages off first.
23 * Still a work in progress though...
24 */
25
26#include <cstdlib>
27#include <cstring>
28#include <stdint.h>
29
30#include "../../trunk/src/include/vtcross/common.h"
31#include "../../trunk/src/include/vtcross/components.h"
32#include "../../trunk/src/include/vtcross/containers.h"
33#include "../../trunk/src/include/vtcross/debug.h"
34#include "../../trunk/src/include/vtcross/error.h"
35#include "../../trunk/src/include/vtcross/socketcomm.h"
36
37
38#include <cstdlib>
39#include <cstring>
40#include <stdint.h>
41#include <math.h>
42
43
44#include <arpa/inet.h>
45#include <iostream>
46#include <netinet/in.h>
47#include <netdb.h>
48#include <fcntl.h>
49#include <sys/ioctl.h>
50#include <sys/mman.h>
51#include <sys/socket.h>
52#include <sys/types.h>
53#include <sys/wait.h>
54
55#include "../../trunk/src/include/tinyxml/tinyxml.h"
56#include "../../trunk/src/include/tinyxml/tinystr.h"
57#include "sqlite3.h"
58
59typedef struct services_s * services_DB;
60
61struct services_s {
62    char filename[64];
63    char tablename[64];
64    char command[2048];
65    sqlite3 *db;
66    unsigned int num_columns;
67};
68
69services_DB _services_DB;
70
71
72
73
74ServiceManagementLayer::ServiceManagementLayer()
75{
76    LOG("Creating Service Management Layer.\n");
77    shellSocketFD = -1;
78    numberOfCognitiveEngines = 0;
79    CE_Present = false;
80    cogEngSrv = 1;
81
82    Current_ID = 0;
83    LoadConfiguration();
84}
85
86
87ServiceManagementLayer::~ServiceManagementLayer()
88{
89}
90
91
92ServiceManagementLayer::ServiceManagementLayer(const char* serverName, \
93        const char* serverPort)
94{
95    LOG("Creating Service Management Layer.\n");
96
97    ConnectToShell(serverName, serverPort);
98    CE_List = new CE_Reg[10];
99
100    mList = new Mission[10];
101
102    LoadConfiguration();
103    CreateServicesDB();
104}
105
106void
107ServiceManagementLayer::CreateServicesDB()
108{
109    _services_DB = (services_DB) malloc(sizeof(struct services_s));
110
111    // create database
112
113    // copy filename
114    unsigned int i=0;
115    strcpy(_services_DB->filename, "Services_Table");
116
117    // execute create database command
118    // database handle
119    //_services_DB->db = NULL;
120    sqlite3_open(_services_DB->filename, &(_services_DB->db));
121    char* cols[] = {"ID_Num", "Service_Name"};
122
123    // create table
124
125    // copy tablename
126    strcpy(_services_DB->tablename, "Services");
127
128    // number of columns in the table
129    _services_DB->num_columns = 2;
130
131    // generate command
132    strcpy(_services_DB->command, "CREATE TABLE ");
133    strcat(_services_DB->command, _services_DB->tablename);
134    strcat(_services_DB->command, "(");
135    strcat(_services_DB->command, cols[0]);
136    strcat(_services_DB->command, " INT, ");
137    strcat(_services_DB->command, cols[1]);
138    strcat(_services_DB->command, " TEXT");
139    strcat(_services_DB->command, ");");
140
141    // execute create table command
142    sqlite3_exec(_services_DB->db, _services_DB->command, NULL, 0, NULL);
143}
144
145
146void
147ServiceManagementLayer::SendComponentType()
148{
149    SendMessage(shellSocketFD, "response_sml");
150    LOG("SML responded to GetRemoteComponentType query.\n");
151}
152
153
154void
155ServiceManagementLayer::ConnectToShell(const char* serverName, \
156        const char* serverPort)
157{
158    shellSocketFD = ClientSocket(serverName, serverPort);
159
160    RegisterComponent();
161}
162
163
164void
165ServiceManagementLayer::ShellSignalHandler()
166{
167    char buffer[256];
168
169    memset(buffer, 0, 256);
170    ReadMessage(shellSocketFD, buffer);
171
172    // TODO
173    // If we send integer op codes rather than strings, this process will be
174    // MUCH faster since instead of donig string compares we can simply
175    // switch on the integer value...
176    if(strcmp(buffer, "register_service") == 0) {
177        if(strcmp(buffer, "policy_geo") == 0) {
178        }
179        else if(strcmp(buffer, "policy_time") == 0) {
180        }
181        else if(strcmp(buffer, "policy_spectrum") == 0) {
182        }
183        else if(strcmp(buffer, "policy_spacial") == 0) {
184        }
185    }
186    else if(strcmp(buffer, "deregister_service") == 0) {
187        if(strcmp(buffer, "policy_geo") == 0) {
188        }
189        else if(strcmp(buffer, "policy_time") == 0) {
190        }
191        else if(strcmp(buffer, "policy_spectrum") == 0) {
192        }
193        else if(strcmp(buffer, "policy_spacial") == 0) {
194        }
195    }
196    else if(strcmp(buffer, "query_component_type") == 0) {
197        SendComponentType();
198    }
199    else if(strcmp(buffer, "reset_sml") == 0) {
200        Reset();
201    }
202    else if(strcmp(buffer, "shutdown_sml") == 0) {
203        Shutdown();
204    }
205}
206
207void
208ServiceManagementLayer::CESignalHandler(int32_t ID)
209{
210    char buffer[256];   
211    memset(buffer, 0, 256);     
212    //
213    ReadMessage(cogEngSrv, buffer);
214    if(strcmp(buffer, "register_engine_cognitive") == 0) {
215         RegisterCognitiveEngine(ID);
216    }
217    else if(strcmp(buffer, "register_service") == 0) {
218         ReceiveServices(ID);
219    }
220    else if(strcmp(buffer, "send_component_type") == 0) {
221         SendComponentType();
222    }
223
224
225}
226
227
228
229void
230ServiceManagementLayer::Shutdown()
231{
232    DeregisterComponent();
233}
234
235
236void
237ServiceManagementLayer::Reset()
238{
239    DeregisterComponent();
240    LoadConfiguration();
241}
242
243
244void
245ServiceManagementLayer::RegisterComponent()
246{
247    SendMessage(shellSocketFD, "register_sml");
248    LOG("ServiceManagementLayer:: Registration message sent.\n");
249}
250
251
252void
253ServiceManagementLayer::DeregisterComponent()
254{
255    SendMessage(shellSocketFD, "deregister_sml");
256    LOG("ServiceManagementLayer:: Deregistration message sent.\n");
257
258    shutdown(shellSocketFD, 2);
259    close(shellSocketFD);
260    shellSocketFD = -1;
261    LOG("ServiceManagementLayer:: Shell socket closed.\n");
262}
263
264/* Streams config data directly from the shell to the CE, and checks
265 * for an "ack" message from the CE after every sent message
266 * to know when to stop communication.
267 */
268void
269ServiceManagementLayer::TransferRadioConfiguration(int32_t ID)
270{
271    struct timeval selTimeout;
272    fd_set sockSet;
273    int32_t rc = 0;
274    char buffer[256];
275    //Send data until the CE sends an ACK message back
276    while(rc == 0){
277        memset(buffer, 0, 256);
278        //Receive data from Shell
279        ReadMessage(shellSocketFD, buffer);
280        //Send data to CE
281        sendto(cogEngSrv, buffer, 256, NULL, (struct sockaddr*) CE_List[ID].sock_ptr, CE_List[ID].sock_len);
282        FD_ZERO(&sockSet);
283        FD_SET(cogEngSrv, &sockSet);
284        selTimeout.tv_sec = 0;
285        selTimeout.tv_usec = 0;
286        //Check if there is a message on the CE socket ready to be processed
287        rc = select(cogEngSrv + 1, &sockSet, NULL, NULL, &selTimeout);
288    }
289    memset(buffer, 0, 256);
290    //Once there is a message on the CE socket, read it and forward it on to the shell
291    ReadMessage(cogEngSrv, buffer);
292    SendMessage(shellSocketFD, buffer);
293}
294
295//Simmilar to TransferRadioConfig, just with Experience data
296void
297ServiceManagementLayer::TransferExperience(int32_t ID)
298{
299    struct timeval selTimeout;
300    fd_set sockSet;
301    int32_t rc = 0;
302    char buffer[256];
303    //Send data until the CE sends an ACK message back
304    while(rc == 0){
305        memset(buffer, 0, 256);
306        //Receive data from Shell
307        ReadMessage(shellSocketFD, buffer);
308        //Send data to CE
309        sendto(cogEngSrv, buffer, 256, NULL, (struct sockaddr*) CE_List[ID].sock_ptr, CE_List[ID].sock_len);
310        FD_ZERO(&sockSet);
311        FD_SET(cogEngSrv, &sockSet);
312        selTimeout.tv_sec = 0;
313        selTimeout.tv_usec = 0;
314        //Check if there is a message on the CE socket ready to be processed
315        rc = select(cogEngSrv + 1, &sockSet, NULL, NULL, &selTimeout);
316    }
317    memset(buffer, 0, 256);
318    //Once there is a message on the CE socket, read it and forward it on to the shell
319    ReadMessage(cogEngSrv, buffer);
320    SendMessage(shellSocketFD, buffer);
321}
322
323
324void
325ServiceManagementLayer::ReceiveServices(int32_t ID)
326{
327    char buffer[256];
328    memset(buffer, 0, 256);
329    ReadMessage(cogEngSrv, buffer);
330    char* cols[] = {"ID_Num", "Service_Name"};
331   
332    // generate command
333    //printf("%s\n", _services_DB->command);
334    strcpy(_services_DB->command, "insert into ");
335    strcat(_services_DB->command, _services_DB->tablename);
336    strcat(_services_DB->command, " (");
337    strcat(_services_DB->command, cols[0]);
338    strcat(_services_DB->command, ", ");
339    strcat(_services_DB->command, cols[1]);
340    strcat(_services_DB->command, ") ");
341    strcat(_services_DB->command, " values(");
342    sprintf(_services_DB->command, "%s%d", _services_DB->command, ID);
343    strcat(_services_DB->command, ", ");
344    strcat(_services_DB->command, buffer);
345    strcat(_services_DB->command, ");");
346   
347    //printf("search command: %s\n", _services_DB->command);
348    // execute add command
349    sqlite3_exec(_services_DB->db, _services_DB->command, NULL, 0, NULL);
350
351}
352
353
354void
355ServiceManagementLayer::SetActiveMission()
356{
357}
358
359
360void
361ServiceManagementLayer::ListServices()
362{
363    // generate commandi
364    strcpy(_services_DB->command, "select ");
365    strcat(_services_DB->command, _services_DB->tablename);
366    strcat(_services_DB->command, ".* from ");
367    strcat(_services_DB->command, _services_DB->tablename);
368    strcat(_services_DB->command, ";");
369
370    // execute print (select all)  command
371    sqlite3_exec(_services_DB->db, _services_DB->command, NULL, 0, NULL);
372    printf("database %s, table %s:\n", _services_DB->filename, _services_DB->tablename);
373}
374
375
376void
377ServiceManagementLayer::ReloadConfiguration()
378{
379    LOG("ServiceManagementLayer:: Reloading Configuration.\n");
380}
381
382
383void
384ServiceManagementLayer::LoadConfiguration(const char* SMLConfig)
385{
386    TiXmlHandle hRoot(0);
387    LOG("ServiceManagementLayer:: Loading Configuration.\n");
388    TiXmlDocument doc( SMLConfig );
389    bool loadOkay = doc.LoadFile();
390    if(!loadOkay)
391        ERROR(1,"Loading SML configuration failed: %s\n", SMLConfig);
392
393    TiXmlHandle hDoc(&doc);
394   
395    pElem = hDoc.FirstChildElement().Element();
396
397    if(!pElem)
398        ERROR(1, "No valid root!");
399
400    hRoot = TiXmlHandle(pElem);
401}
402
403void
404ServiceManagementLayer::RegisterCognitiveEngine(int32_t ID)
405{
406    SendMessage(shellSocketFD, "register_engine_cognitive");
407    LOG("ServiceManagementLayer:: CE registration message forwarded to shell.\n");
408
409    TransferRadioConfiguration(ID);
410    TransferExperience(ID);
411
412    numberOfCognitiveEngines++;
413    CE_Present = true;
414}
415
416void
417ServiceManagementLayer::DeregisterServices(int32_t ID)
418{
419    char str_buffer[64];
420    strcpy(_services_DB->command, "delete ");
421    strcat(_services_DB->command, _services_DB->tablename);
422    strcat(_services_DB->command, ".* from ");
423    strcat(_services_DB->command, _services_DB->tablename);
424    strcat(_services_DB->command, " where ");
425    strcat(_services_DB->command, "ID_Num");
426    strcat(_services_DB->command, "==");
427    sprintf(str_buffer, "%d;", ID);
428    strcat(_services_DB->command, str_buffer);
429    sqlite3_exec(_services_DB->db, _services_DB->command, NULL, 0, NULL);
430}
431
432
433void
434ServiceManagementLayer::DeregisterCognitiveEngine(int32_t ID)
435{
436    LOG("ServiceManagementLayer:: CE deregistration message forwarded to shell.\n");
437
438    numberOfCognitiveEngines--;
439    if(numberOfCognitiveEngines == 0)
440        CE_Present = false;
441
442    SendMessage(shellSocketFD, "deregister_engine_cognitive");
443    char buffer[256];
444    memset(buffer, 0, 256);
445    ReadMessage(shellSocketFD, buffer);
446    sendto(cogEngSrv, buffer, 256, NULL, (struct sockaddr*) CE_List[ID].sock_ptr, CE_List[ID].sock_len);
447    if(strcmp("deregister_ack", buffer) != 0) {
448        ERROR(1, "SML:: Failed to close CE socket\n");
449    }
450    CE_List[ID].sock_ptr = NULL;
451    CE_List[ID].ID_num = -1;
452
453    LOG("Cognitive Radio Shell:: Socket closed.\n");
454}
455
456/* The basic idea here is that when any time a message is sent over the socket, the sender's address is sent too.
457 * We take advantage of this in order to uniquely identify the component for future communications since
458 * there is only one socket on the SML-side.
459 */
460void
461ServiceManagementLayer::StartSMLServer()
462{
463    struct timeval selTimeout;
464    int32_t running = 1;
465    int32_t port, rc, new_sd = 1;
466    int32_t desc_ready = 1;
467    int32_t timeout = 10;
468    fd_set sockSet;
469
470    cogEngSrv = CreateTCPServerSocket(CEPort);
471    int32_t maxDescriptor = cogEngSrv;
472
473    if(InitializeTCPServerPort(cogEngSrv) == -1)
474        ERROR(1,"Error initializing primary port\n");
475
476    while (running) {
477        /* Zero socket descriptor vector and set for server sockets */
478        /* This must be reset every time select() is called */
479        FD_ZERO(&sockSet);
480        FD_SET(cogEngSrv, &sockSet);
481
482        /* Timeout specification */
483        /* This must be reset every time select() is called */
484        selTimeout.tv_sec = 0;       /* timeout (secs.) */
485        selTimeout.tv_usec = 0;            /* 0 microseconds */
486        //Changed both to zero so that select will check messages from the shell instead of blocking
487        //when there is no command from the CE's to be processed
488
489        //Check if there is a message on the socket waiting to be read
490        rc = select(cogEngSrv + 1, &sockSet, NULL, NULL, &selTimeout);
491        if(rc == 0){
492            //If not, check instead for messages from the shell
493            ShellSignalHandler();       
494        }
495        else {
496            //If so, process the address information and pass the component ID on to the handler
497            char buffer[256];
498            memset(buffer, 0, 256);
499            struct sockaddr_in sock_addr;
500            memset((void *) &sock_addr, 0, sizeof(sock_addr));
501            socklen_t sock_len;
502            //Peak at the next message on the socket to determine its address of orgin
503            recvfrom(cogEngSrv, buffer, 256, MSG_PEEK, (struct sockaddr*) &sock_addr, &sock_len);
504            bool found = false;
505            //Is it from a previously logged address?
506            for(int i = 0; i < Current_ID; i++){
507                if((*CE_List[i].sock_ptr).sin_addr.s_addr == sock_addr.sin_addr.s_addr){
508                        //If so, pass the ID number of that component into the signal handler to process the message
509                        found=true;
510                        CESignalHandler(i);
511                }
512            }
513            //If not, log the address and pass the ID number of that component into the signal handler
514            if(!found){
515                CE_List[Current_ID].sock_ptr = &sock_addr;
516                CE_List[Current_ID].ID_num = Current_ID;
517                CE_List[Current_ID].sock_len = sock_len;
518                CESignalHandler(Current_ID);
519                Current_ID++;
520            }
521        }
522    }       
523
524    /* Close sockets */
525    close(cogEngSrv);
526
527    return;
528}
529
530/*
531
532
533
534rc = select(cogEngSrv + 1, &sockSet, NULL, NULL, &selTimeout);
535        if(rc == 0){
536            LOG("No echo requests for %i secs...Server still alive\n", timeout);
537            ShellSignalHandler();       
538        }
539        else {
540            desc_ready = rc;
541
542            for(port = 0; port <= maxDescriptor && desc_ready > 0; port++) {
543                if(FD_ISSET(port, &sockSet)) {
544                    desc_ready -= 1;
545
546                    //Check if request is new or on an existing open descriptor
547                    if(port == cogEngSrv) {
548                        do {
549                            new_sd = AcceptTCPConnection(port);
550                            if(new_sd < 0)
551                                break;
552                           
553                            CE_List[Current_ID].FD = new_sd;
554                            CE_List[Current_ID].IDNum = Current_ID;
555                            HandleMessage(CE_List[Current_ID]);
556                            Current_ID++;
557       
558                            FD_SET(new_sd,&sockSet);
559                            if(new_sd > maxDescriptor)
560                                maxDescriptor = new_sd;
561                            //LOG("New incoming connection - %i\n\n",new_sd);
562                        } while(new_sd != -1);
563                    }
564                    else {
565                        //LOG("Request on already open descriptor.\n\n");
566                        for(int16_t i = 0; i < Current_ID; i++)
567                        {
568                                if(CE_List[i].FD == port)
569                                        HandleMessage(CE_List[i]);
570                        }
571                    }
572                }
573            }
574    }
575*/
576
577
Note: See TracBrowser for help on using the browser.