#include #include #include SocketConnectionReceiver::SocketConnectionReceiver() : mSocket(-1), mIsRunnable(true) { } /// @brief After a connection has been accepted the SocketConnectionReceiver is initialized with the incoming socket /// @param socket /// @param internalSocketAddress SocketConnectionReceiver::SocketConnectionReceiver(int socket, sockaddr_in internalSocketAddress) : mSocket(socket), mInternalSocketAddress(internalSocketAddress), mIsRunnable(true) { } /// @brief Initialize a SocketConnectionReceiver. /// @param socket /// @param internalSocketAddress void SocketConnectionReceiver::initialize(int socket, sockaddr_in internalSocketAddress) { close(); mSocket=socket; mInternalSocketAddress = internalSocketAddress; } /// @brief The destructor will close the socket SocketConnectionReceiver::~SocketConnectionReceiver() { isRunnable(false); close(); } /// @brief Close the socket and set the mSocket variable to -1 /// @param void SocketConnectionReceiver::close(void) { if(-1==mSocket)return; std::cout << "Closing connection " << mSocket << std::endl; ::close(mSocket); mSocket=-1; } /// @brief The threadFunction will handle the lifetime of the socket connection. When a QUIT is received the socket will be closed and the thread method will exit /// @param data void SocketConnectionReceiver::threadFunction(int data) { String line; while(mIsRunnable) { readLine(line); if(0==line.length()) { std::cout << "Received an empty command" << std::endl; break; } if(line=="QUIT" || line=="quit") { handleQuit(); break; } Block commands = line.split('|'); if(0==commands.size())continue; commands[0].upper(); if(commands[0]=="PUT") { handlePut(commands); } else { std::cout << "Unrecognized command >" << line << "<" << std::endl; } } close(); return; } /// @brief The client wants to put a file. put {filename} {lengthbytes} /// @param commands [0]=PUT, [1]=filename, [2]=total length /// This message contains PUT|{filename}|{lengthbytes} followed by as many PCKT|{length} packets as needed to read the entire file contents and save to disk /// The connection will continue reading packets and saving data to disk until it receives PCKT 0 which signals the end of transmission /// @return true or false bool SocketConnectionReceiver::handlePut(Block &commands) { Profiler profiler; Array receiveBuffer; Block subCommands; String strLine; String fileName; size_t fileLength; size_t totalBytesRead=0; size_t totalPacketsRead=0; size_t availableDiskSpace=0; fileName = commands[1]; fileLength = commands[commands.size()-1].toLong(); std::cout << "PUT" << " " << fileName << " " << Utility::byteCountToString(fileLength,false) << std::endl; availableDiskSpace=Utility::getAvailableDiskSpace(String("./")); std::cout << Utility::byteCountToString(availableDiskSpace,false) << " available space on disk" << std::endl; if(fileLength > availableDiskSpace) { std::cout << "Unsufficient space on disk. Required space " << Utility::byteCountToString(availableDiskSpace,false) << " , available space" << Utility::byteCountToString(availableDiskSpace,false) << std::endl; return false; } FileIO writeFile(fileName, FileIO::ByteOrder::LittleEndian, FileIO::Mode::ReadWrite,FileIO::CreationFlags::CreateAlways); if(!writeFile.isOkay()) { std::cout << "Error creating " << fileName << std::endl; return false; } while(true) { size_t bufferLength=0; try { bufferLength = expectPacket(); } catch(Exception &exception) { std::cerr << exception.toString() << '\n'; break; } if(0==bufferLength) { std::cout << "Client indicated end of data" << std::endl; break; } receiveBuffer.size(bufferLength); size_t bytes_read = read(receiveBuffer); totalBytesRead+=bytes_read; totalPacketsRead++; if(bufferLength!=bytes_read) { std::cout << "Send/Receive size mismatch. The client indicated a data length of " << Utility::formatNumber(bufferLength) << " but a data length of " << Utility::formatNumber(bytes_read) << " was received"; break; } writeFile.write(&receiveBuffer[0], receiveBuffer.size()); double percent = ((double)totalBytesRead / (double)fileLength)*100.00; if(Utility::fmod(percent,10.00,0.0078125)) { double elapsedTimeSeconds = profiler.elapsed()/1000.00; double bytesPerSecond = totalBytesRead / elapsedTimeSeconds; String strBytesPerSecond = Utility::byteCountToString(bytesPerSecond); std::cout << "Received " << Utility::byteCountToString(totalBytesRead,false) << " of " << Utility::byteCountToString(fileLength,false) << " " << percent << " percent " << strBytesPerSecond << std::endl; } } writeFile.close(); std::cout << "Transfer complete. Total packets received " << Utility::formatNumber(totalPacketsRead) << std::endl; std::cout << "Received " << Utility::byteCountToString(totalBytesRead,false) << " in " << Utility::formatNumber(profiler.end()) << "(ms)" << std::endl; return totalBytesRead==fileLength; } /// @brief Read a PCKT message from the socket and return the length. This message is of the form PCKT|{length} and the method returns the second parameter (length) /// @param /// @return size_t SocketConnectionReceiver::expectPacket(void) { Block subCommands; String strLine; readLine(strLine); strLine.makeBlock(subCommands, "|"); if(subCommands.size()!=2) { StringBuffer sb; sb.append("Received an invalid PCKT command"); String str=sb.toString(); std::cout << str << std::endl; throw InvalidOperationException(str); } if(!(subCommands[0]=="PCKT")) { StringBuffer sb; sb.append("Unexpected command sequence."); sb.append("Expected PCKT but received ").append(subCommands[0]); String str = sb.toString(); std::cout << str << std::endl; throw InvalidOperationException(str); } size_t bufferLength = subCommands[1].toULong(); return bufferLength; } /// @brief Read a '\r\n' terminated string from the socket. Return the number of bytes read /// @param line The string to read data into /// @return size_t SocketConnectionReceiver::readLine(String &line) { size_t bytes_read = 0; size_t total_bytes = 0; char ch; char *lpLineBuffer = (char*)line; while (total_bytes < line.lengthBytes() - 1) { bytes_read = ::recv(mSocket, &ch, 1, 0); if (bytes_read <= 0) { return bytes_read; } if (ch=='\r') // Carriage return { recv(mSocket, &ch, 1, 0); // consume the line feed break; } lpLineBuffer[total_bytes++] = ch; } lpLineBuffer[total_bytes] = '\0'; return total_bytes; } /// @brief Read bytes from a socket into the provided buffer. /// @param buffer The buffer must be pre-allocated to a given length. The read operation will continue until all bytes in the buffer have been read /// @return size_t SocketConnectionReceiver::read(Array &buffer) { size_t bytes_read = 0; size_t bytes_to_read = buffer.size(); size_t total_bytes_read =0; char *lpLineBuffer = (char*)&buffer[0]; while(bytes_to_read>0) { bytes_read = ::recv(mSocket, lpLineBuffer, bytes_to_read, 0); lpLineBuffer+=bytes_read; bytes_to_read -= bytes_read; total_bytes_read += bytes_read; } return total_bytes_read; } /// @brief Handle a QUIT message... which just displays /// @param void SocketConnectionReceiver::handleQuit(void) { std::cout << "Received QUIT" << std::endl; }