Hi!
I'm trying to write a simple pipe redirector. The pipe has to be read in message mode (it's awesome, I know, but I can't change this). So I need to read the whole message into a buffer and then send it to the service host in a single write.
I'm using stream_handle::async_read_some to read the message.
When the buffer is bigger than the message, everything is OK - the message is read and my handler is called. Otherwise I get an access violation (Unhandled exception at 0xfeeefeee).
Call stack (buffer size 16 bytes):
* feeefeee()
* boost::asio::detail::win_iocp_io_service::operation::do_completion(unsigned long last_error=0x000000ea, unsigned int bytes_transferred=0x00000010) Line 77 + 0x16 bytes C++
* boost::asio::detail::win_iocp_io_service::do_one(bool block=true, boost::system::error_code & ec={...}) Line 509 C++
* boost::asio::detail::win_iocp_io_service::run(boost::system::error_code & ec={...}) Line 186 + 0xe bytes C++
* boost::asio::io_service::run() Line 58 + 0xf bytes C++
As you can see, the transferred byte count is exactly 16 and the error code is 0xEA (ERROR_MORE_DATA). This is the correct response for message-mode pipes, so I should repeat the read operation in this case until it succeeds. So, after getting this error, asio tries to call the handler, but the overlapped structure data is invalid. Sometimes several callbacks get called, but in the end there is an access violation.
It seems that something is wrong with synchronization, but I have only one thread calling io_service::run.
Thanks for answers!
--
Andrew Solodovnikov
====================
Sample code to reproduce problem:
// NOTE(review): the header names of three #include lines were lost (angle-bracket
// contents stripped). The Boost headers below are reconstructed from the names the
// code uses (boost::asio, boost::bind, boost::thread) - confirm against the original.
// <iostream> and <vector> are listed because std::cerr and std::vector are used.
// The Win32/CRT declarations (HANDLE, _tmain, getch, VERIFY, ...) are presumably
// supplied by a precompiled/forced header - verify.
#include <iostream>
#include <memory>
#include <vector>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
struct PipeServer
{
PipeServer():m_hPipe(INVALID_HANDLE_VALUE), m_sName(NULL)
{
}
BOOL Create(LPCTSTR szName, DWORD dwTimeout = 2000, DWORD dwBufSize = 4096)
{
m_bStop = FALSE;
m_sName = _tcsdup(szName);
if (m_sName)
{
m_dwTimeout = dwTimeout;
m_dwBufSize = dwBufSize;
InternalCreatePipe();
if (m_hPipe != INVALID_HANDLE_VALUE)
return TRUE;
free(m_sName);
m_sName = NULL;
}
return FALSE;
}
BOOL Accept(HANDLE &theStream)
{
if (m_hPipe != INVALID_HANDLE_VALUE)
{
if (ConnectNamedPipe(m_hPipe, NULL) || (GetLastError() == ERROR_PIPE_CONNECTED))
{
if (!m_bStop)
{
theStream = m_hPipe;
InternalCreatePipe();
return TRUE;
}
VERIFY(::DisconnectNamedPipe(m_hPipe));
}
VERIFY(::CloseHandle(m_hPipe));
m_hPipe = INVALID_HANDLE_VALUE;
}
return FALSE;
}
void Stop()
{
::InterlockedExchange((long *)&m_bStop, TRUE);
// possible hang on ::CallNamedPipe - if another pipe connected between this calls...
DWORD dwTemp;
::CallNamedPipe(m_sName, NULL, 0, NULL, 0, &dwTemp, NMPWAIT_NOWAIT);
}
void Close()
{
if (m_hPipe != INVALID_HANDLE_VALUE)
{
::DisconnectNamedPipe(m_hPipe);
::CloseHandle(m_hPipe);
m_hPipe = INVALID_HANDLE_VALUE;
}
free(m_sName);
m_sName = NULL;
}
private:
void InternalCreatePipe()
{
m_hPipe = ::CreateNamedPipe(
m_sName, // pipe name
PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, // read/write access
PIPE_TYPE_MESSAGE | // message type pipe
PIPE_READMODE_MESSAGE | // message-read mode
PIPE_WAIT, // blocking mode
PIPE_UNLIMITED_INSTANCES, // max. instances
m_dwBufSize, // output buffer size
m_dwBufSize, // input buffer size
m_dwTimeout, // client time-out
NULL // no security attribute
);
}
private:
BOOL m_bStop;
LPTSTR m_sName;
HANDLE m_hPipe;
DWORD m_dwTimeout;
DWORD m_dwBufSize;
};
// Number of bytes pipe_session::client_read requests per async_read_some call.
// The test client in _tmain writes a 0xa5-byte message, which is larger than
// this, so a single read cannot return the whole message.
#define BLOCK_SIZE 64
// One connected pipe client. Reads a single message from the client handle in
// chunks of at most BLOCK_SIZE bytes and assembles it in szBuffer.
//
// The object passes its own `this` pointer to the asynchronous handlers, so it
// must stay alive while a read is outstanding. Nothing in this struct deletes
// it.
//
// NOTE(review): the access violation described for this sample occurs inside
// Asio's IOCP completion path when a read completes with ERROR_MORE_DATA; the
// changes here only fix how the fragments are assembled and cannot address a
// library-side problem - check the Boost.Asio revision history for the version
// in use.
struct pipe_session
{
    // Wraps hClientHandle in a stream_handle bound to io_service and queues
    // the first read on that io_service.
    pipe_session(boost::asio::io_service& io_service, HANDLE hClientHandle):
        m_hClientHandle(io_service, hClientHandle),
        m_nReceived(0)
    {
        io_service.post(boost::bind(&pipe_session::client_read, this));
    }

private:
    // Starts an asynchronous read of the next chunk, appending it after the
    // bytes already received instead of overwriting them.
    void client_read()
    {
        const size_t nFree = sizeof(szBuffer) - m_nReceived;
        const size_t nChunk =
            (nFree < static_cast<size_t>(BLOCK_SIZE)) ? nFree : static_cast<size_t>(BLOCK_SIZE);
        if (nChunk == 0)
        {
            // The message does not fit into szBuffer; stop reading rather than
            // write past the end of the buffer.
            return;
        }

        //boost::asio::async_read(m_hClientHandle,
        m_hClientHandle.async_read_some(
            boost::asio::buffer(&szBuffer[m_nReceived], nChunk),
            boost::bind(&pipe_session::handle_client_read, this,
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred
            ));
    }

    // Completion handler for client_read.
    //   error   - result of the read; ERROR_MORE_DATA means the chunk was
    //             filled but the current message has more bytes pending.
    //   nReaded - number of bytes transferred into the buffer by this read.
    void handle_client_read(const boost::system::error_code& error, size_t nReaded)
    {
        if (!error)
        {
            // end iteration: szBuffer[0 .. m_nReceived) now holds the message.
            m_nReceived += nReaded;
        }
        else if (error.value() == ERROR_MORE_DATA)
        {
            // Keep the fragment that was delivered and fetch the next one.
            //m_sbClientBuffer.commit(BLOCK_SIZE);
            m_nReceived += nReaded;
            client_read();
        }
        // Any other error: no further read is issued.
    }

private:
    boost::asio::windows::stream_handle m_hClientHandle;
    char szBuffer[4096];    // assembled message bytes
    size_t m_nReceived;     // number of valid bytes at the start of szBuffer
};
// Accept loop. Runs an io_service on a helper thread and creates one
// pipe_session per client accepted from `server`, until Accept() returns FALSE.
// Then releases the work guard and waits for io_service::run to return.
void server_thread(PipeServer *server)
{
    boost::asio::io_service io_service;
    boost::asio::add_service(io_service, new boost::asio::windows::stream_handle_service(io_service));

    // The work object keeps run() from returning while no operations are queued.
    // (The template argument list was missing here, which does not compile.)
    std::auto_ptr<boost::asio::io_service::work> work(new boost::asio::io_service::work(io_service));
    boost::thread t(boost::bind(&boost::asio::io_service::run, &io_service));

    HANDLE hStream = INVALID_HANDLE_VALUE;
    while (server->Accept(hStream))
    {
        // NOTE(review): the session is never deleted anywhere in this file.
        new pipe_session(io_service, hStream);
    }

    work.reset(); // Allow run() to exit.
    t.join();
}
int _tmain(int argc, TCHAR* argv[], TCHAR* envp[])
{
PipeServer server;
if (server.Create(_T("\\\\.\\pipe\\test_channel")))
{
try
{
{
boost::thread t(boost::bind(&server_thread, &server));
getch();
HANDLE hService = CreateFile(_T("\\\\.\\pipe\\test_channel"), GENERIC_READ | GENERIC_WRITE, 0, NULL, OPEN_EXISTING, 0, NULL);
if (hService != INVALID_HANDLE_VALUE)
{
DWORD mode = PIPE_READMODE_MESSAGE;
::SetNamedPipeHandleState(hService, &mode, NULL, NULL);
std::vector<char> data(0xa5);
DWORD dw;
::WriteFile(hService, &data.front(), data.size(), &dw, NULL);
}
getch();
server.Stop();
t.join();
}
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
server.Close();
}
return 0;
}