root/vtcross/branches/wrodgers/ServiceManagementLayer.cpp @ 255

Revision 255, 16.0 KB (checked in by wrodgers, 15 years ago)

Implemented service DB, transfer methods

Line 
1/* Virginia Tech Cognitive Radio Open Source Systems
2 * Virginia Tech, 2009
3 *
4 * LICENSE INFORMATION GOES HERE
5 */
6
/* When a CE connects, the address of its local socket is stored and it is assigned an ID number.
 * Until it sends a message ending communication, any communications from it are processed with
 * that ID number.  The "sendto" command uses the address data to send messages directly back to
 * its local socket, allowing us to get away with a single-socket implementation on the SML-side.
 *
 * Services are stored in a SQLite DB by the ID of the CE that registered them.
 *
 * Mission support has not been implemented yet, but will be soon.
 */
16
/* KNOWN ISSUE: Note that this implementation would be unstable for CEs on any sort of network
 * since it relies on asynchronous communication occurring linearly and with one partner at a time.
 * The breakdown occurs specifically when the CE has to send ack messages back to the SML;
 * since there may be other messages queued on the socket since communication started,
 * there is no guarantee that the message would be the first up, and I have yet to discover a method
 * for peeking more than 1 item into the message queue without starting to pop messages off first.
 * Still a work in progress though...
 */
25
#include <cerrno>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <math.h>
#include <stdint.h>

#include <iostream>

#include <arpa/inet.h>
#include <fcntl.h>
#include <netdb.h>
#include <netinet/in.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#include "../../trunk/src/include/vtcross/common.h"
#include "../../trunk/src/include/vtcross/components.h"
#include "../../trunk/src/include/vtcross/containers.h"
#include "../../trunk/src/include/vtcross/debug.h"
#include "../../trunk/src/include/vtcross/error.h"
#include "../../trunk/src/include/vtcross/socketcomm.h"

#include "../../trunk/src/include/tinyxml/tinyxml.h"
#include "../../trunk/src/include/tinyxml/tinystr.h"
#include "sqlite3.h"
58
59typedef struct services_s * services_DB;
60
61struct services_s {
62    char filename[64];
63    char tablename[64];
64    char command[2048];
65    sqlite3 *db;
66    unsigned int num_columns;
67};
68
69services_DB _services_DB;
70
71
72
73
74ServiceManagementLayer::ServiceManagementLayer()
75{
76    LOG("Creating Service Management Layer.\n");
77    shellSocketFD = -1;
78    numberOfCognitiveEngines = 0;
79    CE_Present = false;
80    cogEngSrv = 1;
81
82    Current_ID = 0;
83    LoadConfiguration();
84}
85
86
87ServiceManagementLayer::~ServiceManagementLayer()
88{
89}
90
91
92ServiceManagementLayer::ServiceManagementLayer(const char* serverName, \
93        const char* serverPort)
94{
95    LOG("Creating Service Management Layer.\n");
96
97    ConnectToShell(serverName, serverPort);
98    CE_List = new CE_Reg[10];
99
100    LoadConfiguration();
101    CreateServicesDB();
102}
103
104void
105CreateServicesDB()
106{
107    _services_DB = (services_DB) malloc(sizeof(struct services_s));
108
109    // create database
110
111    // copy filename
112    unsigned int i=0;
113    strcpy(_services_DB->filename, "Services_Table");
114
115    // execute create database command
116    // database handle
117    //_services_DB->db = NULL;
118    sqlite3_open(_services_DB->filename, &(_services_DB->db));
119    char* cols[] = {"ID_Num", "Service_Name"};
120
121    // create table
122
123    // copy tablename
124    strcpy(_services_DB->tablename, "Services");
125
126    // number of columns in the table
127    _services_DB->num_columns = 2;
128
129    // generate command
130    strcpy(_services_DB->command, "CREATE TABLE ");
131    strcat(_services_DB->command, _services_DB->tablename);
132    strcat(_services_DB->command, "(");
133    strcat(_services_DB->command, cols[0]);
134    strcat(_services_DB->command, " INT, ");
135    strcat(_services_DB->command, cols[1]);
136    strcat(_services_DB->command, " TEXT");
137    strcat(_services_DB->command, ");");
138
139    // execute create table command
140    sqlite3_exec(_services_DB->db, _services_DB->command, NULL, 0, NULL);
141}
142
143
144void
145ServiceManagementLayer::SendComponentType()
146{
147    SendMessage(shellSocketFD, "response_sml");
148    LOG("SML responded to GetRemoteComponentType query.\n");
149}
150
151
152void
153ServiceManagementLayer::ConnectToShell(const char* serverName, \
154        const char* serverPort)
155{
156    shellSocketFD = ClientSocket(serverName, serverPort);
157
158    RegisterComponent();
159}
160
161
162void
163ServiceManagementLayer::ShellSignalHandler()
164{
165    char buffer[256];
166
167    memset(buffer, 0, 256);
168    ReadMessage(shellSocketFD, buffer);
169
170    // TODO
171    // If we send integer op codes rather than strings, this process will be
172    // MUCH faster since instead of donig string compares we can simply
173    // switch on the integer value...
174    if(strcmp(buffer, "register_service") == 0) {
175        if(strcmp(buffer, "policy_geo") == 0) {
176        }
177        else if(strcmp(buffer, "policy_time") == 0) {
178        }
179        else if(strcmp(buffer, "policy_spectrum") == 0) {
180        }
181        else if(strcmp(buffer, "policy_spacial") == 0) {
182        }
183    }
184    else if(strcmp(buffer, "deregister_service") == 0) {
185        if(strcmp(buffer, "policy_geo") == 0) {
186        }
187        else if(strcmp(buffer, "policy_time") == 0) {
188        }
189        else if(strcmp(buffer, "policy_spectrum") == 0) {
190        }
191        else if(strcmp(buffer, "policy_spacial") == 0) {
192        }
193    }
194    else if(strcmp(buffer, "query_component_type") == 0) {
195        SendComponentType();
196    }
197    else if(strcmp(buffer, "reset_sml") == 0) {
198        Reset();
199    }
200    else if(strcmp(buffer, "shutdown_sml") == 0) {
201        Shutdown();
202    }
203}
204
205void
206ServiceManagementLayer::CESignalHandler(int32_t ID)
207{
208    char buffer[256];   
209    memset(buffer, 0, 256);     
210    //
211    ReadMessage(cogEngSrv, buffer);
212    if(strcmp(buffer, "register_engine_cognitive") == 0) {
213         RegisterCognitiveEngine(ID);
214    }
215    else if(strcmp(buffer, "register_service") == 0) {
216         ReceiveServices(ID);
217    }
218
219
220}
221
222
223
224void
225ServiceManagementLayer::Shutdown()
226{
227    DeregisterComponent();
228}
229
230
231void
232ServiceManagementLayer::Reset()
233{
234    DeregisterComponent();
235    LoadConfiguration();
236}
237
238
239void
240ServiceManagementLayer::RegisterComponent()
241{
242    SendMessage(shellSocketFD, "register_sml");
243    LOG("ServiceManagementLayer:: Registration message sent.\n");
244}
245
246
247void
248ServiceManagementLayer::DeregisterComponent()
249{
250    SendMessage(shellSocketFD, "deregister_sml");
251    LOG("ServiceManagementLayer:: Deregistration message sent.\n");
252
253    shutdown(shellSocketFD, 2);
254    close(shellSocketFD);
255    shellSocketFD = -1;
256    LOG("ServiceManagementLayer:: Shell socket closed.\n");
257}
258
259/* Streams config data directly from the shell to the CE, and checks
260 * for an "ack" message from the CE after every sent message
261 * to know when to stop communication.
262 */
263void
264ServiceManagementLayer::TransferRadioConfiguration(int32_t ID)
265{
266    struct timeval selTimeout;
267    fd_set sockSet;
268    int32_t rc = 0;
269    char buffer[256];
270    //Send data until the CE sends an ACK message back
271    while(rc == 0){
272        memset(buffer, 0, 256);
273        //Receive data from Shell
274        ReadMessage(shellSocketFD, buffer);
275        //Send data to CE
276        sendto(cogEngSrv, buffer, 256, NULL, (struct sockaddr*) CE_List[ID].sock_ptr, CE_List[ID].sock_len);
277        FD_ZERO(&sockSet);
278        FD_SET(cogEngSrv, &sockSet);
279        selTimeout.tv_sec = 0;
280        selTimeout.tv_usec = 0;
281        //Check if there is a message on the CE socket ready to be processed
282        rc = select(cogEngSrv + 1, &sockSet, NULL, NULL, &selTimeout);
283    }
284    memset(buffer, 0, 256);
285    //Once there is a message on the CE socket, read it and forward it on to the shell
286    ReadMessage(cogEngSrv, buffer);
287    SendMessage(shellSocketFD, buffer);
288}
289
290//Simmilar to TransferRadioConfig, just with Experience data
291void
292ServiceManagementLayer::TransferExperience(int32_t ID)
293{
294    struct timeval selTimeout;
295    fd_set sockSet;
296    int32_t rc = 0;
297    char buffer[256];
298    //Send data until the CE sends an ACK message back
299    while(rc == 0){
300        memset(buffer, 0, 256);
301        //Receive data from Shell
302        ReadMessage(shellSocketFD, buffer);
303        //Send data to CE
304        sendto(cogEngSrv, buffer, 256, NULL, (struct sockaddr*) CE_List[ID].sock_ptr, CE_List[ID].sock_len);
305        FD_ZERO(&sockSet);
306        FD_SET(cogEngSrv, &sockSet);
307        selTimeout.tv_sec = 0;
308        selTimeout.tv_usec = 0;
309        //Check if there is a message on the CE socket ready to be processed
310        rc = select(cogEngSrv + 1, &sockSet, NULL, NULL, &selTimeout);
311    }
312    memset(buffer, 0, 256);
313    //Once there is a message on the CE socket, read it and forward it on to the shell
314    ReadMessage(cogEngSrv, buffer);
315    SendMessage(shellSocketFD, buffer);
316}
317
318
319void
320ServiceManagementLayer::ReceiveServices(int32_t ID)
321{
322    char buffer[256];
323    memset(buffer, 0, 256);
324    ReadMessage(cogEngSrv, buffer);
325    char* cols[] = {"ID_Num", "Service_Name"};
326   
327    // generate command
328    //printf("%s\n", _services_DB->command);
329    strcpy(_services_DB->command, "insert into ");
330    strcat(_services_DB->command, _services_DB->tablename);
331    strcat(_services_DB->command, " (");
332    strcat(_services_DB->command, cols[0]);
333    strcat(_services_DB->command, ", ");
334    strcat(_services_DB->command, cols[1]);
335    strcat(_services_DB->command, ") ");
336    strcat(_services_DB->command, " values(");
337    sprintf(_services_DB->command, "%s%d", _services_DB->command, ID);
338    strcat(_services_DB->command, ", ");
339    strcat(_services_DB->command, buffer);
340    strcat(_services_DB->command, ");");
341   
342    //printf("search command: %s\n", _services_DB->command);
343    // execute add command
344    sqlite3_exec(_services_DB->db, _services_DB->command, NULL, 0, NULL);
345
346}
347
348
349void
350ServiceManagementLayer::SetActiveMission()
351{
352}
353
354
355void
356ServiceManagementLayer::ListServices()
357{
358    // generate commandi
359    strcpy(_services_DB->command, "select ");
360    strcat(_services_DB->command, _services_DB->tablename);
361    strcat(_services_DB->command, ".* from ");
362    strcat(_services_DB->command, _services_DB->tablename);
363    strcat(_services_DB->command, ";");
364
365    // execute print (select all)  command
366    sqlite3_exec(_services_DB->db, _services_DB->command, NULL, 0, NULL);
367    printf("database %s, table %s:\n", _services_DB->filename, _services_DB->tablename);
368}
369
370
371void
372ServiceManagementLayer::ReloadConfiguration()
373{
374    LOG("ServiceManagementLayer:: Reloading Configuration.\n");
375}
376
377
378void
379ServiceManagementLayer::LoadConfiguration()
380{
381    LOG("ServiceManagementLayer:: Loading Configuration.\n");
382}
383
384void
385ServiceManagementLayer::RegisterCognitiveEngine(int32_t ID)
386{
387    SendMessage(shellSocketFD, "register_engine_cognitive");
388    LOG("ServiceManagementLayer:: CE registration message forwarded to shell.\n");
389
390    TransferRadioConfiguration(ID);
391    TransferExperience(ID);
392
393    numberOfCognitiveEngines++;
394    CE_Present = true;
395}
396
397void
398ServiceManagementLayer::DeregisterServices(int32_t ID)
399{
400    char str_buffer[64];
401    strcpy(_services_DB->command, "delete ");
402    strcat(_services_DB->command, _services_DB->tablename);
403    strcat(_services_DB->command, ".* from ");
404    strcat(_services_DB->command, _services_DB->tablename);
405    strcat(_services_DB->command, " where ");
406    strcat(_services_DB->command, "ID_Num");
407    strcat(_services_DB->command, "==");
408    sprintf(str_buffer, "%d;", ID);
409    strcat(_services_DB->command, str_buffer);
410    sqlite3_exec(_services_DB->db, _services_DB->command, NULL, 0, NULL);
411}
412
413
414void
415ServiceManagementLayer::DeregisterCognitiveEngine(int32_t ID)
416{
417    LOG("ServiceManagementLayer:: CE deregistration message forwarded to shell.\n");
418
419    numberOfCognitiveEngines--;
420    if(numberOfCognitiveEngines == 0)
421        CE_Present = false;
422
423    SendMessage(shellSocketFD, "deregister_engine_cognitive");
424    char buffer[256];
425    memset(buffer, 0, 256);
426    ReadMessage(shellSocketFD, buffer);
427    sendto(cogEngSrv, buffer, 256, NULL, (struct sockaddr*) CE_List[ID].sock_ptr, CE_List[ID].sock_len);
428    if(strcmp("deregister_ack", buffer) != 0) {
429        ERROR(1, "SML:: Failed to close CE socket\n");
430    }
431    CE_List[ID].sock_ptr = NULL;
432    CE_List[ID].ID_num = -1;
433
434    LOG("Cognitive Radio Shell:: Socket closed.\n");
435}
436
437/* The basic idea here is that when any time a message is sent over the socket, the sender's address is sent too.
438 * We take advantage of this in order to uniquely identify the component for future communications since
439 * there is only one socket on the SML-side.
440 */
441void
442ServiceManagementLayer::StartSMLServer()
443{
444    struct timeval selTimeout;
445    int32_t running = 1;
446    int32_t port, rc, new_sd = 1;
447    int32_t desc_ready = 1;
448    int32_t timeout = 10;
449    fd_set sockSet;
450
451    cogEngSrv = CreateTCPServerSocket(CEPort);
452    int32_t maxDescriptor = cogEngSrv;
453
454    if(InitializeTCPServerPort(cogEngSrv) == -1)
455        ERROR(1,"Error initializing primary port\n");
456
457    while (running) {
458        /* Zero socket descriptor vector and set for server sockets */
459        /* This must be reset every time select() is called */
460        FD_ZERO(&sockSet);
461        FD_SET(cogEngSrv, &sockSet);
462
463        /* Timeout specification */
464        /* This must be reset every time select() is called */
465        selTimeout.tv_sec = 0;       /* timeout (secs.) */
466        selTimeout.tv_usec = 0;            /* 0 microseconds */
467        //Changed both to zero so that select will check messages from the shell instead of blocking
468        //when there is no command from the CE's to be processed
469
470        //Check if there is a message on the socket waiting to be read
471        rc = select(cogEngSrv + 1, &sockSet, NULL, NULL, &selTimeout);
472        if(rc == 0){
473            //If not, check instead for messages from the shell
474            ShellSignalHandler();       
475        }
476        else {
477            //If so, process the address information and pass the component ID on to the handler
478            char buffer[256];
479            memset(buffer, 0, 256);
480            struct sockaddr_in sock_addr;
481            memset((void *) &sock_addr, 0, sizeof(sock_addr));
482            socklen_t sock_len;
483            //Peak at the next message on the socket to determine its address of orgin
484            recvfrom(cogEngSrv, buffer, 256, MSG_PEEK, (struct sockaddr*) &sock_addr, &sock_len);
485            bool found = false;
486            //Is it from a previously logged address?
487            for(int i = 0; i < Current_ID; i++){
488                if((*CE_List[i].sock_ptr).sin_addr.s_addr == sock_addr.sin_addr.s_addr){
489                        //If so, pass the ID number of that component into the signal handler to process the message
490                        found=true;
491                        CESignalHandler(i);
492                }
493            }
494            //If not, log the address and pass the ID number of that component into the signal handler
495            if(!found){
496                CE_List[Current_ID].sock_ptr = &sock_addr;
497                CE_List[Current_ID].ID_num = Current_ID;
498                CE_List[Current_ID].sock_len = sock_len;
499                CESignalHandler(Current_ID);
500                Current_ID++;
501            }
502        }
503    }       
504
505    /* Close sockets */
506    close(cogEngSrv);
507
508    return;
509}
510
511/*
512
513
514
515rc = select(cogEngSrv + 1, &sockSet, NULL, NULL, &selTimeout);
516        if(rc == 0){
517            LOG("No echo requests for %i secs...Server still alive\n", timeout);
518            ShellSignalHandler();       
519        }
520        else {
521            desc_ready = rc;
522
523            for(port = 0; port <= maxDescriptor && desc_ready > 0; port++) {
524                if(FD_ISSET(port, &sockSet)) {
525                    desc_ready -= 1;
526
527                    //Check if request is new or on an existing open descriptor
528                    if(port == cogEngSrv) {
529                        do {
530                            new_sd = AcceptTCPConnection(port);
531                            if(new_sd < 0)
532                                break;
533                           
534                            CE_List[Current_ID].FD = new_sd;
535                            CE_List[Current_ID].IDNum = Current_ID;
536                            HandleMessage(CE_List[Current_ID]);
537                            Current_ID++;
538       
539                            FD_SET(new_sd,&sockSet);
540                            if(new_sd > maxDescriptor)
541                                maxDescriptor = new_sd;
542                            //LOG("New incoming connection - %i\n\n",new_sd);
543                        } while(new_sd != -1);
544                    }
545                    else {
546                        //LOG("Request on already open descriptor.\n\n");
547                        for(int16_t i = 0; i < Current_ID; i++)
548                        {
549                                if(CE_List[i].FD == port)
550                                        HandleMessage(CE_List[i]);
551                        }
552                    }
553                }
554            }
555    }
556*/
557
558
Note: See TracBrowser for help on using the browser.