#include #include #include SocketConnectionReceiver::SocketConnectionReceiver(int socket, sockaddr_in internalSocketAddress) : mSocket(socket), mInternalSocketAddress(internalSocketAddress) { } SocketConnectionReceiver::~SocketConnectionReceiver() { } void SocketConnectionReceiver::close(void) { if(-1==mSocket)return; std::cout << "Closing connection " << mSocket << std::endl; ::close(mSocket); mSocket=-1; } void SocketConnectionReceiver::threadFunction(int data) { String line; while(true) { readLine(line); if(0==line.length()) { std::cout << "Received an empty command" << std::endl; break; } if(line=="QUIT" || line=="quit")break; Block commands = line.split('|'); if(0==commands.size())continue; commands[0].upper(); if(commands[0]=="PUT") { handlePut(commands); } else { std::cout << "Unrecognized command >" << line << "<" << std::endl; } } close(); return; } /// @brief The client wants to put a file. put {filename} {lengthbytes} /// @param commands [0]=PUT, [1]=filename, [2]=total length /// This will read PCKT {length}. 
/// The connection will continue reading and writing until it receives PCKT 0 /// @return bool SocketConnectionReceiver::handlePut(Block &commands) { Profiler profiler; Array receiveBuffer; Block subCommands; String strLine; String fileName; size_t fileLength; size_t totalBytesRead=0; fileName = commands[1]; fileLength = commands[commands.size()-1].toLong(); std::cout << "PUT" << " " << fileName << " " << fileLength << std::endl; FileIO writeFile(fileName, FileIO::ByteOrder::LittleEndian, FileIO::Mode::ReadWrite,FileIO::CreationFlags::CreateAlways); if(!writeFile.isOkay()) { std::cout << "Error creating " << fileName << std::endl; return false; } while(true) { size_t bufferLength=0; try { bufferLength = expectPacket(); } catch(Exception &exception) { std::cerr << exception.toString() << '\n'; break; } if(0==bufferLength) { std::cout << "Client indicated end of data" << std::endl; break; } receiveBuffer.size(bufferLength); size_t bytes_read = read(receiveBuffer); totalBytesRead+=bytes_read; if(bufferLength!=bytes_read) { std::cout << "Send/Receive size mismatch. 
The client indicated a data length of " << bufferLength << " but a data length of " << bytes_read << " was received"; break; } writeFile.write(&receiveBuffer[0], receiveBuffer.size()); double percent = ((double)totalBytesRead / (double)fileLength)*100.00; if(Utility::fmod(percent,10.00,0.0078125)) { double elapsedTimeSeconds = profiler.elapsed()/1000.00; double bytesPerSecond = totalBytesRead / elapsedTimeSeconds; String strBytesPerSecond = Utility::bytesTransferredToString(bytesPerSecond); std::cout << "Transferred " << totalBytesRead << " of " << fileLength << " " << percent << " percent " << strBytesPerSecond << std::endl; } } std::cout << "Transfer complete" << std::endl; std::cout << "Received " << totalBytesRead << " in " << Utility::formatNumber(profiler.end()) << "(ms)" << std::endl; writeFile.close(); return totalBytesRead==fileLength; } size_t SocketConnectionReceiver::expectPacket(void) { Block subCommands; String strLine; readLine(strLine); strLine.makeBlock(subCommands, "|"); if(subCommands.size()!=2) { StringBuffer sb; sb.append("Received an invalid PCKT command"); String str=sb.toString(); std::cout << str << std::endl; throw new InvalidOperationException(str); } if(!(subCommands[0]=="PCKT")) { StringBuffer sb; sb.append("Unexpected command sequence."); sb.append("Expected PCKT but received ").append(subCommands[0]); String str = sb.toString(); std::cout << str << std::endl; throw new InvalidOperationException(str); } size_t bufferLength = subCommands[1].toULong(); return bufferLength; } size_t SocketConnectionReceiver::readLine(String &line) { size_t bytes_read = 0; size_t total_bytes = 0; char ch; char *lpLineBuffer = (char*)line; while (total_bytes < line.lengthBytes() - 1) { bytes_read = ::recv(mSocket, &ch, 1, 0); if (bytes_read <= 0) { return bytes_read; } if (ch=='\r') // Carriage return { recv(mSocket, &ch, 1, 0); // consume the line feed break; } lpLineBuffer[total_bytes++] = ch; } lpLineBuffer[total_bytes] = '\0'; return total_bytes; } 
/// @brief Fills the buffer with data received from the socket.
/// Keeps receiving until buffer.size() bytes have arrived, the peer closes the
/// connection, or a receive error occurs.
/// @param buffer Destination; its current size determines how many bytes are requested.
/// @return The number of bytes actually received. A value smaller than buffer.size()
///         means the connection ended or failed before the buffer was full.
size_t SocketConnectionReceiver::read(Array &buffer)
{
  size_t bytes_to_read = buffer.size();
  size_t total_bytes_read = 0;

  // Nothing requested: avoid taking the address of an element of an empty buffer.
  if(0==bytes_to_read)return 0;

  char *lpBuffer = (char*)&buffer[0];
  while(bytes_to_read>0)
  {
    // recv() returns a signed count: -1 on error, 0 when the peer closed the connection.
    // Storing it in an unsigned variable would turn -1 into a huge value and corrupt
    // the pointer and counters below, and a return of 0 would loop forever.
    ssize_t bytes_read = ::recv(mSocket, lpBuffer, bytes_to_read, 0);
    if(bytes_read<=0)break;
    lpBuffer += bytes_read;
    bytes_to_read -= (size_t)bytes_read;
    total_bytes_read += (size_t)bytes_read;
  }
  return total_bytes_read;
}